add admin commands for system, users, gc, audit, security
All checks were successful
CI / Lint & Check (push) Successful in 11s

Phase 6 implementation:
- system: version, auth mode, storage volumes
- gc, gc-run: schedule, history, manual trigger
- users, user-create, user-delete, user-admin, user-passwd
- audit, audit-purge: view and purge audit logs
- cve-allow: manage system CVE allowlist
- scan-all: trigger system-wide vulnerability scan

Helpers: paginated_request(), confirm_action(), format_size()
This commit is contained in:
Username
2026-01-18 19:10:47 +01:00
parent 11f7bb5b7a
commit 969f0a5207
9 changed files with 1198 additions and 45 deletions

View File

@@ -14,13 +14,18 @@ Command-line interface for managing Harbor container registry. Provides quick ac
- Tag management - Tag management
- Artifact deletion - Artifact deletion
- Project configuration (auto-scan, auto-sbom) - Project configuration (auto-scan, auto-sbom)
- System information and storage status
- Garbage collection (status, trigger)
- User management (list, create, delete, admin privileges, password reset)
- Audit log viewing and purging
- CVE allowlist management
- System-wide vulnerability scanning
### Out of Scope ### Out of Scope
- User/group management (admin-only, rare) - LDAP/OIDC group management (external provider)
- System configuration (one-time setup)
- Replication rule creation (complex, rare) - Replication rule creation (complex, rare)
- Garbage collection triggers (scheduled) - Registry endpoint management (rare setup task)
## Success Criteria ## Success Criteria

View File

@@ -44,10 +44,10 @@ Additional functionality:
- [ ] `labels` command - manage artifact labels - [ ] `labels` command - manage artifact labels
- [ ] `copy` command - copy artifact between repos - [ ] `copy` command - copy artifact between repos
- [ ] `gc` command - show garbage collection status - [x] `gc` command - show garbage collection status
- [ ] `replication` command - list replication rules/executions - [ ] `replication` command - list replication rules/executions
- [ ] `quota` command - show project quota usage - [ ] `quota` command - show project quota usage
- [ ] `audit` command - show audit logs - [x] `audit` command - show audit logs
- [ ] Partial digest tab completion - [ ] Partial digest tab completion
## Phase 5: Distribution ## Phase 5: Distribution
@@ -59,11 +59,29 @@ Packaging and distribution:
- [ ] Man page generation - [ ] Man page generation
- [ ] Shell completions (bash/zsh/fish) - [ ] Shell completions (bash/zsh/fish)
## Phase 6: Admin (Complete)
Admin capabilities for Harbor system management:
- [x] `system` command - version, auth mode, storage volumes
- [x] `gc` command - schedule, history, manual trigger
- [x] `gc-run` command - trigger GC with dry-run/wait
- [x] `users` command - list users with pagination
- [x] `user-create` command - create local user
- [x] `user-delete` command - delete user
- [x] `user-admin` command - grant/revoke admin privileges
- [x] `user-passwd` command - reset user password
- [x] `audit` command - view audit logs with filters
- [x] `audit-purge` command - purge old audit logs
- [x] `cve-allow` command - manage system CVE allowlist
- [x] `scan-all` command - trigger system-wide vulnerability scan
## Dependencies ## Dependencies
``` ```
Phase 1 (done) ──> Phase 2 ─┬─> Phase 3 Phase 1 (done) ──> Phase 2 ─┬─> Phase 3
─> Phase 4 ──> Phase 5 ─> Phase 4 ──> Phase 5
└─> Phase 6 (done)
``` ```
Phase 3 and 4 can proceed in parallel after Phase 2. Phase 3, 4, and 6 can proceed in parallel after Phase 2.

View File

@@ -25,8 +25,11 @@ Active, prioritized work items.
| `labels` command | 4 | | `labels` command | 4 |
| `copy` command | 4 | | `copy` command | 4 |
| `quota` command | 4 | | `quota` command | 4 |
| `replication` command | 4 |
| pyproject.toml with entry points | 5 | | pyproject.toml with entry points | 5 |
| Shell completions | 5 | | Shell completions | 5 |
| `registries` command | deferred |
| `groups` command (LDAP/OIDC) | deferred |
## Completed ## Completed
@@ -41,3 +44,9 @@ Active, prioritized work items.
- [x] `--verify-ssl` flag for SSL certificate verification - [x] `--verify-ssl` flag for SSL certificate verification
- [x] Modular package structure (`src/harbor/`) - [x] Modular package structure (`src/harbor/`)
- [x] Documentation updated for new structure - [x] Documentation updated for new structure
- [x] Phase 6: Admin commands (v0.1.3)
- `system`, `gc`, `gc-run`, `users`
- `user-create`, `user-delete`, `user-admin`, `user-passwd`
- `audit`, `audit-purge`, `cve-allow`, `scan-all`
- `paginated_request()` helper
- `confirm_action()`, `format_size()`, `format_timestamp()` helpers

44
TODO.md
View File

@@ -12,20 +12,50 @@ Intake buffer for ideas, issues, and unrefined tasks.
- Health check endpoint monitoring - Health check endpoint monitoring
- Multi-registry support (switch between registries) - Multi-registry support (switch between registries)
## Deferred Admin Features
- `registries` - list configured registry endpoints
- `registry-create` - add registry endpoint
- `registry-test` - test registry connectivity
- `replication` - list replication rules/executions
- `groups` - user group management (LDAP/OIDC)
## Issues ## Issues
- SSL verification disabled globally (should be configurable) (none currently open)
- No timeout configuration for API calls
- Delete confirmation reads stdin (breaks piping)
- Partial digest matching fetches all artifacts (slow for large repos)
## Questions ## Questions
- Should `--json` output be available on all commands? - Should `--json` output be available on all commands?
- Should we support OIDC authentication? - Should we support OIDC authentication?
- Worth adding `--dry-run` for destructive operations?
## Debt ## Debt
- Error handling inconsistent across commands (none currently open)
- Some magic numbers (column widths, timeouts)
## Resolved
### Issues (fixed in v0.1.1)
- SSL verification disabled globally → added `--verify-ssl` flag
- No timeout configuration for API calls → added `--timeout` flag
- Delete confirmation reads stdin → added TTY check, requires `--force` in non-interactive mode
- Partial digest matching fetches all artifacts → limited to first 100 artifacts
### Debt (fixed in v0.1.1)
- Error handling inconsistent across commands → added `output.py` with `print_error()`
- Some magic numbers (column widths, timeouts) → added `constants.py`
### Features (added in v0.1.2)
- Artifact cleanup command (`clean`) with filters: `--untagged`, `--older-than`, `--keep`
- Dry-run mode for cleanup preview (`--dry-run`)
### Features (added in v0.1.3)
- Admin commands: `system`, `gc`, `gc-run`, `users`
- User management: `user-create`, `user-delete`, `user-admin`, `user-passwd`
- Audit and security: `audit`, `audit-purge`, `cve-allow`, `scan-all`
- Pagination helper for admin endpoints
- Confirmation helper for destructive actions

View File

@@ -6,17 +6,31 @@ import sys
from . import __version__ from . import __version__
from .commands import ( from .commands import (
cmd_artifacts, cmd_artifacts,
cmd_audit,
cmd_audit_purge,
cmd_clean,
cmd_config, cmd_config,
cmd_cve_allow,
cmd_delete, cmd_delete,
cmd_gc,
cmd_gc_run,
cmd_info, cmd_info,
cmd_projects, cmd_projects,
cmd_repos, cmd_repos,
cmd_sbom, cmd_sbom,
cmd_scan, cmd_scan,
cmd_scan_all,
cmd_system,
cmd_tags, cmd_tags,
cmd_user_admin,
cmd_user_create,
cmd_user_delete,
cmd_user_passwd,
cmd_users,
cmd_vulns, cmd_vulns,
) )
from .config import load_credentials from .config import load_credentials
from .constants import DEFAULT_TIMEOUT
def create_parser() -> argparse.ArgumentParser: def create_parser() -> argparse.ArgumentParser:
@@ -30,13 +44,19 @@ Examples:
%(prog)s repos library List repos in 'library' project %(prog)s repos library List repos in 'library' project
%(prog)s artifacts library flaskpaste List artifacts %(prog)s artifacts library flaskpaste List artifacts
%(prog)s info library flaskpaste Show latest artifact details %(prog)s info library flaskpaste Show latest artifact details
%(prog)s info library flaskpaste -d latest Show artifact by tag
%(prog)s vulns library flaskpaste -s high Show high severity vulns %(prog)s vulns library flaskpaste -s high Show high severity vulns
%(prog)s vulns library flaskpaste -P jaraco Filter by package name
%(prog)s scan library flaskpaste --wait Scan and wait for completion %(prog)s scan library flaskpaste --wait Scan and wait for completion
%(prog)s tags library flaskpaste List tags for latest artifact %(prog)s clean library flaskpaste --dry-run Preview cleanup
%(prog)s sbom library flaskpaste Show SBOM for artifact
%(prog)s config library --show Show project settings Admin commands (require admin privileges):
%(prog)s system Show system info and storage
%(prog)s gc Show GC schedule and history
%(prog)s gc-run --dry-run Trigger GC (dry run)
%(prog)s users List users
%(prog)s user-create alice --email a@b.c Create user
%(prog)s audit --operation pull View audit logs
%(prog)s cve-allow --add CVE-2024-1234 Add CVE to allowlist
%(prog)s scan-all Trigger system-wide scan
""" """
) )
parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {__version__}") parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {__version__}")
@@ -44,6 +64,8 @@ Examples:
parser.add_argument("-u", "--user", help="Username", default=None) parser.add_argument("-u", "--user", help="Username", default=None)
parser.add_argument("-p", "--password", help="Password", default=None) parser.add_argument("-p", "--password", help="Password", default=None)
parser.add_argument("--verify-ssl", action="store_true", help="Verify SSL certificates (default: skip)") parser.add_argument("--verify-ssl", action="store_true", help="Verify SSL certificates (default: skip)")
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT,
help=f"API request timeout in seconds (default: {DEFAULT_TIMEOUT})")
subparsers = parser.add_subparsers(dest="command", help="Commands") subparsers = parser.add_subparsers(dest="command", help="Commands")
@@ -116,6 +138,103 @@ Examples:
p_config.add_argument("--auto-sbom", type=lambda x: x.lower() == "true", metavar="true|false", p_config.add_argument("--auto-sbom", type=lambda x: x.lower() == "true", metavar="true|false",
help="Enable/disable auto-SBOM generation on push") help="Enable/disable auto-SBOM generation on push")
# clean
p_clean = subparsers.add_parser("clean", help="Clean artifacts from repository")
p_clean.add_argument("project", help="Project name")
p_clean.add_argument("repo", help="Repository name")
p_clean.add_argument("--untagged", action="store_true", help="Only delete untagged artifacts")
p_clean.add_argument("--older-than", type=int, metavar="DAYS",
help="Only delete artifacts older than N days")
p_clean.add_argument("--keep", type=int, metavar="N",
help="Keep N most recent artifacts (applied before other filters)")
p_clean.add_argument("--dry-run", action="store_true", help="Show what would be deleted without deleting")
p_clean.add_argument("-f", "--force", action="store_true", help="Skip confirmation prompt")
# =========================================================================
# Admin Commands - Phase A: System Visibility
# =========================================================================
# system
p_system = subparsers.add_parser("system", help="Show Harbor system information")
p_system.add_argument("--json", action="store_true", help="Include raw JSON output")
# gc
p_gc = subparsers.add_parser("gc", help="Show garbage collection status")
p_gc.add_argument("--history", type=int, default=5, metavar="N",
help="Number of history entries to show (default: 5)")
p_gc.add_argument("--json", action="store_true", help="Include raw JSON output")
# gc-run
p_gc_run = subparsers.add_parser("gc-run", help="Trigger garbage collection")
p_gc_run.add_argument("--dry-run", action="store_true", help="Dry run (no actual deletion)")
p_gc_run.add_argument("--wait", action="store_true", help="Wait for GC to complete")
p_gc_run.add_argument("-f", "--force", action="store_true", help="Skip confirmation prompt")
# users
p_users = subparsers.add_parser("users", help="List Harbor users")
p_users.add_argument("--page", type=int, default=1, help="Page number (default: 1)")
p_users.add_argument("--limit", type=int, default=25, help="Results per page (default: 25)")
p_users.add_argument("--json", action="store_true", help="Include raw JSON output")
# =========================================================================
# Admin Commands - Phase B: User Management
# =========================================================================
# user-create
p_user_create = subparsers.add_parser("user-create", help="Create a new user")
p_user_create.add_argument("username", help="Username")
p_user_create.add_argument("--email", required=True, help="Email address")
p_user_create.add_argument("--password", help="Password (prompts if not provided)")
p_user_create.add_argument("--realname", help="Real name (defaults to username)")
p_user_create.add_argument("--comment", help="Comment")
p_user_create.add_argument("--admin", action="store_true", help="Grant admin privileges")
# user-delete
p_user_delete = subparsers.add_parser("user-delete", help="Delete a user")
p_user_delete.add_argument("username", help="Username to delete")
p_user_delete.add_argument("-f", "--force", action="store_true", help="Skip confirmation")
# user-admin
p_user_admin = subparsers.add_parser("user-admin", help="Grant or revoke admin privileges")
p_user_admin.add_argument("username", help="Username")
p_user_admin.add_argument("--revoke", action="store_true", help="Revoke admin (default: grant)")
p_user_admin.add_argument("-f", "--force", action="store_true", help="Skip confirmation")
# user-passwd
p_user_passwd = subparsers.add_parser("user-passwd", help="Reset user password")
p_user_passwd.add_argument("username", help="Username")
p_user_passwd.add_argument("--new-password", help="New password (prompts if not provided)")
# =========================================================================
# Admin Commands - Phase C: Audit and Security
# =========================================================================
# audit
p_audit = subparsers.add_parser("audit", help="View audit logs")
p_audit.add_argument("--page", type=int, default=1, help="Page number (default: 1)")
p_audit.add_argument("--limit", type=int, default=25, help="Results per page (default: 25)")
p_audit.add_argument("--operation", help="Filter by operation (create/delete/pull/push)")
p_audit.add_argument("--username", help="Filter by username")
p_audit.add_argument("--resource", help="Filter by resource")
p_audit.add_argument("--json", action="store_true", help="Include raw JSON output")
# audit-purge
p_audit_purge = subparsers.add_parser("audit-purge", help="Purge old audit logs")
p_audit_purge.add_argument("--retention", type=int, default=30, metavar="DAYS",
help="Keep logs newer than N days (default: 30)")
p_audit_purge.add_argument("--dry-run", action="store_true", help="Dry run")
p_audit_purge.add_argument("-f", "--force", action="store_true", help="Skip confirmation")
# cve-allow
p_cve_allow = subparsers.add_parser("cve-allow", help="Manage system CVE allowlist")
p_cve_allow.add_argument("--add", metavar="CVE", help="Add CVE to allowlist")
p_cve_allow.add_argument("--remove", metavar="CVE", help="Remove CVE from allowlist")
p_cve_allow.add_argument("--json", action="store_true", help="Include raw JSON output")
# scan-all
p_scan_all = subparsers.add_parser("scan-all", help="Trigger system-wide vulnerability scan")
p_scan_all.add_argument("-f", "--force", action="store_true", help="Skip confirmation")
return parser return parser
@@ -139,6 +258,7 @@ def main() -> int:
return 1 return 1
commands = { commands = {
# Core commands
"projects": cmd_projects, "projects": cmd_projects,
"repos": cmd_repos, "repos": cmd_repos,
"artifacts": cmd_artifacts, "artifacts": cmd_artifacts,
@@ -149,6 +269,22 @@ def main() -> int:
"tags": cmd_tags, "tags": cmd_tags,
"sbom": cmd_sbom, "sbom": cmd_sbom,
"config": cmd_config, "config": cmd_config,
"clean": cmd_clean,
# Admin - Phase A: System Visibility
"system": cmd_system,
"gc": cmd_gc,
"gc-run": cmd_gc_run,
"users": cmd_users,
# Admin - Phase B: User Management
"user-create": cmd_user_create,
"user-delete": cmd_user_delete,
"user-admin": cmd_user_admin,
"user-passwd": cmd_user_passwd,
# Admin - Phase C: Audit and Security
"audit": cmd_audit,
"audit-purge": cmd_audit_purge,
"cve-allow": cmd_cve_allow,
"scan-all": cmd_scan_all,
} }
return commands[args.command](args, user, password, url) return commands[args.command](args, user, password, url)

View File

@@ -7,6 +7,16 @@ import urllib.error
import urllib.request import urllib.request
from typing import Any from typing import Any
from .constants import (
ADMIN_MAX_PAGES,
ADMIN_PAGE_SIZE,
DEFAULT_PAGE_SIZE,
DEFAULT_TIMEOUT,
HTTP_SUCCESS,
SHA256_DIGEST_LENGTH,
SHA256_PREFIX,
)
def build_url(base: str, *parts: str, **params: str) -> str: def build_url(base: str, *parts: str, **params: str) -> str:
"""Build API URL from parts and optional query parameters.""" """Build API URL from parts and optional query parameters."""
@@ -24,6 +34,7 @@ def api_request(
method: str = "GET", method: str = "GET",
data: dict[str, Any] | None = None, data: dict[str, Any] | None = None,
verify_ssl: bool = False, verify_ssl: bool = False,
timeout: int = DEFAULT_TIMEOUT,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Make authenticated API request to Harbor. """Make authenticated API request to Harbor.
@@ -34,6 +45,7 @@ def api_request(
method: HTTP method method: HTTP method
data: Optional request body (JSON) data: Optional request body (JSON)
verify_ssl: Whether to verify SSL certificates verify_ssl: Whether to verify SSL certificates
timeout: Request timeout in seconds
Returns: Returns:
Response data as dict, or error dict with 'error' key Response data as dict, or error dict with 'error' key
@@ -54,9 +66,9 @@ def api_request(
req.data = json.dumps(data).encode() req.data = json.dumps(data).encode()
try: try:
with urllib.request.urlopen(req, context=ctx, timeout=30) as resp: with urllib.request.urlopen(req, context=ctx, timeout=timeout) as resp:
# Handle responses with no body # Handle responses with no body
if resp.status in (200, 201, 202, 204): if resp.status in HTTP_SUCCESS:
body = resp.read().decode().strip() body = resp.read().decode().strip()
if not body: if not body:
return {"status": resp.status} return {"status": resp.status}
@@ -73,6 +85,53 @@ def api_request(
return {"error": str(e)} return {"error": str(e)}
def paginated_request(
url: str,
user: str,
password: str,
page_size: int = ADMIN_PAGE_SIZE,
max_pages: int = ADMIN_MAX_PAGES,
verify_ssl: bool = False,
timeout: int = DEFAULT_TIMEOUT,
) -> list[dict[str, Any]] | dict[str, Any]:
"""Fetch all pages from a paginated endpoint.
Args:
url: Base API URL (without pagination params)
user: Username for basic auth
password: Password for basic auth
page_size: Items per page
max_pages: Maximum pages to fetch
verify_ssl: Whether to verify SSL certificates
timeout: Request timeout in seconds
Returns:
List of all items from all pages, or error dict with 'error' key
"""
all_items: list[dict[str, Any]] = []
separator = "&" if "?" in url else "?"
for page in range(1, max_pages + 1):
page_url = f"{url}{separator}page={page}&page_size={page_size}"
data = api_request(page_url, user, password, verify_ssl=verify_ssl, timeout=timeout)
if isinstance(data, dict) and "error" in data:
return data
if not isinstance(data, list):
return {"error": "unexpected", "message": "Expected list response"}
if not data:
break
all_items.extend(data)
if len(data) < page_size:
break
return all_items
def resolve_digest( def resolve_digest(
project: str, project: str,
repo: str, repo: str,
@@ -93,6 +152,10 @@ def resolve_digest(
Returns: Returns:
Full digest string or None if not found Full digest string or None if not found
Note:
Partial digest matching is limited to the first 100 artifacts.
For repos with more artifacts, use full digest or tag name.
""" """
if not digest_or_tag: if not digest_or_tag:
# Get latest artifact # Get latest artifact
@@ -105,7 +168,7 @@ def resolve_digest(
return artifacts[0].get("digest") return artifacts[0].get("digest")
# If it looks like a full digest, use it directly # If it looks like a full digest, use it directly
if digest_or_tag.startswith("sha256:") and len(digest_or_tag) == 71: if digest_or_tag.startswith(SHA256_PREFIX) and len(digest_or_tag) == SHA256_DIGEST_LENGTH:
return digest_or_tag return digest_or_tag
# Try as tag first # Try as tag first
@@ -116,9 +179,9 @@ def resolve_digest(
if "error" not in artifact: if "error" not in artifact:
return artifact.get("digest") return artifact.get("digest")
# Try partial digest match # Try partial digest match (limited to first page for performance)
artifacts = api_request( artifacts = api_request(
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts", f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts?page_size={DEFAULT_PAGE_SIZE}",
user, password user, password
) )
if "error" in artifacts: if "error" in artifacts:
@@ -126,7 +189,7 @@ def resolve_digest(
for a in artifacts: for a in artifacts:
if a.get("digest", "").startswith(digest_or_tag) or \ if a.get("digest", "").startswith(digest_or_tag) or \
a.get("digest", "").startswith(f"sha256:{digest_or_tag}"): a.get("digest", "").startswith(f"{SHA256_PREFIX}{digest_or_tag}"):
return a.get("digest") return a.get("digest")
return digest_or_tag # Return as-is, let API handle error return digest_or_tag # Return as-is, let API handle error

View File

@@ -1,17 +1,49 @@
"""CLI command handlers.""" """CLI command handlers."""
import json import json
import sys
import time import time
from argparse import Namespace from argparse import Namespace
from datetime import datetime, timezone
from .client import api_request, resolve_digest from .client import api_request, paginated_request, resolve_digest
from .constants import (
ADMIN_PAGE_SIZE,
COL_ACTION,
COL_ADMIN,
COL_CREATED,
COL_CVE,
COL_DIGEST,
COL_EMAIL,
COL_OPERATION,
COL_PACKAGE,
COL_PROJECT,
COL_REPO,
COL_RESOURCE,
COL_SCHEDULE,
COL_SEVERITY,
COL_SIZE,
COL_STATUS,
COL_TAG,
COL_TAGS,
COL_TIME,
COL_USERNAME,
COL_VERSION,
DEFAULT_RESULT_LIMIT,
GC_SCHEDULE_MANUAL,
HTTP_ACCEPTED,
HTTP_CREATED,
HTTP_OK,
SCAN_POLL_INTERVAL,
)
from .output import confirm_action, format_size, format_timestamp, print_error
def cmd_projects(args: Namespace, user: str, password: str, url: str) -> int: def cmd_projects(args: Namespace, user: str, password: str, url: str) -> int:
"""List projects.""" """List projects."""
data = api_request(f"{url}/api/v2.0/projects", user, password) data = api_request(f"{url}/api/v2.0/projects", user, password)
if "error" in data: if "error" in data:
print(f"Error: {data}") print_error(data)
return 1 return 1
print(f"{'Project':<20} {'Public':<8} {'Repos':<6} {'Auto-Scan':<10} {'Auto-SBOM':<10}") print(f"{'Project':<20} {'Public':<8} {'Repos':<6} {'Auto-Scan':<10} {'Auto-SBOM':<10}")
@@ -28,7 +60,7 @@ def cmd_repos(args: Namespace, user: str, password: str, url: str) -> int:
project = args.project project = args.project
data = api_request(f"{url}/api/v2.0/projects/{project}/repositories", user, password) data = api_request(f"{url}/api/v2.0/projects/{project}/repositories", user, password)
if "error" in data: if "error" in data:
print(f"Error: {data}") print_error(data)
return 1 return 1
print(f"{'Repository':<40} {'Artifacts':<10} {'Pull Count':<12}") print(f"{'Repository':<40} {'Artifacts':<10} {'Pull Count':<12}")
@@ -48,7 +80,7 @@ def cmd_artifacts(args: Namespace, user: str, password: str, url: str) -> int:
user, password user, password
) )
if "error" in data: if "error" in data:
print(f"Error: {data}") print_error(data)
return 1 return 1
print(f"{'Digest':<20} {'Tags':<25} {'Size':<12} {'Scan Status':<15} {'Severity':<10}") print(f"{'Digest':<20} {'Tags':<25} {'Size':<12} {'Scan Status':<15} {'Severity':<10}")
@@ -84,7 +116,7 @@ def cmd_info(args: Namespace, user: str, password: str, url: str) -> int:
user, password user, password
) )
if "error" in data: if "error" in data:
print(f"Error: {data}") print_error(data)
return 1 return 1
# Basic info # Basic info
@@ -150,7 +182,7 @@ def cmd_vulns(args: Namespace, user: str, password: str, url: str) -> int:
user, password user, password
) )
if "error" in data: if "error" in data:
print(f"Error: {data}") print_error(data)
return 1 return 1
severity_filter = args.severity.upper() if args.severity else None severity_filter = args.severity.upper() if args.severity else None
@@ -217,25 +249,25 @@ def cmd_scan(args: Namespace, user: str, password: str, url: str) -> int:
user, password, method="POST" user, password, method="POST"
) )
if result.get("status") == 202: if result.get("status") == HTTP_ACCEPTED:
print(f"Scan triggered for {project}/{repo}@{digest[:19]}") print(f"Scan triggered for {project}/{repo}@{digest[:19]}")
if args.wait: if args.wait:
print("Waiting for scan to complete...", end="", flush=True) print("Waiting for scan to complete...", end="", flush=True)
max_wait = args.timeout max_wait = args.timeout
waited = 0 waited = 0
interval = 3
while waited < max_wait: while waited < max_wait:
time.sleep(interval) time.sleep(SCAN_POLL_INTERVAL)
waited += interval waited += SCAN_POLL_INTERVAL
artifact = api_request( artifact = api_request(
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true", f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true",
user, password user, password
) )
if "error" in artifact: if "error" in artifact:
print(f"\nError checking status: {artifact}") print("\nError checking status: ", end="")
print_error(artifact)
return 1 return 1
scan = artifact.get("scan_overview", {}) scan = artifact.get("scan_overview", {})
@@ -258,7 +290,7 @@ def cmd_scan(args: Namespace, user: str, password: str, url: str) -> int:
return 0 return 0
else: else:
print(f"Error: {result}") print_error(result)
return 1 return 1
@@ -282,7 +314,7 @@ def cmd_delete(args: Namespace, user: str, password: str, url: str) -> int:
print(f"Deleted tag: {args.tag}") print(f"Deleted tag: {args.tag}")
return 0 return 0
else: else:
print(f"Error: {result}") print_error(result)
return 1 return 1
else: else:
# Delete artifact by digest # Delete artifact by digest
@@ -292,6 +324,9 @@ def cmd_delete(args: Namespace, user: str, password: str, url: str) -> int:
return 1 return 1
if not args.force: if not args.force:
if not sys.stdin.isatty():
print("Error: Cannot confirm deletion in non-interactive mode. Use --force.")
return 1
print(f"Will delete: {project}/{repo}@{digest[:19]}") print(f"Will delete: {project}/{repo}@{digest[:19]}")
confirm = input("Confirm deletion? [y/N]: ") confirm = input("Confirm deletion? [y/N]: ")
if confirm.lower() != "y": if confirm.lower() != "y":
@@ -306,7 +341,7 @@ def cmd_delete(args: Namespace, user: str, password: str, url: str) -> int:
print(f"Deleted: {digest[:19]}") print(f"Deleted: {digest[:19]}")
return 0 return 0
else: else:
print(f"Error: {result}") print_error(result)
return 1 return 1
@@ -326,11 +361,11 @@ def cmd_tags(args: Namespace, user: str, password: str, url: str) -> int:
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/tags", f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/tags",
user, password, method="POST", data={"name": args.add} user, password, method="POST", data={"name": args.add}
) )
if result.get("status") in (200, 201) or not result.get("error"): if result.get("status") in (HTTP_OK, HTTP_CREATED) or not result.get("error"):
print(f"Added tag: {args.add}") print(f"Added tag: {args.add}")
return 0 return 0
else: else:
print(f"Error: {result}") print_error(result)
return 1 return 1
# List tags # List tags
@@ -339,7 +374,7 @@ def cmd_tags(args: Namespace, user: str, password: str, url: str) -> int:
user, password user, password
) )
if "error" in artifact: if "error" in artifact:
print(f"Error: {artifact}") print_error(artifact)
return 1 return 1
tags = artifact.get("tags") or [] tags = artifact.get("tags") or []
@@ -362,7 +397,7 @@ def cmd_config(args: Namespace, user: str, password: str, url: str) -> int:
if args.show: if args.show:
data = api_request(f"{url}/api/v2.0/projects/{project}", user, password) data = api_request(f"{url}/api/v2.0/projects/{project}", user, password)
if "error" in data: if "error" in data:
print(f"Error: {data}") print_error(data)
return 1 return 1
meta = data.get("metadata", {}) meta = data.get("metadata", {})
@@ -391,7 +426,7 @@ def cmd_config(args: Namespace, user: str, password: str, url: str) -> int:
print(f"Updated {project}: {update['metadata']}") print(f"Updated {project}: {update['metadata']}")
return 0 return 0
else: else:
print(f"Error: {result}") print_error(result)
return 1 return 1
@@ -410,7 +445,7 @@ def cmd_sbom(args: Namespace, user: str, password: str, url: str) -> int:
user, password user, password
) )
if "error" in data: if "error" in data:
print(f"Error: {data}") print_error(data)
return 1 return 1
if args.json: if args.json:
@@ -437,3 +472,728 @@ def cmd_sbom(args: Namespace, user: str, password: str, url: str) -> int:
print(f"SBOM data: {sbom}") print(f"SBOM data: {sbom}")
return 0 return 0
def _parse_artifact_time(time_str: str | None) -> datetime | None:
"""Parse ISO 8601 timestamp from Harbor API."""
if not time_str:
return None
try:
# Handle various ISO formats Harbor may return
time_str = time_str.rstrip("Z").split(".")[0]
return datetime.fromisoformat(time_str).replace(tzinfo=timezone.utc)
except (ValueError, AttributeError):
return None
def _fetch_all_artifacts(
project: str, repo: str, user: str, password: str, url: str
) -> list[dict] | None:
"""Fetch all artifacts with pagination."""
artifacts = []
page = 1
page_size = 100
while True:
data = api_request(
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts"
f"?page={page}&page_size={page_size}",
user, password
)
if "error" in data:
print_error(data)
return None
if not data:
break
artifacts.extend(data)
if len(data) < page_size:
break
page += 1
return artifacts
def cmd_clean(args: Namespace, user: str, password: str, url: str) -> int:
"""Clean artifacts from a repository.
Supports filtering by:
- Untagged artifacts (--untagged)
- Age in days (--older-than)
- Keep N most recent (--keep)
"""
project = args.project
repo = args.repo
# Fetch all artifacts
artifacts = _fetch_all_artifacts(project, repo, user, password, url)
if artifacts is None:
return 1
if not artifacts:
print("No artifacts found")
return 0
# Sort by push_time descending (newest first)
artifacts.sort(
key=lambda a: _parse_artifact_time(a.get("push_time")) or datetime.min.replace(tzinfo=timezone.utc),
reverse=True
)
# Build candidate list
candidates = []
now = datetime.now(timezone.utc)
for i, artifact in enumerate(artifacts):
digest = artifact.get("digest", "")
tags = artifact.get("tags") or []
push_time = _parse_artifact_time(artifact.get("push_time"))
# Skip if --keep and within retention count
if args.keep and i < args.keep:
continue
# Filter: untagged only
if args.untagged and tags:
continue
# Filter: older than N days
if args.older_than and push_time:
age_days = (now - push_time).days
if age_days < args.older_than:
continue
candidates.append({
"digest": digest,
"tags": [t.get("name", "") for t in tags],
"push_time": artifact.get("push_time", "N/A"),
"size": artifact.get("size", 0),
})
if not candidates:
print("No artifacts match the cleanup criteria")
return 0
# Calculate total size
total_size = sum(c["size"] for c in candidates)
size_mb = total_size / 1024 / 1024
# Display candidates
print(f"{'Digest':<20} {'Tags':<25} {'Push Time':<25} {'Size':<10}")
print("-" * 85)
for c in candidates:
digest = c["digest"][:19]
tags = ", ".join(c["tags"])[:24] or "(none)"
size = f"{c['size'] / 1024 / 1024:.1f}MB"
print(f"{digest:<20} {tags:<25} {c['push_time'][:24]:<25} {size:<10}")
print(f"\nTotal: {len(candidates)} artifact(s), {size_mb:.1f} MB")
# Dry run stops here
if args.dry_run:
print("\n(dry run - no changes made)")
return 0
# Confirm deletion
if not args.force:
if not sys.stdin.isatty():
print("\nError: Cannot confirm deletion in non-interactive mode. Use --force.")
return 1
confirm = input(f"\nDelete {len(candidates)} artifact(s)? [y/N]: ")
if confirm.lower() != "y":
print("Aborted")
return 1
# Delete artifacts
deleted = 0
failed = 0
for c in candidates:
result = api_request(
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{c['digest']}",
user, password, method="DELETE"
)
if result.get("status") == HTTP_OK or not result.get("error"):
deleted += 1
print(f"Deleted: {c['digest'][:19]}")
else:
failed += 1
print(f"Failed: {c['digest'][:19]} - ", end="")
print_error(result)
print(f"\nDeleted: {deleted}, Failed: {failed}")
return 0 if failed == 0 else 1
# =============================================================================
# Admin Commands - Phase A: System Visibility
# =============================================================================
def cmd_system(args: Namespace, user: str, password: str, url: str) -> int:
"""Show Harbor system information."""
# Get basic system info
info = api_request(f"{url}/api/v2.0/systeminfo", user, password)
if "error" in info:
print_error(info)
return 1
print(f"Harbor Version: {info.get('harbor_version', 'N/A')}")
print(f"Registry URL: {info.get('registry_url', 'N/A')}")
print(f"Auth Mode: {info.get('auth_mode', 'N/A')}")
print(f"Project Creation: {info.get('project_creation_restriction', 'N/A')}")
print(f"Self-Registration:{info.get('self_registration', False)}")
print(f"Has CA Root: {info.get('has_ca_root', False)}")
# Get storage volumes (admin only)
volumes = api_request(f"{url}/api/v2.0/systeminfo/volumes", user, password)
if "error" not in volumes:
storage = volumes.get("storage", [])
if storage:
print("\nStorage:")
for vol in storage:
total = vol.get("total", 0)
free = vol.get("free", 0)
used = total - free
pct = (used / total * 100) if total > 0 else 0
print(f" {vol.get('name', 'unknown')}: {format_size(used)} / {format_size(total)} ({pct:.1f}% used)")
if args.json:
print("\n--- Raw JSON ---")
print(json.dumps({"systeminfo": info, "volumes": volumes}, indent=2))
return 0
def cmd_gc(args: Namespace, user: str, password: str, url: str) -> int:
"""Show garbage collection status and history."""
# Get GC schedule
schedule = api_request(f"{url}/api/v2.0/system/gc/schedule", user, password)
if "error" in schedule:
print_error(schedule)
return 1
sched_info = schedule.get("schedule", {})
print(f"Schedule Type: {sched_info.get('type', 'None')}")
if sched_info.get("cron"):
print(f"Cron: {sched_info.get('cron')}")
job_params = schedule.get("job_parameters", {})
if job_params:
print(f"Delete Untagged: {job_params.get('delete_untagged', False)}")
print(f"Dry Run: {job_params.get('dry_run', False)}")
workers = job_params.get("workers")
if workers:
print(f"Workers: {workers}")
# Get GC history
history_limit = args.history if hasattr(args, "history") and args.history else 5
history = api_request(
f"{url}/api/v2.0/system/gc?page=1&page_size={history_limit}",
user, password
)
if "error" not in history and history:
print(f"\n{'ID':<8} {'Status':<12} {'Start Time':<20} {'Duration':<12} {'Deleted':<10}")
print("-" * 65)
for job in history:
job_id = str(job.get("id", "N/A"))
status = job.get("job_status", "N/A")
start = format_timestamp(job.get("creation_time"))[:19]
# Calculate duration if end_time exists
duration = "N/A"
if job.get("update_time") and job.get("creation_time"):
try:
start_dt = datetime.fromisoformat(job["creation_time"].rstrip("Z").split(".")[0])
end_dt = datetime.fromisoformat(job["update_time"].rstrip("Z").split(".")[0])
secs = (end_dt - start_dt).total_seconds()
if secs < 60:
duration = f"{secs:.0f}s"
else:
duration = f"{secs / 60:.1f}m"
except (ValueError, KeyError):
pass
# Extract deleted count from job_parameters or extra info
deleted = "N/A"
job_params_hist = job.get("job_parameters", {})
if isinstance(job_params_hist, str):
try:
job_params_hist = json.loads(job_params_hist)
except json.JSONDecodeError:
job_params_hist = {}
print(f"{job_id:<8} {status:<12} {start:<20} {duration:<12} {deleted:<10}")
if args.json:
print("\n--- Raw JSON ---")
print(json.dumps({"schedule": schedule, "history": history}, indent=2))
return 0
def cmd_gc_run(args: Namespace, user: str, password: str, url: str) -> int:
"""Trigger garbage collection manually."""
dry_run = args.dry_run if hasattr(args, "dry_run") else False
if not dry_run and not confirm_action("Trigger garbage collection?", args.force):
print("Aborted")
return 0
# Build the schedule payload for manual trigger
payload = {
"schedule": {
"type": GC_SCHEDULE_MANUAL,
},
"parameters": {
"delete_untagged": not args.dry_run if hasattr(args, "dry_run") else True,
"dry_run": dry_run,
}
}
result = api_request(
f"{url}/api/v2.0/system/gc/schedule",
user, password, method="POST", data=payload
)
if result.get("status") in (HTTP_OK, HTTP_CREATED) or not result.get("error"):
mode = "dry run" if dry_run else "deletion"
print(f"GC triggered ({mode} mode)")
if args.wait:
print("Waiting for GC to complete...", end="", flush=True)
max_wait = 300 # 5 minutes
waited = 0
while waited < max_wait:
time.sleep(SCAN_POLL_INTERVAL)
waited += SCAN_POLL_INTERVAL
history = api_request(
f"{url}/api/v2.0/system/gc?page=1&page_size=1",
user, password
)
if "error" in history or not history:
continue
latest = history[0]
status = latest.get("job_status", "")
if status == "Success":
print(f"\nGC complete")
return 0
elif status in ("Error", "Failed", "Stopped"):
print(f"\nGC failed: {status}")
return 1
print(".", end="", flush=True)
print(f"\nTimeout after {max_wait}s - GC may still be running")
return 1
return 0
else:
print_error(result)
return 1
def cmd_users(args: Namespace, user: str, password: str, url: str) -> int:
"""List Harbor users."""
page = args.page if hasattr(args, "page") and args.page else 1
limit = args.limit if hasattr(args, "limit") and args.limit else ADMIN_PAGE_SIZE
data = api_request(
f"{url}/api/v2.0/users?page={page}&page_size={limit}",
user, password
)
if "error" in data:
print_error(data)
return 1
if not data:
print("No users found")
return 0
print(f"{'Username':<{COL_USERNAME}} {'Email':<{COL_EMAIL}} {'Admin':<{COL_ADMIN}} {'Created':<{COL_CREATED}}")
print("-" * 80)
for u in data:
username = u.get("username", "N/A")[:COL_USERNAME - 1]
email = u.get("email", "N/A")[:COL_EMAIL - 1]
admin = "Yes" if u.get("sysadmin_flag") else "No"
created = format_timestamp(u.get("creation_time"))[:COL_CREATED - 1]
print(f"{username:<{COL_USERNAME}} {email:<{COL_EMAIL}} {admin:<{COL_ADMIN}} {created:<{COL_CREATED}}")
if len(data) == limit:
print(f"\n(page {page}, use --page to see more)")
if args.json:
print("\n--- Raw JSON ---")
print(json.dumps(data, indent=2))
return 0
# =============================================================================
# Admin Commands - Phase B: User Management
# =============================================================================
def _get_user_by_name(username: str, user: str, password: str, url: str) -> dict | None:
"""Find user by username, return user dict or None."""
data = api_request(
f"{url}/api/v2.0/users/search?username={username}",
user, password
)
if "error" in data or not data:
return None
# Search returns list, find exact match
for u in data:
if u.get("username") == username:
return u
return None
def cmd_user_create(args: Namespace, user: str, password: str, url: str) -> int:
"""Create a new local user."""
username = args.username
email = args.email
# Get password from args or prompt
new_password = args.password
if not new_password:
if not sys.stdin.isatty():
print("Error: Password required in non-interactive mode. Use --password.")
return 1
import getpass
new_password = getpass.getpass(f"Password for {username}: ")
if not new_password:
print("Error: Password cannot be empty")
return 1
payload = {
"username": username,
"email": email,
"password": new_password,
"realname": args.realname if hasattr(args, "realname") and args.realname else username,
"comment": args.comment if hasattr(args, "comment") and args.comment else "",
}
result = api_request(
f"{url}/api/v2.0/users",
user, password, method="POST", data=payload
)
if result.get("status") == HTTP_CREATED or not result.get("error"):
print(f"Created user: {username}")
# Grant admin if requested
if args.admin:
created_user = _get_user_by_name(username, user, password, url)
if created_user:
admin_result = api_request(
f"{url}/api/v2.0/users/{created_user['user_id']}/sysadmin",
user, password, method="PUT", data={"sysadmin_flag": True}
)
if admin_result.get("status") == HTTP_OK or not admin_result.get("error"):
print(f"Granted admin privileges to: {username}")
else:
print(f"Warning: Failed to grant admin privileges")
print_error(admin_result)
return 0
else:
print_error(result)
return 1
def cmd_user_delete(args: Namespace, user: str, password: str, url: str) -> int:
"""Delete a user."""
username = args.username
# Find user ID
target = _get_user_by_name(username, user, password, url)
if not target:
print(f"Error: User '{username}' not found")
return 1
user_id = target.get("user_id")
if not user_id:
print("Error: Could not determine user ID")
return 1
if not confirm_action(f"Delete user '{username}' (ID: {user_id})?", args.force):
print("Aborted")
return 0
result = api_request(
f"{url}/api/v2.0/users/{user_id}",
user, password, method="DELETE"
)
if result.get("status") == HTTP_OK or not result.get("error"):
print(f"Deleted user: {username}")
return 0
else:
print_error(result)
return 1
def cmd_user_admin(args: Namespace, user: str, password: str, url: str) -> int:
"""Grant or revoke admin privileges."""
username = args.username
grant = not args.revoke if hasattr(args, "revoke") else True
# Find user ID
target = _get_user_by_name(username, user, password, url)
if not target:
print(f"Error: User '{username}' not found")
return 1
user_id = target.get("user_id")
if not user_id:
print("Error: Could not determine user ID")
return 1
action = "grant" if grant else "revoke"
if not confirm_action(f"{action.capitalize()} admin privileges for '{username}'?", args.force):
print("Aborted")
return 0
result = api_request(
f"{url}/api/v2.0/users/{user_id}/sysadmin",
user, password, method="PUT", data={"sysadmin_flag": grant}
)
if result.get("status") == HTTP_OK or not result.get("error"):
print(f"Admin privileges {action}ed for: {username}")
return 0
else:
print_error(result)
return 1
def cmd_user_passwd(args: Namespace, user: str, password: str, url: str) -> int:
"""Reset user password."""
username = args.username
# Find user ID
target = _get_user_by_name(username, user, password, url)
if not target:
print(f"Error: User '{username}' not found")
return 1
user_id = target.get("user_id")
if not user_id:
print("Error: Could not determine user ID")
return 1
# Get new password from args or prompt
new_password = args.new_password if hasattr(args, "new_password") else None
if not new_password:
if not sys.stdin.isatty():
print("Error: Password required in non-interactive mode. Use --new-password.")
return 1
import getpass
new_password = getpass.getpass(f"New password for {username}: ")
if not new_password:
print("Error: Password cannot be empty")
return 1
# Admin password reset requires old_password to be empty string
result = api_request(
f"{url}/api/v2.0/users/{user_id}/password",
user, password, method="PUT",
data={"old_password": "", "new_password": new_password}
)
if result.get("status") == HTTP_OK or not result.get("error"):
print(f"Password reset for: {username}")
return 0
else:
print_error(result)
return 1
# =============================================================================
# Admin Commands - Phase C: Audit and Security
# =============================================================================
def cmd_audit(args: Namespace, user: str, password: str, url: str) -> int:
"""View audit logs."""
page = args.page if hasattr(args, "page") and args.page else 1
limit = args.limit if hasattr(args, "limit") and args.limit else ADMIN_PAGE_SIZE
# Build query parameters
query_parts = [f"page={page}", f"page_size={limit}"]
if hasattr(args, "operation") and args.operation:
query_parts.append(f"operation={args.operation}")
if hasattr(args, "username") and args.username:
query_parts.append(f"username={args.username}")
if hasattr(args, "resource") and args.resource:
query_parts.append(f"resource={args.resource}")
query = "&".join(query_parts)
data = api_request(f"{url}/api/v2.0/audit-logs?{query}", user, password)
if "error" in data:
print_error(data)
return 1
if not data:
print("No audit logs found")
return 0
print(f"{'Time':<20} {'User':<15} {'Operation':<12} {'Resource':<30}")
print("-" * 80)
for log in data:
ts = format_timestamp(log.get("op_time"))[:19]
username = log.get("username", "N/A")[:14]
operation = log.get("operation", "N/A")[:11]
resource = log.get("resource", "N/A")[:29]
print(f"{ts:<20} {username:<15} {operation:<12} {resource:<30}")
if len(data) == limit:
print(f"\n(page {page}, use --page to see more)")
if args.json:
print("\n--- Raw JSON ---")
print(json.dumps(data, indent=2))
return 0
def cmd_audit_purge(args: Namespace, user: str, password: str, url: str) -> int:
"""Purge old audit logs."""
retention_days = args.retention if hasattr(args, "retention") and args.retention else 30
if not confirm_action(
f"Purge audit logs older than {retention_days} days?",
args.force
):
print("Aborted")
return 0
# Get current purge schedule
current = api_request(f"{url}/api/v2.0/system/purgeaudit/schedule", user, password)
# Build schedule payload
payload = {
"schedule": {
"type": GC_SCHEDULE_MANUAL,
},
"parameters": {
"audit_retention_hour": retention_days * 24,
"dry_run": args.dry_run if hasattr(args, "dry_run") else False,
"include_operations": "create,delete,pull,push",
}
}
result = api_request(
f"{url}/api/v2.0/system/purgeaudit/schedule",
user, password, method="POST", data=payload
)
if result.get("status") in (HTTP_OK, HTTP_CREATED) or not result.get("error"):
mode = "dry run" if args.dry_run else "purge"
print(f"Audit log {mode} triggered (retention: {retention_days} days)")
return 0
else:
print_error(result)
return 1
def cmd_cve_allow(args: Namespace, user: str, password: str, url: str) -> int:
"""Manage system CVE allowlist."""
# Get current allowlist
data = api_request(f"{url}/api/v2.0/system/CVEAllowlist", user, password)
if "error" in data:
print_error(data)
return 1
items = data.get("items", []) or []
# Add CVE
if hasattr(args, "add") and args.add:
cve_id = args.add.upper()
existing = {item.get("cve_id") for item in items}
if cve_id in existing:
print(f"CVE already in allowlist: {cve_id}")
return 0
items.append({"cve_id": cve_id})
result = api_request(
f"{url}/api/v2.0/system/CVEAllowlist",
user, password, method="PUT",
data={"items": items, "expires_at": data.get("expires_at")}
)
if result.get("status") == HTTP_OK or not result.get("error"):
print(f"Added to allowlist: {cve_id}")
return 0
else:
print_error(result)
return 1
# Remove CVE
if hasattr(args, "remove") and args.remove:
cve_id = args.remove.upper()
original_len = len(items)
items = [item for item in items if item.get("cve_id") != cve_id]
if len(items) == original_len:
print(f"CVE not in allowlist: {cve_id}")
return 0
result = api_request(
f"{url}/api/v2.0/system/CVEAllowlist",
user, password, method="PUT",
data={"items": items, "expires_at": data.get("expires_at")}
)
if result.get("status") == HTTP_OK or not result.get("error"):
print(f"Removed from allowlist: {cve_id}")
return 0
else:
print_error(result)
return 1
# List CVEs (default)
if not items:
print("No CVEs in system allowlist")
return 0
expires = data.get("expires_at")
if expires:
print(f"Expires: {format_timestamp(expires)}")
print(f"\nAllowed CVEs ({len(items)}):")
for item in items:
print(f" {item.get('cve_id', 'N/A')}")
if args.json:
print("\n--- Raw JSON ---")
print(json.dumps(data, indent=2))
return 0
def cmd_scan_all(args: Namespace, user: str, password: str, url: str) -> int:
"""Trigger system-wide vulnerability scan."""
if not confirm_action("Trigger system-wide vulnerability scan?", args.force):
print("Aborted")
return 0
# Build schedule payload for manual trigger
payload = {
"schedule": {
"type": GC_SCHEDULE_MANUAL,
}
}
result = api_request(
f"{url}/api/v2.0/system/scanAll/schedule",
user, password, method="POST", data=payload
)
if result.get("status") in (HTTP_OK, HTTP_CREATED) or not result.get("error"):
print("System-wide scan triggered")
print("Note: This may take a long time depending on the number of artifacts")
return 0
else:
print_error(result)
return 1

55
src/harbor/constants.py Normal file
View File

@@ -0,0 +1,55 @@
"""Constants and magic numbers."""
# Network
DEFAULT_TIMEOUT = 30
SCAN_POLL_INTERVAL = 3
SCAN_DEFAULT_TIMEOUT = 120
# Pagination
DEFAULT_PAGE_SIZE = 100
DEFAULT_RESULT_LIMIT = 50
# Display column widths
COL_DIGEST = 20
COL_TAGS = 25
COL_PROJECT = 20
COL_REPO = 40
COL_SIZE = 12
COL_STATUS = 15
COL_SEVERITY = 10
COL_CVE = 25
COL_PACKAGE = 25
COL_VERSION = 15
COL_TAG = 30
COL_TIME = 25
# Validation
SHA256_DIGEST_LENGTH = 71
SHA256_PREFIX = "sha256:"
# HTTP status codes
HTTP_SUCCESS = (200, 201, 202, 204)
HTTP_ACCEPTED = 202
HTTP_OK = 200
HTTP_CREATED = 201
# Admin display column widths
COL_USERNAME = 20
COL_EMAIL = 30
COL_ADMIN = 8
COL_CREATED = 20
COL_ACTION = 30
COL_RESOURCE = 40
COL_OPERATION = 15
COL_SCHEDULE = 20
# Admin pagination
ADMIN_PAGE_SIZE = 25
ADMIN_MAX_PAGES = 10
# GC job types
GC_JOB_TYPE = "GARBAGE_COLLECTION"
GC_SCHEDULE_MANUAL = "Manual"
# Audit log operations
AUDIT_OPERATIONS = ("create", "delete", "pull", "push", "update")

77
src/harbor/output.py Normal file
View File

@@ -0,0 +1,77 @@
"""Output formatting helpers."""
import sys
from typing import Any
def print_error(data: dict[str, Any] | str) -> None:
"""Print formatted error message.
Handles both dict responses from API (with 'error' and 'message' keys)
and plain string errors.
Args:
data: Error dict from API or error string
"""
if isinstance(data, dict):
code = data.get("error", "")
msg = data.get("message", str(data))
if code:
print(f"Error: {msg} ({code})")
else:
print(f"Error: {msg}")
else:
print(f"Error: {data}")
def confirm_action(message: str, force: bool = False) -> bool:
"""Require confirmation for destructive actions.
Args:
message: Description of the action to confirm
force: If True, skip confirmation and return True
Returns:
True if confirmed, False otherwise
"""
if force:
return True
if not sys.stdin.isatty():
print("Error: Cannot confirm in non-interactive mode. Use --force.")
return False
print(message)
response = input("Confirm? [y/N]: ")
return response.lower() == "y"
def format_size(size_bytes: int) -> str:
"""Format byte size for display.
Args:
size_bytes: Size in bytes
Returns:
Human-readable size string (e.g., "1.5 GB")
"""
for unit in ("B", "KB", "MB", "GB", "TB"):
if size_bytes < 1024:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024
return f"{size_bytes:.1f} PB"
def format_timestamp(ts: str | None) -> str:
"""Format ISO timestamp for display.
Args:
ts: ISO 8601 timestamp string or None
Returns:
Formatted timestamp or "N/A"
"""
if not ts:
return "N/A"
# Strip timezone suffix and fractional seconds for display
return ts.replace("T", " ").split(".")[0].rstrip("Z")