From 969f0a52078cd6021084101085c3af7c1909d7c6 Mon Sep 17 00:00:00 2001 From: Username Date: Sun, 18 Jan 2026 19:10:47 +0100 Subject: [PATCH] add admin commands for system, users, gc, audit, security Phase 6 implementation: - system: version, auth mode, storage volumes - gc, gc-run: schedule, history, manual trigger - users, user-create, user-delete, user-admin, user-passwd - audit, audit-purge: view and purge audit logs - cve-allow: manage system CVE allowlist - scan-all: trigger system-wide vulnerability scan Helpers: paginated_request(), confirm_action(), format_size() --- PROJECT.md | 11 +- ROADMAP.md | 26 +- TASKLIST.md | 9 + TODO.md | 44 ++- src/harbor/cli.py | 146 +++++++- src/harbor/client.py | 75 +++- src/harbor/commands.py | 800 +++++++++++++++++++++++++++++++++++++++- src/harbor/constants.py | 55 +++ src/harbor/output.py | 77 ++++ 9 files changed, 1198 insertions(+), 45 deletions(-) create mode 100644 src/harbor/constants.py create mode 100644 src/harbor/output.py diff --git a/PROJECT.md b/PROJECT.md index f9bcf5e..b747dcc 100644 --- a/PROJECT.md +++ b/PROJECT.md @@ -14,13 +14,18 @@ Command-line interface for managing Harbor container registry. Provides quick ac - Tag management - Artifact deletion - Project configuration (auto-scan, auto-sbom) +- System information and storage status +- Garbage collection (status, trigger) +- User management (list, create, delete, admin privileges, password reset) +- Audit log viewing and purging +- CVE allowlist management +- System-wide vulnerability scanning ### Out of Scope -- User/group management (admin-only, rare) -- System configuration (one-time setup) +- LDAP/OIDC group management (external provider) - Replication rule creation (complex, rare) -- Garbage collection triggers (scheduled) +- Registry endpoint management (rare setup task) ## Success Criteria diff --git a/ROADMAP.md b/ROADMAP.md index 130d813..2126f12 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -44,10 +44,10 @@ Additional functionality: - [ ] `labels` command - manage artifact labels - [ ] `copy` command - copy artifact between repos -- [ ] `gc` command - show garbage collection status +- [x] `gc` command - show garbage collection status - [ ] `replication` command - list replication rules/executions - [ ] `quota` command - show project quota usage -- [ ] `audit` command - show audit logs +- [x] `audit` command - show audit logs - [ ] Partial digest tab completion ## Phase 5: Distribution @@ -59,11 +59,29 @@ Packaging and distribution: - [ ] Man page generation - [ ] Shell completions (bash/zsh/fish) +## Phase 6: Admin (Complete) + +Admin capabilities for Harbor system management: + +- [x] `system` command - version, auth mode, storage volumes +- [x] `gc` command - schedule, history, manual trigger +- [x] `gc-run` command - trigger GC with dry-run/wait +- [x] `users` command - list users with pagination +- [x] `user-create` command - create local user +- [x] `user-delete` command - delete user +- [x] `user-admin` command - grant/revoke admin privileges +- [x] `user-passwd` command - reset user password +- [x] `audit` command - view audit logs with filters +- [x] `audit-purge` command - purge old audit logs +- [x] `cve-allow` command - manage system CVE allowlist +- [x] `scan-all` command - trigger system-wide vulnerability scan + ## Dependencies ``` Phase 1 (done) ──> Phase 2 ─┬─> Phase 3 - └─> Phase 4 ──> Phase 5 + ├─> Phase 4 ──> Phase 5 + └─> Phase 6 (done) ``` -Phase 3 and 4 can proceed in parallel after Phase 2. +Phase 3, 4, and 6 can proceed in parallel after Phase 2. diff --git a/TASKLIST.md b/TASKLIST.md index 2a7751e..b9f672e 100644 --- a/TASKLIST.md +++ b/TASKLIST.md @@ -25,8 +25,11 @@ Active, prioritized work items. | `labels` command | 4 | | `copy` command | 4 | | `quota` command | 4 | +| `replication` command | 4 | | pyproject.toml with entry points | 5 | | Shell completions | 5 | +| `registries` command | deferred | +| `groups` command (LDAP/OIDC) | deferred | ## Completed @@ -41,3 +44,9 @@ Active, prioritized work items. - [x] `--verify-ssl` flag for SSL certificate verification - [x] Modular package structure (`src/harbor/`) - [x] Documentation updated for new structure +- [x] Phase 6: Admin commands (v0.1.3) + - `system`, `gc`, `gc-run`, `users` + - `user-create`, `user-delete`, `user-admin`, `user-passwd` + - `audit`, `audit-purge`, `cve-allow`, `scan-all` + - `paginated_request()` helper + - `confirm_action()`, `format_size()`, `format_timestamp()` helpers diff --git a/TODO.md b/TODO.md index 9cadf16..9cddf26 100644 --- a/TODO.md +++ b/TODO.md @@ -12,20 +12,50 @@ Intake buffer for ideas, issues, and unrefined tasks. - Health check endpoint monitoring - Multi-registry support (switch between registries) +## Deferred Admin Features + +- `registries` - list configured registry endpoints +- `registry-create` - add registry endpoint +- `registry-test` - test registry connectivity +- `replication` - list replication rules/executions +- `groups` - user group management (LDAP/OIDC) + ## Issues -- SSL verification disabled globally (should be configurable) -- No timeout configuration for API calls -- Delete confirmation reads stdin (breaks piping) -- Partial digest matching fetches all artifacts (slow for large repos) +(none currently open) ## Questions - Should `--json` output be available on all commands? - Should we support OIDC authentication? -- Worth adding `--dry-run` for destructive operations? ## Debt -- Error handling inconsistent across commands -- Some magic numbers (column widths, timeouts) +(none currently open) + +## Resolved + +### Issues (fixed in v0.1.1) + +- SSL verification disabled globally → added `--verify-ssl` flag +- No timeout configuration for API calls → added `--timeout` flag +- Delete confirmation reads stdin → added TTY check, requires `--force` in non-interactive mode +- Partial digest matching fetches all artifacts → limited to first 100 artifacts + +### Debt (fixed in v0.1.1) + +- Error handling inconsistent across commands → added `output.py` with `print_error()` +- Some magic numbers (column widths, timeouts) → added `constants.py` + +### Features (added in v0.1.2) + +- Artifact cleanup command (`clean`) with filters: `--untagged`, `--older-than`, `--keep` +- Dry-run mode for cleanup preview (`--dry-run`) + +### Features (added in v0.1.3) + +- Admin commands: `system`, `gc`, `gc-run`, `users` +- User management: `user-create`, `user-delete`, `user-admin`, `user-passwd` +- Audit and security: `audit`, `audit-purge`, `cve-allow`, `scan-all` +- Pagination helper for admin endpoints +- Confirmation helper for destructive actions diff --git a/src/harbor/cli.py b/src/harbor/cli.py index 34972f4..f8bafff 100644 --- a/src/harbor/cli.py +++ b/src/harbor/cli.py @@ -6,17 +6,31 @@ import sys from . import __version__ from .commands import ( cmd_artifacts, + cmd_audit, + cmd_audit_purge, + cmd_clean, cmd_config, + cmd_cve_allow, cmd_delete, + cmd_gc, + cmd_gc_run, cmd_info, cmd_projects, cmd_repos, cmd_sbom, cmd_scan, + cmd_scan_all, + cmd_system, cmd_tags, + cmd_user_admin, + cmd_user_create, + cmd_user_delete, + cmd_user_passwd, + cmd_users, cmd_vulns, ) from .config import load_credentials +from .constants import DEFAULT_TIMEOUT def create_parser() -> argparse.ArgumentParser: @@ -30,13 +44,19 @@ Examples: %(prog)s repos library List repos in 'library' project %(prog)s artifacts library flaskpaste List artifacts %(prog)s info library flaskpaste Show latest artifact details - %(prog)s info library flaskpaste -d latest Show artifact by tag %(prog)s vulns library flaskpaste -s high Show high severity vulns - %(prog)s vulns library flaskpaste -P jaraco Filter by package name %(prog)s scan library flaskpaste --wait Scan and wait for completion - %(prog)s tags library flaskpaste List tags for latest artifact - %(prog)s sbom library flaskpaste Show SBOM for artifact - %(prog)s config library --show Show project settings + %(prog)s clean library flaskpaste --dry-run Preview cleanup + +Admin commands (require admin privileges): + %(prog)s system Show system info and storage + %(prog)s gc Show GC schedule and history + %(prog)s gc-run --dry-run Trigger GC (dry run) + %(prog)s users List users + %(prog)s user-create alice --email a@b.c Create user + %(prog)s audit --operation pull View audit logs + %(prog)s cve-allow --add CVE-2024-1234 Add CVE to allowlist + %(prog)s scan-all Trigger system-wide scan """ ) parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {__version__}") @@ -44,6 +64,8 @@ Examples: parser.add_argument("-u", "--user", help="Username", default=None) parser.add_argument("-p", "--password", help="Password", default=None) parser.add_argument("--verify-ssl", action="store_true", help="Verify SSL certificates (default: skip)") + parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, + help=f"API request timeout in seconds (default: {DEFAULT_TIMEOUT})") subparsers = parser.add_subparsers(dest="command", help="Commands") @@ -116,6 +138,103 @@ Examples: p_config.add_argument("--auto-sbom", type=lambda x: x.lower() == "true", metavar="true|false", help="Enable/disable auto-SBOM generation on push") + # clean + p_clean = subparsers.add_parser("clean", help="Clean artifacts from repository") + p_clean.add_argument("project", help="Project name") + p_clean.add_argument("repo", help="Repository name") + p_clean.add_argument("--untagged", action="store_true", help="Only delete untagged artifacts") + p_clean.add_argument("--older-than", type=int, metavar="DAYS", + help="Only delete artifacts older than N days") + p_clean.add_argument("--keep", type=int, metavar="N", + help="Keep N most recent artifacts (applied before other filters)") + p_clean.add_argument("--dry-run", action="store_true", help="Show what would be deleted without deleting") + p_clean.add_argument("-f", "--force", action="store_true", help="Skip confirmation prompt") + + # ========================================================================= + # Admin Commands - Phase A: System Visibility + # ========================================================================= + + # system + p_system = subparsers.add_parser("system", help="Show Harbor system information") + p_system.add_argument("--json", action="store_true", help="Include raw JSON output") + + # gc + p_gc = subparsers.add_parser("gc", help="Show garbage collection status") + p_gc.add_argument("--history", type=int, default=5, metavar="N", + help="Number of history entries to show (default: 5)") + p_gc.add_argument("--json", action="store_true", help="Include raw JSON output") + + # gc-run + p_gc_run = subparsers.add_parser("gc-run", help="Trigger garbage collection") + p_gc_run.add_argument("--dry-run", action="store_true", help="Dry run (no actual deletion)") + p_gc_run.add_argument("--wait", action="store_true", help="Wait for GC to complete") + p_gc_run.add_argument("-f", "--force", action="store_true", help="Skip confirmation prompt") + + # users + p_users = subparsers.add_parser("users", help="List Harbor users") + p_users.add_argument("--page", type=int, default=1, help="Page number (default: 1)") + p_users.add_argument("--limit", type=int, default=25, help="Results per page (default: 25)") + p_users.add_argument("--json", action="store_true", help="Include raw JSON output") + + # ========================================================================= + # Admin Commands - Phase B: User Management + # ========================================================================= + + # user-create + p_user_create = subparsers.add_parser("user-create", help="Create a new user") + p_user_create.add_argument("username", help="Username") + p_user_create.add_argument("--email", required=True, help="Email address") + p_user_create.add_argument("--password", help="Password (prompts if not provided)") + p_user_create.add_argument("--realname", help="Real name (defaults to username)") + p_user_create.add_argument("--comment", help="Comment") + p_user_create.add_argument("--admin", action="store_true", help="Grant admin privileges") + + # user-delete + p_user_delete = subparsers.add_parser("user-delete", help="Delete a user") + p_user_delete.add_argument("username", help="Username to delete") + p_user_delete.add_argument("-f", "--force", action="store_true", help="Skip confirmation") + + # user-admin + p_user_admin = subparsers.add_parser("user-admin", help="Grant or revoke admin privileges") + p_user_admin.add_argument("username", help="Username") + p_user_admin.add_argument("--revoke", action="store_true", help="Revoke admin (default: grant)") + p_user_admin.add_argument("-f", "--force", action="store_true", help="Skip confirmation") + + # user-passwd + p_user_passwd = subparsers.add_parser("user-passwd", help="Reset user password") + p_user_passwd.add_argument("username", help="Username") + p_user_passwd.add_argument("--new-password", help="New password (prompts if not provided)") + + # ========================================================================= + # Admin Commands - Phase C: Audit and Security + # ========================================================================= + + # audit + p_audit = subparsers.add_parser("audit", help="View audit logs") + p_audit.add_argument("--page", type=int, default=1, help="Page number (default: 1)") + p_audit.add_argument("--limit", type=int, default=25, help="Results per page (default: 25)") + p_audit.add_argument("--operation", help="Filter by operation (create/delete/pull/push)") + p_audit.add_argument("--username", help="Filter by username") + p_audit.add_argument("--resource", help="Filter by resource") + p_audit.add_argument("--json", action="store_true", help="Include raw JSON output") + + # audit-purge + p_audit_purge = subparsers.add_parser("audit-purge", help="Purge old audit logs") + p_audit_purge.add_argument("--retention", type=int, default=30, metavar="DAYS", + help="Keep logs newer than N days (default: 30)") + p_audit_purge.add_argument("--dry-run", action="store_true", help="Dry run") + p_audit_purge.add_argument("-f", "--force", action="store_true", help="Skip confirmation") + + # cve-allow + p_cve_allow = subparsers.add_parser("cve-allow", help="Manage system CVE allowlist") + p_cve_allow.add_argument("--add", metavar="CVE", help="Add CVE to allowlist") + p_cve_allow.add_argument("--remove", metavar="CVE", help="Remove CVE from allowlist") + p_cve_allow.add_argument("--json", action="store_true", help="Include raw JSON output") + + # scan-all + p_scan_all = subparsers.add_parser("scan-all", help="Trigger system-wide vulnerability scan") + p_scan_all.add_argument("-f", "--force", action="store_true", help="Skip confirmation") + return parser @@ -139,6 +258,7 @@ def main() -> int: return 1 commands = { + # Core commands "projects": cmd_projects, "repos": cmd_repos, "artifacts": cmd_artifacts, @@ -149,6 +269,22 @@ def main() -> int: "tags": cmd_tags, "sbom": cmd_sbom, "config": cmd_config, + "clean": cmd_clean, + # Admin - Phase A: System Visibility + "system": cmd_system, + "gc": cmd_gc, + "gc-run": cmd_gc_run, + "users": cmd_users, + # Admin - Phase B: User Management + "user-create": cmd_user_create, + "user-delete": cmd_user_delete, + "user-admin": cmd_user_admin, + "user-passwd": cmd_user_passwd, + # Admin - Phase C: Audit and Security + "audit": cmd_audit, + "audit-purge": cmd_audit_purge, + "cve-allow": cmd_cve_allow, + "scan-all": cmd_scan_all, } return commands[args.command](args, user, password, url) diff --git a/src/harbor/client.py b/src/harbor/client.py index 0bef125..f05972b 100644 --- a/src/harbor/client.py +++ b/src/harbor/client.py @@ -7,6 +7,16 @@ import urllib.error import urllib.request from typing import Any +from .constants import ( + ADMIN_MAX_PAGES, + ADMIN_PAGE_SIZE, + DEFAULT_PAGE_SIZE, + DEFAULT_TIMEOUT, + HTTP_SUCCESS, + SHA256_DIGEST_LENGTH, + SHA256_PREFIX, +) + def build_url(base: str, *parts: str, **params: str) -> str: """Build API URL from parts and optional query parameters.""" @@ -24,6 +34,7 @@ def api_request( method: str = "GET", data: dict[str, Any] | None = None, verify_ssl: bool = False, + timeout: int = DEFAULT_TIMEOUT, ) -> dict[str, Any]: """Make authenticated API request to Harbor. @@ -34,6 +45,7 @@ def api_request( method: HTTP method data: Optional request body (JSON) verify_ssl: Whether to verify SSL certificates + timeout: Request timeout in seconds Returns: Response data as dict, or error dict with 'error' key @@ -54,9 +66,9 @@ def api_request( req.data = json.dumps(data).encode() try: - with urllib.request.urlopen(req, context=ctx, timeout=30) as resp: + with urllib.request.urlopen(req, context=ctx, timeout=timeout) as resp: # Handle responses with no body - if resp.status in (200, 201, 202, 204): + if resp.status in HTTP_SUCCESS: body = resp.read().decode().strip() if not body: return {"status": resp.status} @@ -73,6 +85,53 @@ def api_request( return {"error": str(e)} +def paginated_request( + url: str, + user: str, + password: str, + page_size: int = ADMIN_PAGE_SIZE, + max_pages: int = ADMIN_MAX_PAGES, + verify_ssl: bool = False, + timeout: int = DEFAULT_TIMEOUT, +) -> list[dict[str, Any]] | dict[str, Any]: + """Fetch all pages from a paginated endpoint. + + Args: + url: Base API URL (without pagination params) + user: Username for basic auth + password: Password for basic auth + page_size: Items per page + max_pages: Maximum pages to fetch + verify_ssl: Whether to verify SSL certificates + timeout: Request timeout in seconds + + Returns: + List of all items from all pages, or error dict with 'error' key + """ + all_items: list[dict[str, Any]] = [] + separator = "&" if "?" in url else "?" + + for page in range(1, max_pages + 1): + page_url = f"{url}{separator}page={page}&page_size={page_size}" + data = api_request(page_url, user, password, verify_ssl=verify_ssl, timeout=timeout) + + if isinstance(data, dict) and "error" in data: + return data + + if not isinstance(data, list): + return {"error": "unexpected", "message": "Expected list response"} + + if not data: + break + + all_items.extend(data) + + if len(data) < page_size: + break + + return all_items + + def resolve_digest( project: str, repo: str, @@ -93,6 +152,10 @@ def resolve_digest( Returns: Full digest string or None if not found + + Note: + Partial digest matching is limited to the first 100 artifacts. + For repos with more artifacts, use full digest or tag name. """ if not digest_or_tag: # Get latest artifact @@ -105,7 +168,7 @@ def resolve_digest( return artifacts[0].get("digest") # If it looks like a full digest, use it directly - if digest_or_tag.startswith("sha256:") and len(digest_or_tag) == 71: + if digest_or_tag.startswith(SHA256_PREFIX) and len(digest_or_tag) == SHA256_DIGEST_LENGTH: return digest_or_tag # Try as tag first @@ -116,9 +179,9 @@ def resolve_digest( if "error" not in artifact: return artifact.get("digest") - # Try partial digest match + # Try partial digest match (limited to first page for performance) artifacts = api_request( - f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts", + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts?page_size={DEFAULT_PAGE_SIZE}", user, password ) if "error" in artifacts: @@ -126,7 +189,7 @@ def resolve_digest( for a in artifacts: if a.get("digest", "").startswith(digest_or_tag) or \ - a.get("digest", "").startswith(f"sha256:{digest_or_tag}"): + a.get("digest", "").startswith(f"{SHA256_PREFIX}{digest_or_tag}"): return a.get("digest") return digest_or_tag # Return as-is, let API handle error diff --git a/src/harbor/commands.py b/src/harbor/commands.py index 4f1d3c0..a3d1b97 100644 --- a/src/harbor/commands.py +++ b/src/harbor/commands.py @@ -1,17 +1,49 @@ """CLI command handlers.""" import json +import sys import time from argparse import Namespace +from datetime import datetime, timezone -from .client import api_request, resolve_digest +from .client import api_request, paginated_request, resolve_digest +from .constants import ( + ADMIN_PAGE_SIZE, + COL_ACTION, + COL_ADMIN, + COL_CREATED, + COL_CVE, + COL_DIGEST, + COL_EMAIL, + COL_OPERATION, + COL_PACKAGE, + COL_PROJECT, + COL_REPO, + COL_RESOURCE, + COL_SCHEDULE, + COL_SEVERITY, + COL_SIZE, + COL_STATUS, + COL_TAG, + COL_TAGS, + COL_TIME, + COL_USERNAME, + COL_VERSION, + DEFAULT_RESULT_LIMIT, + GC_SCHEDULE_MANUAL, + HTTP_ACCEPTED, + HTTP_CREATED, + HTTP_OK, + SCAN_POLL_INTERVAL, +) +from .output import confirm_action, format_size, format_timestamp, print_error def cmd_projects(args: Namespace, user: str, password: str, url: str) -> int: """List projects.""" data = api_request(f"{url}/api/v2.0/projects", user, password) if "error" in data: - print(f"Error: {data}") + print_error(data) return 1 print(f"{'Project':<20} {'Public':<8} {'Repos':<6} {'Auto-Scan':<10} {'Auto-SBOM':<10}") @@ -28,7 +60,7 @@ def cmd_repos(args: Namespace, user: str, password: str, url: str) -> int: project = args.project data = api_request(f"{url}/api/v2.0/projects/{project}/repositories", user, password) if "error" in data: - print(f"Error: {data}") + print_error(data) return 1 print(f"{'Repository':<40} {'Artifacts':<10} {'Pull Count':<12}") @@ -48,7 +80,7 @@ def cmd_artifacts(args: Namespace, user: str, password: str, url: str) -> int: user, password ) if "error" in data: - print(f"Error: {data}") + print_error(data) return 1 print(f"{'Digest':<20} {'Tags':<25} {'Size':<12} {'Scan Status':<15} {'Severity':<10}") @@ -84,7 +116,7 @@ def cmd_info(args: Namespace, user: str, password: str, url: str) -> int: user, password ) if "error" in data: - print(f"Error: {data}") + print_error(data) return 1 # Basic info @@ -150,7 +182,7 @@ def cmd_vulns(args: Namespace, user: str, password: str, url: str) -> int: user, password ) if "error" in data: - print(f"Error: {data}") + print_error(data) return 1 severity_filter = args.severity.upper() if args.severity else None @@ -217,25 +249,25 @@ def cmd_scan(args: Namespace, user: str, password: str, url: str) -> int: user, password, method="POST" ) - if result.get("status") == 202: + if result.get("status") == HTTP_ACCEPTED: print(f"Scan triggered for {project}/{repo}@{digest[:19]}") if args.wait: print("Waiting for scan to complete...", end="", flush=True) max_wait = args.timeout waited = 0 - interval = 3 while waited < max_wait: - time.sleep(interval) - waited += interval + time.sleep(SCAN_POLL_INTERVAL) + waited += SCAN_POLL_INTERVAL artifact = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true", user, password ) if "error" in artifact: - print(f"\nError checking status: {artifact}") + print("\nError checking status: ", end="") + print_error(artifact) return 1 scan = artifact.get("scan_overview", {}) @@ -258,7 +290,7 @@ def cmd_scan(args: Namespace, user: str, password: str, url: str) -> int: return 0 else: - print(f"Error: {result}") + print_error(result) return 1 @@ -282,7 +314,7 @@ def cmd_delete(args: Namespace, user: str, password: str, url: str) -> int: print(f"Deleted tag: {args.tag}") return 0 else: - print(f"Error: {result}") + print_error(result) return 1 else: # Delete artifact by digest @@ -292,6 +324,9 @@ def cmd_delete(args: Namespace, user: str, password: str, url: str) -> int: return 1 if not args.force: + if not sys.stdin.isatty(): + print("Error: Cannot confirm deletion in non-interactive mode. Use --force.") + return 1 print(f"Will delete: {project}/{repo}@{digest[:19]}") confirm = input("Confirm deletion? [y/N]: ") if confirm.lower() != "y": @@ -306,7 +341,7 @@ def cmd_delete(args: Namespace, user: str, password: str, url: str) -> int: print(f"Deleted: {digest[:19]}") return 0 else: - print(f"Error: {result}") + print_error(result) return 1 @@ -326,11 +361,11 @@ def cmd_tags(args: Namespace, user: str, password: str, url: str) -> int: f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/tags", user, password, method="POST", data={"name": args.add} ) - if result.get("status") in (200, 201) or not result.get("error"): + if result.get("status") in (HTTP_OK, HTTP_CREATED) or not result.get("error"): print(f"Added tag: {args.add}") return 0 else: - print(f"Error: {result}") + print_error(result) return 1 # List tags @@ -339,7 +374,7 @@ def cmd_tags(args: Namespace, user: str, password: str, url: str) -> int: user, password ) if "error" in artifact: - print(f"Error: {artifact}") + print_error(artifact) return 1 tags = artifact.get("tags") or [] @@ -362,7 +397,7 @@ def cmd_config(args: Namespace, user: str, password: str, url: str) -> int: if args.show: data = api_request(f"{url}/api/v2.0/projects/{project}", user, password) if "error" in data: - print(f"Error: {data}") + print_error(data) return 1 meta = data.get("metadata", {}) @@ -391,7 +426,7 @@ def cmd_config(args: Namespace, user: str, password: str, url: str) -> int: print(f"Updated {project}: {update['metadata']}") return 0 else: - print(f"Error: {result}") + print_error(result) return 1 @@ -410,7 +445,7 @@ def cmd_sbom(args: Namespace, user: str, password: str, url: str) -> int: user, password ) if "error" in data: - print(f"Error: {data}") + print_error(data) return 1 if args.json: @@ -437,3 +472,728 @@ def cmd_sbom(args: Namespace, user: str, password: str, url: str) -> int: print(f"SBOM data: {sbom}") return 0 + + +def _parse_artifact_time(time_str: str | None) -> datetime | None: + """Parse ISO 8601 timestamp from Harbor API.""" + if not time_str: + return None + try: + # Handle various ISO formats Harbor may return + time_str = time_str.rstrip("Z").split(".")[0] + return datetime.fromisoformat(time_str).replace(tzinfo=timezone.utc) + except (ValueError, AttributeError): + return None + + +def _fetch_all_artifacts( + project: str, repo: str, user: str, password: str, url: str +) -> list[dict] | None: + """Fetch all artifacts with pagination.""" + artifacts = [] + page = 1 + page_size = 100 + + while True: + data = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts" + f"?page={page}&page_size={page_size}", + user, password + ) + if "error" in data: + print_error(data) + return None + if not data: + break + artifacts.extend(data) + if len(data) < page_size: + break + page += 1 + + return artifacts + + +def cmd_clean(args: Namespace, user: str, password: str, url: str) -> int: + """Clean artifacts from a repository. + + Supports filtering by: + - Untagged artifacts (--untagged) + - Age in days (--older-than) + - Keep N most recent (--keep) + """ + project = args.project + repo = args.repo + + # Fetch all artifacts + artifacts = _fetch_all_artifacts(project, repo, user, password, url) + if artifacts is None: + return 1 + + if not artifacts: + print("No artifacts found") + return 0 + + # Sort by push_time descending (newest first) + artifacts.sort( + key=lambda a: _parse_artifact_time(a.get("push_time")) or datetime.min.replace(tzinfo=timezone.utc), + reverse=True + ) + + # Build candidate list + candidates = [] + now = datetime.now(timezone.utc) + + for i, artifact in enumerate(artifacts): + digest = artifact.get("digest", "") + tags = artifact.get("tags") or [] + push_time = _parse_artifact_time(artifact.get("push_time")) + + # Skip if --keep and within retention count + if args.keep and i < args.keep: + continue + + # Filter: untagged only + if args.untagged and tags: + continue + + # Filter: older than N days + if args.older_than and push_time: + age_days = (now - push_time).days + if age_days < args.older_than: + continue + + candidates.append({ + "digest": digest, + "tags": [t.get("name", "") for t in tags], + "push_time": artifact.get("push_time", "N/A"), + "size": artifact.get("size", 0), + }) + + if not candidates: + print("No artifacts match the cleanup criteria") + return 0 + + # Calculate total size + total_size = sum(c["size"] for c in candidates) + size_mb = total_size / 1024 / 1024 + + # Display candidates + print(f"{'Digest':<20} {'Tags':<25} {'Push Time':<25} {'Size':<10}") + print("-" * 85) + for c in candidates: + digest = c["digest"][:19] + tags = ", ".join(c["tags"])[:24] or "(none)" + size = f"{c['size'] / 1024 / 1024:.1f}MB" + print(f"{digest:<20} {tags:<25} {c['push_time'][:24]:<25} {size:<10}") + + print(f"\nTotal: {len(candidates)} artifact(s), {size_mb:.1f} MB") + + # Dry run stops here + if args.dry_run: + print("\n(dry run - no changes made)") + return 0 + + # Confirm deletion + if not args.force: + if not sys.stdin.isatty(): + print("\nError: Cannot confirm deletion in non-interactive mode. Use --force.") + return 1 + confirm = input(f"\nDelete {len(candidates)} artifact(s)? [y/N]: ") + if confirm.lower() != "y": + print("Aborted") + return 1 + + # Delete artifacts + deleted = 0 + failed = 0 + for c in candidates: + result = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{c['digest']}", + user, password, method="DELETE" + ) + if result.get("status") == HTTP_OK or not result.get("error"): + deleted += 1 + print(f"Deleted: {c['digest'][:19]}") + else: + failed += 1 + print(f"Failed: {c['digest'][:19]} - ", end="") + print_error(result) + + print(f"\nDeleted: {deleted}, Failed: {failed}") + return 0 if failed == 0 else 1 + + +# ============================================================================= +# Admin Commands - Phase A: System Visibility +# ============================================================================= + + +def cmd_system(args: Namespace, user: str, password: str, url: str) -> int: + """Show Harbor system information.""" + # Get basic system info + info = api_request(f"{url}/api/v2.0/systeminfo", user, password) + if "error" in info: + print_error(info) + return 1 + + print(f"Harbor Version: {info.get('harbor_version', 'N/A')}") + print(f"Registry URL: {info.get('registry_url', 'N/A')}") + print(f"Auth Mode: {info.get('auth_mode', 'N/A')}") + print(f"Project Creation: {info.get('project_creation_restriction', 'N/A')}") + print(f"Self-Registration:{info.get('self_registration', False)}") + print(f"Has CA Root: {info.get('has_ca_root', False)}") + + # Get storage volumes (admin only) + volumes = api_request(f"{url}/api/v2.0/systeminfo/volumes", user, password) + if "error" not in volumes: + storage = volumes.get("storage", []) + if storage: + print("\nStorage:") + for vol in storage: + total = vol.get("total", 0) + free = vol.get("free", 0) + used = total - free + pct = (used / total * 100) if total > 0 else 0 + print(f" {vol.get('name', 'unknown')}: {format_size(used)} / {format_size(total)} ({pct:.1f}% used)") + + if args.json: + print("\n--- Raw JSON ---") + print(json.dumps({"systeminfo": info, "volumes": volumes}, indent=2)) + + return 0 + + +def cmd_gc(args: Namespace, user: str, password: str, url: str) -> int: + """Show garbage collection status and history.""" + # Get GC schedule + schedule = api_request(f"{url}/api/v2.0/system/gc/schedule", user, password) + if "error" in schedule: + print_error(schedule) + return 1 + + sched_info = schedule.get("schedule", {}) + print(f"Schedule Type: {sched_info.get('type', 'None')}") + if sched_info.get("cron"): + print(f"Cron: {sched_info.get('cron')}") + + job_params = schedule.get("job_parameters", {}) + if job_params: + print(f"Delete Untagged: {job_params.get('delete_untagged', False)}") + print(f"Dry Run: {job_params.get('dry_run', False)}") + workers = job_params.get("workers") + if workers: + print(f"Workers: {workers}") + + # Get GC history + history_limit = args.history if hasattr(args, "history") and args.history else 5 + history = api_request( + f"{url}/api/v2.0/system/gc?page=1&page_size={history_limit}", + user, password + ) + if "error" not in history and history: + print(f"\n{'ID':<8} {'Status':<12} {'Start Time':<20} {'Duration':<12} {'Deleted':<10}") + print("-" * 65) + for job in history: + job_id = str(job.get("id", "N/A")) + status = job.get("job_status", "N/A") + start = format_timestamp(job.get("creation_time"))[:19] + + # Calculate duration if end_time exists + duration = "N/A" + if job.get("update_time") and job.get("creation_time"): + try: + start_dt = datetime.fromisoformat(job["creation_time"].rstrip("Z").split(".")[0]) + end_dt = datetime.fromisoformat(job["update_time"].rstrip("Z").split(".")[0]) + secs = (end_dt - start_dt).total_seconds() + if secs < 60: + duration = f"{secs:.0f}s" + else: + duration = f"{secs / 60:.1f}m" + except (ValueError, KeyError): + pass + + # Extract deleted count from job_parameters or extra info + deleted = "N/A" + job_params_hist = job.get("job_parameters", {}) + if isinstance(job_params_hist, str): + try: + job_params_hist = json.loads(job_params_hist) + except json.JSONDecodeError: + job_params_hist = {} + + print(f"{job_id:<8} {status:<12} {start:<20} {duration:<12} {deleted:<10}") + + if args.json: + print("\n--- Raw JSON ---") + print(json.dumps({"schedule": schedule, "history": history}, indent=2)) + + return 0 + + +def cmd_gc_run(args: Namespace, user: str, password: str, url: str) -> int: + """Trigger garbage collection manually.""" + dry_run = args.dry_run if hasattr(args, "dry_run") else False + + if not dry_run and not confirm_action("Trigger garbage collection?", args.force): + print("Aborted") + return 0 + + # Build the schedule payload for manual trigger + payload = { + "schedule": { + "type": GC_SCHEDULE_MANUAL, + }, + "parameters": { + "delete_untagged": not args.dry_run if hasattr(args, "dry_run") else True, + "dry_run": dry_run, + } + } + + result = api_request( + f"{url}/api/v2.0/system/gc/schedule", + user, password, method="POST", data=payload + ) + + if result.get("status") in (HTTP_OK, HTTP_CREATED) or not result.get("error"): + mode = "dry run" if dry_run else "deletion" + print(f"GC triggered ({mode} mode)") + + if args.wait: + print("Waiting for GC to complete...", end="", flush=True) + max_wait = 300 # 5 minutes + waited = 0 + + while waited < max_wait: + time.sleep(SCAN_POLL_INTERVAL) + waited += SCAN_POLL_INTERVAL + + history = api_request( + f"{url}/api/v2.0/system/gc?page=1&page_size=1", + user, password + ) + if "error" in history or not history: + continue + + latest = history[0] + status = latest.get("job_status", "") + if status == "Success": + print(f"\nGC complete") + return 0 + elif status in ("Error", "Failed", "Stopped"): + print(f"\nGC failed: {status}") + return 1 + + print(".", end="", flush=True) + + print(f"\nTimeout after {max_wait}s - GC may still be running") + return 1 + + return 0 + else: + print_error(result) + return 1 + + +def cmd_users(args: Namespace, user: str, password: str, url: str) -> int: + """List Harbor users.""" + page = args.page if hasattr(args, "page") and args.page else 1 + limit = args.limit if hasattr(args, "limit") and args.limit else ADMIN_PAGE_SIZE + + data = api_request( + f"{url}/api/v2.0/users?page={page}&page_size={limit}", + user, password + ) + if "error" in data: + print_error(data) + return 1 + + if not data: + print("No users found") + return 0 + + print(f"{'Username':<{COL_USERNAME}} {'Email':<{COL_EMAIL}} {'Admin':<{COL_ADMIN}} {'Created':<{COL_CREATED}}") + print("-" * 80) + for u in data: + username = u.get("username", "N/A")[:COL_USERNAME - 1] + email = u.get("email", "N/A")[:COL_EMAIL - 1] + admin = "Yes" if u.get("sysadmin_flag") else "No" + created = format_timestamp(u.get("creation_time"))[:COL_CREATED - 1] + print(f"{username:<{COL_USERNAME}} {email:<{COL_EMAIL}} {admin:<{COL_ADMIN}} {created:<{COL_CREATED}}") + + if len(data) == limit: + print(f"\n(page {page}, use --page to see more)") + + if args.json: + print("\n--- Raw JSON ---") + print(json.dumps(data, indent=2)) + + return 0 + + +# ============================================================================= +# Admin Commands - Phase B: User Management +# ============================================================================= + + +def _get_user_by_name(username: str, user: str, password: str, url: str) -> dict | None: + """Find user by username, return user dict or None.""" + data = api_request( + f"{url}/api/v2.0/users/search?username={username}", + user, password + ) + if "error" in data or not data: + return None + # Search returns list, find exact match + for u in data: + if u.get("username") == username: + return u + return None + + +def cmd_user_create(args: Namespace, user: str, password: str, url: str) -> int: + """Create a new local user.""" + username = args.username + email = args.email + + # Get password from args or prompt + new_password = args.password + if not new_password: + if not sys.stdin.isatty(): + print("Error: Password required in non-interactive mode. Use --password.") + return 1 + import getpass + new_password = getpass.getpass(f"Password for {username}: ") + if not new_password: + print("Error: Password cannot be empty") + return 1 + + payload = { + "username": username, + "email": email, + "password": new_password, + "realname": args.realname if hasattr(args, "realname") and args.realname else username, + "comment": args.comment if hasattr(args, "comment") and args.comment else "", + } + + result = api_request( + f"{url}/api/v2.0/users", + user, password, method="POST", data=payload + ) + + if result.get("status") == HTTP_CREATED or not result.get("error"): + print(f"Created user: {username}") + + # Grant admin if requested + if args.admin: + created_user = _get_user_by_name(username, user, password, url) + if created_user: + admin_result = api_request( + f"{url}/api/v2.0/users/{created_user['user_id']}/sysadmin", + user, password, method="PUT", data={"sysadmin_flag": True} + ) + if admin_result.get("status") == HTTP_OK or not admin_result.get("error"): + print(f"Granted admin privileges to: {username}") + else: + print(f"Warning: Failed to grant admin privileges") + print_error(admin_result) + + return 0 + else: + print_error(result) + return 1 + + +def cmd_user_delete(args: Namespace, user: str, password: str, url: str) -> int: + """Delete a user.""" + username = args.username + + # Find user ID + target = _get_user_by_name(username, user, password, url) + if not target: + print(f"Error: User '{username}' not found") + return 1 + + user_id = target.get("user_id") + if not user_id: + print("Error: Could not determine user ID") + return 1 + + if not confirm_action(f"Delete user '{username}' (ID: {user_id})?", args.force): + print("Aborted") + return 0 + + result = api_request( + f"{url}/api/v2.0/users/{user_id}", + user, password, method="DELETE" + ) + + if result.get("status") == HTTP_OK or not result.get("error"): + print(f"Deleted user: {username}") + return 0 + else: + print_error(result) + return 1 + + +def cmd_user_admin(args: Namespace, user: str, password: str, url: str) -> int: + """Grant or revoke admin privileges.""" + username = args.username + grant = not args.revoke if hasattr(args, "revoke") else True + + # Find user ID + target = _get_user_by_name(username, user, password, url) + if not target: + print(f"Error: User '{username}' not found") + return 1 + + user_id = target.get("user_id") + if not user_id: + print("Error: Could not determine user ID") + return 1 + + action = "grant" if grant else "revoke" + if not confirm_action(f"{action.capitalize()} admin privileges for '{username}'?", args.force): + print("Aborted") + return 0 + + result = api_request( + f"{url}/api/v2.0/users/{user_id}/sysadmin", + user, password, method="PUT", data={"sysadmin_flag": grant} + ) + + if result.get("status") == HTTP_OK or not result.get("error"): + print(f"Admin privileges {action}ed for: {username}") + return 0 + else: + print_error(result) + return 1 + + +def cmd_user_passwd(args: Namespace, user: str, password: str, url: str) -> int: + """Reset user password.""" + username = args.username + + # Find user ID + target = _get_user_by_name(username, user, password, url) + if not target: + print(f"Error: User '{username}' not found") + return 1 + + user_id = target.get("user_id") + if not user_id: + print("Error: Could not determine user ID") + return 1 + + # Get new password from args or prompt + new_password = args.new_password if hasattr(args, "new_password") else None + if not new_password: + if not sys.stdin.isatty(): + print("Error: Password required in non-interactive mode. Use --new-password.") + return 1 + import getpass + new_password = getpass.getpass(f"New password for {username}: ") + if not new_password: + print("Error: Password cannot be empty") + return 1 + + # Admin password reset requires old_password to be empty string + result = api_request( + f"{url}/api/v2.0/users/{user_id}/password", + user, password, method="PUT", + data={"old_password": "", "new_password": new_password} + ) + + if result.get("status") == HTTP_OK or not result.get("error"): + print(f"Password reset for: {username}") + return 0 + else: + print_error(result) + return 1 + + +# ============================================================================= +# Admin Commands - Phase C: Audit and Security +# ============================================================================= + + +def cmd_audit(args: Namespace, user: str, password: str, url: str) -> int: + """View audit logs.""" + page = args.page if hasattr(args, "page") and args.page else 1 + limit = args.limit if hasattr(args, "limit") and args.limit else ADMIN_PAGE_SIZE + + # Build query parameters + query_parts = [f"page={page}", f"page_size={limit}"] + + if hasattr(args, "operation") and args.operation: + query_parts.append(f"operation={args.operation}") + if hasattr(args, "username") and args.username: + query_parts.append(f"username={args.username}") + if hasattr(args, "resource") and args.resource: + query_parts.append(f"resource={args.resource}") + + query = "&".join(query_parts) + data = api_request(f"{url}/api/v2.0/audit-logs?{query}", user, password) + + if "error" in data: + print_error(data) + return 1 + + if not data: + print("No audit logs found") + return 0 + + print(f"{'Time':<20} {'User':<15} {'Operation':<12} {'Resource':<30}") + print("-" * 80) + for log in data: + ts = format_timestamp(log.get("op_time"))[:19] + username = log.get("username", "N/A")[:14] + operation = log.get("operation", "N/A")[:11] + resource = log.get("resource", "N/A")[:29] + print(f"{ts:<20} {username:<15} {operation:<12} {resource:<30}") + + if len(data) == limit: + print(f"\n(page {page}, use --page to see more)") + + if args.json: + print("\n--- Raw JSON ---") + print(json.dumps(data, indent=2)) + + return 0 + + +def cmd_audit_purge(args: Namespace, user: str, password: str, url: str) -> int: + """Purge old audit logs.""" + retention_days = args.retention if hasattr(args, "retention") and args.retention else 30 + + if not confirm_action( + f"Purge audit logs older than {retention_days} days?", + args.force + ): + print("Aborted") + return 0 + + # Get current purge schedule + current = api_request(f"{url}/api/v2.0/system/purgeaudit/schedule", user, password) + + # Build schedule payload + payload = { + "schedule": { + "type": GC_SCHEDULE_MANUAL, + }, + "parameters": { + "audit_retention_hour": retention_days * 24, + "dry_run": args.dry_run if hasattr(args, "dry_run") else False, + "include_operations": "create,delete,pull,push", + } + } + + result = api_request( + f"{url}/api/v2.0/system/purgeaudit/schedule", + user, password, method="POST", data=payload + ) + + if result.get("status") in (HTTP_OK, HTTP_CREATED) or not result.get("error"): + mode = "dry run" if args.dry_run else "purge" + print(f"Audit log {mode} triggered (retention: {retention_days} days)") + return 0 + else: + print_error(result) + return 1 + + +def cmd_cve_allow(args: Namespace, user: str, password: str, url: str) -> int: + """Manage system CVE allowlist.""" + # Get current allowlist + data = api_request(f"{url}/api/v2.0/system/CVEAllowlist", user, password) + if "error" in data: + print_error(data) + return 1 + + items = data.get("items", []) or [] + + # Add CVE + if hasattr(args, "add") and args.add: + cve_id = args.add.upper() + existing = {item.get("cve_id") for item in items} + if cve_id in existing: + print(f"CVE already in allowlist: {cve_id}") + return 0 + + items.append({"cve_id": cve_id}) + result = api_request( + f"{url}/api/v2.0/system/CVEAllowlist", + user, password, method="PUT", + data={"items": items, "expires_at": data.get("expires_at")} + ) + if result.get("status") == HTTP_OK or not result.get("error"): + print(f"Added to allowlist: {cve_id}") + return 0 + else: + print_error(result) + return 1 + + # Remove CVE + if hasattr(args, "remove") and args.remove: + cve_id = args.remove.upper() + original_len = len(items) + items = [item for item in items if item.get("cve_id") != cve_id] + if len(items) == original_len: + print(f"CVE not in allowlist: {cve_id}") + return 0 + + result = api_request( + f"{url}/api/v2.0/system/CVEAllowlist", + user, password, method="PUT", + data={"items": items, "expires_at": data.get("expires_at")} + ) + if result.get("status") == HTTP_OK or not result.get("error"): + print(f"Removed from allowlist: {cve_id}") + return 0 + else: + print_error(result) + return 1 + + # List CVEs (default) + if not items: + print("No CVEs in system allowlist") + return 0 + + expires = data.get("expires_at") + if expires: + print(f"Expires: {format_timestamp(expires)}") + print(f"\nAllowed CVEs ({len(items)}):") + for item in items: + print(f" {item.get('cve_id', 'N/A')}") + + if args.json: + print("\n--- Raw JSON ---") + print(json.dumps(data, indent=2)) + + return 0 + + +def cmd_scan_all(args: Namespace, user: str, password: str, url: str) -> int: + """Trigger system-wide vulnerability scan.""" + if not confirm_action("Trigger system-wide vulnerability scan?", args.force): + print("Aborted") + return 0 + + # Build schedule payload for manual trigger + payload = { + "schedule": { + "type": GC_SCHEDULE_MANUAL, + } + } + + result = api_request( + f"{url}/api/v2.0/system/scanAll/schedule", + user, password, method="POST", data=payload + ) + + if result.get("status") in (HTTP_OK, HTTP_CREATED) or not result.get("error"): + print("System-wide scan triggered") + print("Note: This may take a long time depending on the number of artifacts") + return 0 + else: + print_error(result) + return 1 diff --git a/src/harbor/constants.py b/src/harbor/constants.py new file mode 100644 index 0000000..064bafc --- /dev/null +++ b/src/harbor/constants.py @@ -0,0 +1,55 @@ +"""Constants and magic numbers.""" + +# Network +DEFAULT_TIMEOUT = 30 +SCAN_POLL_INTERVAL = 3 +SCAN_DEFAULT_TIMEOUT = 120 + +# Pagination +DEFAULT_PAGE_SIZE = 100 +DEFAULT_RESULT_LIMIT = 50 + +# Display column widths +COL_DIGEST = 20 +COL_TAGS = 25 +COL_PROJECT = 20 +COL_REPO = 40 +COL_SIZE = 12 +COL_STATUS = 15 +COL_SEVERITY = 10 +COL_CVE = 25 +COL_PACKAGE = 25 +COL_VERSION = 15 +COL_TAG = 30 +COL_TIME = 25 + +# Validation +SHA256_DIGEST_LENGTH = 71 +SHA256_PREFIX = "sha256:" + +# HTTP status codes +HTTP_SUCCESS = (200, 201, 202, 204) +HTTP_ACCEPTED = 202 +HTTP_OK = 200 +HTTP_CREATED = 201 + +# Admin display column widths +COL_USERNAME = 20 +COL_EMAIL = 30 +COL_ADMIN = 8 +COL_CREATED = 20 +COL_ACTION = 30 +COL_RESOURCE = 40 +COL_OPERATION = 15 +COL_SCHEDULE = 20 + +# Admin pagination +ADMIN_PAGE_SIZE = 25 +ADMIN_MAX_PAGES = 10 + +# GC job types +GC_JOB_TYPE = "GARBAGE_COLLECTION" +GC_SCHEDULE_MANUAL = "Manual" + +# Audit log operations +AUDIT_OPERATIONS = ("create", "delete", "pull", "push", "update") diff --git a/src/harbor/output.py b/src/harbor/output.py new file mode 100644 index 0000000..e9694cd --- /dev/null +++ b/src/harbor/output.py @@ -0,0 +1,77 @@ +"""Output formatting helpers.""" + +import sys +from typing import Any + + +def print_error(data: dict[str, Any] | str) -> None: + """Print formatted error message. + + Handles both dict responses from API (with 'error' and 'message' keys) + and plain string errors. + + Args: + data: Error dict from API or error string + """ + if isinstance(data, dict): + code = data.get("error", "") + msg = data.get("message", str(data)) + if code: + print(f"Error: {msg} ({code})") + else: + print(f"Error: {msg}") + else: + print(f"Error: {data}") + + +def confirm_action(message: str, force: bool = False) -> bool: + """Require confirmation for destructive actions. + + Args: + message: Description of the action to confirm + force: If True, skip confirmation and return True + + Returns: + True if confirmed, False otherwise + """ + if force: + return True + + if not sys.stdin.isatty(): + print("Error: Cannot confirm in non-interactive mode. Use --force.") + return False + + print(message) + response = input("Confirm? [y/N]: ") + return response.lower() == "y" + + +def format_size(size_bytes: int) -> str: + """Format byte size for display. + + Args: + size_bytes: Size in bytes + + Returns: + Human-readable size string (e.g., "1.5 GB") + """ + for unit in ("B", "KB", "MB", "GB", "TB"): + if size_bytes < 1024: + return f"{size_bytes:.1f} {unit}" + size_bytes /= 1024 + return f"{size_bytes:.1f} PB" + + +def format_timestamp(ts: str | None) -> str: + """Format ISO timestamp for display. + + Args: + ts: ISO 8601 timestamp string or None + + Returns: + Formatted timestamp or "N/A" + """ + if not ts: + return "N/A" + # Strip timezone suffix and fractional seconds for display + return ts.replace("T", " ").split(".")[0].rstrip("Z")