#!/usr/bin/env python3
"""CLI entry point for RF Environment Scanner"""

import argparse
import contextlib
import json
import os
import re
import signal
import subprocess
import sys
import time
from dataclasses import asdict
from datetime import datetime
from pathlib import Path

from .scanner import RFScanner
from .visualize import (
    create_ascii_radar,
    create_signal_strength_chart,
    create_environment_analysis,
    load_latest_scan
)
from .distance import estimate_distance
from .config import Config, get_config


def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser with global options and all subcommands."""
    parser = argparse.ArgumentParser(
        description="RF Environment Scanner - Map WiFi and Bluetooth signals",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  rf-mapper start                    # Start web server (background)
  rf-mapper start --foreground       # Start in foreground (debug)
  rf-mapper stop                     # Stop web server
  rf-mapper restart                  # Restart web server
  rf-mapper status                   # Check if running
  rf-mapper scan -l kitchen          # Scan with location label
  rf-mapper config                   # Show current configuration

Note: Requires sudo for WiFi/Bluetooth scanning.
"""
    )
    parser.add_argument(
        '-c', '--config',
        help='Path to configuration file'
    )
    parser.add_argument(
        '--profile',
        action='store_true',
        help='Enable CPU profiling (prints stats on completion)'
    )
    parser.add_argument(
        '--profile-memory',
        action='store_true',
        help='Enable memory profiling (shows top allocations)'
    )
    parser.add_argument(
        '--profile-output',
        type=Path,
        metavar='FILE',
        help='Save CPU profile to file (.prof format)'
    )

    subparsers = parser.add_subparsers(dest='command', help='Available commands')

    # Scan command
    scan_parser = subparsers.add_parser('scan', help='Perform WiFi and Bluetooth scan')
    scan_parser.add_argument(
        '-l', '--location',
        default='default',
        help='Location label for this scan (e.g., living_room, office)'
    )
    scan_parser.add_argument(
        '-i', '--interface',
        help='WiFi interface to use (default from config)'
    )
    scan_parser.add_argument(
        '--no-wifi',
        action='store_true',
        help='Skip WiFi scanning'
    )
    scan_parser.add_argument(
        '--no-bt',
        action='store_true',
        help='Skip Bluetooth scanning'
    )

    # Visualize command
    viz_parser = subparsers.add_parser('visualize', help='Visualize scan results')
    viz_parser.add_argument(
        '-f', '--file',
        help='Specific scan file to visualize (default: latest)'
    )

    # Analyze command
    analyze_parser = subparsers.add_parser('analyze', help='Analyze RF environment')
    analyze_parser.add_argument(
        '-f', '--file',
        help='Specific scan file to analyze (default: latest)'
    )

    # List command
    subparsers.add_parser('list', help='List saved scans')

    # Start command
    start_parser = subparsers.add_parser('start', help='Start web server')
    start_parser.add_argument(
        '-H', '--host',
        help='Host to bind to (default from config)'
    )
    start_parser.add_argument(
        '-p', '--port',
        type=int,
        help='Port to listen on (default from config)'
    )
    start_parser.add_argument(
        '-f', '--foreground',
        action='store_true',
        help='Run in foreground (default: background daemon)'
    )
    start_parser.add_argument(
        '--debug',
        action='store_true',
        help='Enable Flask debug mode'
    )
    start_parser.add_argument(
        '--profile-requests',
        action='store_true',
        help='Enable per-request profiling (saves profiles to data/profiles/)'
    )
    start_parser.add_argument(
        '--log-requests',
        action='store_true',
        help='Log all requests to data/logs/requests_YYYYMMDD.log'
    )

    # Stop command
    subparsers.add_parser('stop', help='Stop background web server')

    # Restart command
    restart_parser = subparsers.add_parser('restart', help='Restart web server')
    restart_parser.add_argument(
        '-H', '--host',
        help='Host to bind to (default from config)'
    )
    restart_parser.add_argument(
        '-p', '--port',
        type=int,
        help='Port to listen on (default from config)'
    )
    restart_parser.add_argument(
        '--debug',
        action='store_true',
        help='Enable Flask debug mode'
    )
    restart_parser.add_argument(
        '--profile-requests',
        action='store_true',
        help='Enable per-request profiling'
    )
    restart_parser.add_argument(
        '--log-requests',
        action='store_true',
        help='Log all requests'
    )

    # Status command
    subparsers.add_parser('status', help='Check if web server is running')

    # Deprecated: web command (alias for start --foreground)
    web_parser = subparsers.add_parser('web', help='[DEPRECATED] Use "start" instead')
    web_parser.add_argument('-H', '--host', help='Host to bind to')
    web_parser.add_argument('-p', '--port', type=int, help='Port to listen on')
    web_parser.add_argument('--debug', action='store_true', help='Enable debug mode')
    web_parser.add_argument('--profile-requests', action='store_true', help='Enable profiling')
    web_parser.add_argument('--log-requests', action='store_true', help='Log requests')

    # Check-termux command
    subparsers.add_parser('check-termux', help='Check Termux/Android prerequisites')

    # Config command
    config_parser = subparsers.add_parser('config', help='Show/edit configuration')
    config_parser.add_argument(
        '--set-gps',
        nargs=2,
        metavar=('LAT', 'LON'),
        help='Set GPS coordinates'
    )
    config_parser.add_argument(
        '--save',
        action='store_true',
        help='Save changes to config file'
    )

    return parser


def main():
    """Parse the command line, load configuration and dispatch the command.

    With no subcommand the interactive scan is run. When ``--profile`` and/or
    ``--profile-memory`` are given, the command runs inside the corresponding
    profiler context managers from ``.profiling``.
    """
    args = _build_parser().parse_args()

    # Load configuration
    config = Config.load(args.config) if args.config else get_config()
    data_dir = config.get_data_dir()
    data_dir.mkdir(parents=True, exist_ok=True)

    handlers = {
        'scan': lambda: run_scan(args, config, data_dir),
        'visualize': lambda: run_visualize(args, data_dir),
        'analyze': lambda: run_analyze(args, data_dir),
        'list': lambda: run_list(data_dir),
        'start': lambda: run_start(args, config, data_dir),
        'stop': lambda: run_stop(data_dir),
        'restart': lambda: run_restart(args, config, data_dir),
        'status': lambda: run_status(data_dir),
        'config': lambda: run_config(args, config),
        'check-termux': run_check_termux,
        'web': lambda: run_web_deprecated(args, config, data_dir),
    }

    def run_command():
        handler = handlers.get(args.command)
        if handler is None:
            # Default: run interactive scan
            run_interactive(config, data_dir)
        else:
            handler()

    # Wrap command execution with profilers if requested. The CPU profiler is
    # entered first so it is the outermost context, as in a plain
    # ``with cpu_profiler(...), memory_profiler():`` statement.
    with contextlib.ExitStack() as stack:
        if args.profile or args.profile_memory:
            # Imported lazily so normal runs do not load the profiling module.
            from .profiling import cpu_profiler, memory_profiler

            if args.profile:
                stack.enter_context(cpu_profiler(args.profile_output))
            if args.profile_memory:
                stack.enter_context(memory_profiler())
        run_command()


def run_interactive(config: Config, data_dir: Path):
    """Run interactive scan mode.

    Prompts for a location label (falling back to ``"default"`` on empty
    input, EOF or Ctrl-C), performs a full scan, then prints the results,
    radar views, environment analysis and estimated distances.
    """
    width = 62
    border = "═" * width
    print(f"""
╔{border}╗
║{'RF ENVIRONMENT SCANNER v1.0':^{width}}║
║{'WiFi + Bluetooth Signal Mapper':^{width}}║
╚{border}╝

Config: {config._config_path or 'defaults'}
GPS: {config.gps.latitude}, {config.gps.longitude}
""")

    try:
        location = input("Enter location label (e.g., 'living_room'): ").strip() or "default"
    except (EOFError, KeyboardInterrupt):
        location = "default"

    scanner = RFScanner(data_dir)
    result, wifi, bt = scanner.full_scan(location)
    scanner.print_results(wifi, bt)

    # Show visualizations
    if wifi:
        print(create_ascii_radar(result.wifi_networks, f"WiFi Networks ({len(wifi)} found)"))
    if bt:
        print(create_ascii_radar(result.bluetooth_devices, f"Bluetooth Devices ({len(bt)} found)"))

    print(create_environment_analysis(result.wifi_networks, result.bluetooth_devices))

    # Estimated distances
    print(f"\n{'='*60}")
    print("ESTIMATED DISTANCES")
    print('='*60)

    path_loss = config.scanner.path_loss_exponent

    if wifi:
        print("\nWiFi Access Points (nearest 5):")
        # Strongest RSSI first: used here as the "nearest" ordering.
        for net in sorted(wifi, key=lambda x: x.rssi, reverse=True)[:5]:
            dist = estimate_distance(net.rssi, n=path_loss)
            print(f"  {net.ssid[:25]:<25} ~{dist:.1f}m away")

    if bt:
        print("\nBluetooth Devices (nearest 5):")
        for dev in sorted(bt, key=lambda x: x.rssi, reverse=True)[:5]:
            dist = estimate_distance(dev.rssi, tx_power=-65, n=path_loss)
            print(f"  {dev.name[:25]:<25} ~{dist:.1f}m away")


def _safe_label(label: str) -> str:
    """Return *label* made safe for use inside a single file name.

    Word characters, dots, hyphens and spaces are kept; everything else
    (notably path separators) becomes ``_``. An empty result falls back to
    ``"default"``.
    """
    cleaned = re.sub(r'[^\w.\- ]', '_', label).strip()
    return cleaned or 'default'


def run_scan(args, config: Config, data_dir: Path):
    """Run a scan with specified options and save it as JSON in *data_dir*.

    WiFi and/or Bluetooth scanning can be skipped with ``--no-wifi`` /
    ``--no-bt``. The saved file is named ``scan_<timestamp>_<label>.json``.
    """
    interface = args.interface or config.scanner.wifi_interface
    scanner = RFScanner(data_dir)

    print(f"Starting scan at location: {args.location}")

    wifi = []
    bt = []

    if not args.no_wifi:
        wifi = scanner.scan_wifi(interface)
        print(f"Found {len(wifi)} WiFi networks")

    if not args.no_bt:
        bt = scanner.scan_bluetooth(
            timeout=config.scanner.bt_scan_timeout,
            auto_identify=config.scanner.auto_identify_bluetooth
        )
        print(f"Found {len(bt)} Bluetooth devices")

    # Save results. One timestamp is used for both the record and the file
    # name so the two can never disagree across a second boundary.
    now = datetime.now()
    result = {
        'timestamp': now.isoformat(),
        'location_label': args.location,
        'gps': {
            'latitude': config.gps.latitude,
            'longitude': config.gps.longitude
        },
        'wifi_networks': [asdict(n) for n in wifi],
        'bluetooth_devices': [asdict(d) for d in bt]
    }

    # The label is sanitised for the file name only; the JSON keeps the
    # label exactly as the user typed it.
    filename = data_dir / f"scan_{now.strftime('%Y%m%d_%H%M%S')}_{_safe_label(args.location)}.json"
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2)

    print(f"Saved to: {filename}")

    # Print results
    scanner.print_results(wifi, bt)


def _load_scan(file_arg, data_dir: Path) -> dict:
    """Load the scan named by *file_arg*, or the latest scan in *data_dir*.

    Prints a message and exits with status 1 when the file cannot be read or
    parsed, or when no scan data is available.
    """
    if file_arg:
        try:
            with open(file_arg, encoding='utf-8') as f:
                scan = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            print(f"Could not read scan file {file_arg}: {e}")
            sys.exit(1)
    else:
        scan = load_latest_scan(data_dir)

    if not scan:
        print("No scan data found. Run 'rf-mapper scan' first.")
        sys.exit(1)

    return scan


def run_visualize(args, data_dir: Path):
    """Visualize scan results (radar and signal-strength charts)."""
    scan = _load_scan(args.file, data_dir)

    print(f"Scan from: {scan.get('timestamp', 'unknown')}")
    print(f"Location: {scan.get('location_label', 'unknown')}")

    wifi = scan.get('wifi_networks', [])
    bt = scan.get('bluetooth_devices', [])

    if wifi:
        print(create_ascii_radar(wifi, f"WiFi Networks ({len(wifi)} found)"))
        print(create_signal_strength_chart(wifi, "WiFi Signal Strengths"))

    if bt:
        print(create_ascii_radar(bt, f"Bluetooth Devices ({len(bt)} found)"))
        print(create_signal_strength_chart(bt, "Bluetooth Signal Strengths"))


def run_analyze(args, data_dir: Path):
    """Analyze RF environment from a saved scan."""
    scan = _load_scan(args.file, data_dir)

    wifi = scan.get('wifi_networks', [])
    bt = scan.get('bluetooth_devices', [])

    print(create_environment_analysis(wifi, bt))


def run_list(data_dir: Path):
    """List saved scans (at most the first 20 in reverse name order)."""
    scan_files = sorted(data_dir.glob('scan_*.json'), reverse=True)

    if not scan_files:
        print("No scans found.")
        return

    print(f"{'Date/Time':<20} {'Location':<20} {'WiFi':>6} {'BT':>6}")
    print('-' * 55)

    for path in scan_files[:20]:
        try:
            with open(path, encoding='utf-8') as fh:
                scan = json.load(fh)
        except (OSError, json.JSONDecodeError):
            scan = None

        # One damaged file must not abort the whole listing.
        if not isinstance(scan, dict):
            print(f"{'(unreadable)':<20} {path.name}")
            continue

        ts = scan.get('timestamp', '')[:19].replace('T', ' ')
        loc = scan.get('location_label', 'unknown')
        wifi_count = len(scan.get('wifi_networks', []))
        bt_count = len(scan.get('bluetooth_devices', []))

        print(f"{ts:<20} {loc:<20} {wifi_count:>6} {bt_count:>6}")


def run_config(args, config: Config):
    """Show or edit configuration.

    ``--set-gps LAT LON`` updates the in-memory GPS position (exiting with
    status 2 on non-numeric or out-of-range values); ``--save`` writes the
    configuration back. The current configuration is always printed.
    """
    if args.set_gps:
        raw_lat, raw_lon = args.set_gps
        try:
            lat = float(raw_lat)
            lon = float(raw_lon)
        except ValueError:
            print(f"Invalid GPS coordinates: {raw_lat}, {raw_lon}")
            sys.exit(2)

        # The chained comparisons are also False for NaN, so NaN is rejected.
        if not (-90 <= lat <= 90 and -180 <= lon <= 180):
            print(f"GPS coordinates out of range: {raw_lat}, {raw_lon}")
            sys.exit(2)

        config.gps.latitude = lat
        config.gps.longitude = lon
        print(f"GPS set to: {config.gps.latitude}, {config.gps.longitude}")

    if args.save:
        config.save()
        print(f"Configuration saved to: {config._config_path}")

    # Show current config
    print(f"""
RF Mapper Configuration
{'='*40}

Config File: {config._config_path or 'Not found (using defaults)'}

GPS Position:
  Latitude:  {config.gps.latitude}
  Longitude: {config.gps.longitude}

Web Server:
  Host: {config.web.host}
  Port: {config.web.port}

Scanner:
  WiFi Interface:     {config.scanner.wifi_interface}
  BT Scan Timeout:    {config.scanner.bt_scan_timeout}s
  Path Loss Exponent: {config.scanner.path_loss_exponent}

Data:
  Directory: {config.get_data_dir()}
  Max Scans: {config.data.max_scans}

Home Assistant:
  Enabled: {config.home_assistant.enabled}
  URL:     {config.home_assistant.url}
""")


def run_check_termux():
    """Check Termux/Android prerequisites and exit with 0 on success, 1 otherwise."""
    from .termux import is_termux, check_termux_prerequisites

    if not is_termux():
        print("Not running in Termux/Android environment.")
        print("This check is only relevant when running on Android via Termux.")
        sys.exit(0)

    success = check_termux_prerequisites(verbose=True)
    sys.exit(0 if success else 1)


def get_pid_file(data_dir: Path) -> Path:
    """Get path to PID file"""
    return data_dir / "rf-mapper.pid"


def get_start_time_file(data_dir: Path) -> Path:
    """Get path to start time file"""
    return data_dir / "rf-mapper.started"


def read_pid(data_dir: Path) -> int | None:
    """Read PID from file; return None if missing, unreadable or invalid.

    Non-positive values are treated as invalid: passed to ``os.kill`` they
    would address a process group (0) or every signalable process (-1)
    rather than a single server process.
    """
    try:
        pid = int(get_pid_file(data_dir).read_text().strip())
    except (ValueError, OSError):
        # OSError covers the missing-file case as well.
        return None
    return pid if pid > 0 else None


def read_start_time(data_dir: Path) -> float | None:
    """Read start timestamp from file; return None if missing or invalid."""
    try:
        return float(get_start_time_file(data_dir).read_text().strip())
    except (ValueError, OSError):
        return None


def format_uptime(seconds: float) -> str:
    """Format uptime in human-readable form.

    Uses the two largest applicable units (``s``, ``m s``, ``h m``, ``d h``).
    Negative durations, which can arise if the system clock was adjusted,
    are shown as ``0s``.
    """
    total = max(0, int(seconds))
    if total < 60:
        return f"{total}s"

    mins, secs = divmod(total, 60)
    if total < 3600:
        return f"{mins}m {secs}s"

    hours, mins = divmod(mins, 60)
    if total < 86400:
        return f"{hours}h {mins}m"

    days, hours = divmod(hours, 24)
    return f"{days}d {hours}h"


def is_process_running(pid: int) -> bool:
    """Check if process with given PID is running.

    Uses signal 0, which performs the existence/permission check without
    delivering a signal. A permission error means the process exists but
    belongs to another user, so it counts as running.
    """
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True
    except OSError:
        return False
    return True


def _server_options(args, config: Config) -> dict:
    """Collect the ``run_server`` keyword options from CLI args and config."""
    return {
        'host': args.host or config.web.host,
        'port': args.port or config.web.port,
        'debug': getattr(args, 'debug', False),
        'profile_requests': getattr(args, 'profile_requests', False),
        'log_requests': getattr(args, 'log_requests', False),
    }


def run_web_deprecated(args, config: Config, data_dir: Path):
    """Deprecated: run web server in the foreground after printing a warning."""
    print("\033[33m" + "=" * 60 + "\033[0m")
    print("\033[33mWARNING: 'rf-mapper web' is deprecated.\033[0m")
    print("\033[33mUse 'rf-mapper start' instead (runs in background).\033[0m")
    print("\033[33mUse 'rf-mapper start -f' for foreground mode.\033[0m")
    print("\033[33mThis command will be removed in a future version.\033[0m")
    print("\033[33m" + "=" * 60 + "\033[0m\n")

    # Run in foreground (old behavior)
    from .web.app import run_server

    run_server(config=config, **_server_options(args, config))


def run_start(args, config: Config, data_dir: Path):
    """Start web server (foreground or background).

    Foreground mode writes the PID and start-time files, runs the server and
    removes the files on exit. Background mode re-invokes this CLI with
    ``start --foreground`` in a new session, with output appended to
    ``rf-mapper.log`` in *data_dir*.
    """
    # Check Termux prerequisites if running on Android
    from .termux import is_termux, check_termux_prerequisites, setup_termux_signal_handlers

    if is_termux():
        if not check_termux_prerequisites(verbose=True):
            print("Exiting due to missing prerequisites.")
            sys.exit(1)
        # Set up signal handlers for graceful shutdown
        setup_termux_signal_handlers()

    options = _server_options(args, config)
    foreground = getattr(args, 'foreground', False)

    pid_file = get_pid_file(data_dir)
    start_file = get_start_time_file(data_dir)

    if foreground:
        # Run in foreground (blocking). The already-running check is skipped
        # here because this may be the process spawned by daemon mode.
        from .web.app import run_server

        # Write PID file for status command
        pid_file.write_text(str(os.getpid()))
        start_file.write_text(str(time.time()))

        try:
            run_server(config=config, **options)
        finally:
            # Cleanup PID file on exit
            pid_file.unlink(missing_ok=True)
            start_file.unlink(missing_ok=True)
        return

    # Check if already running
    pid = read_pid(data_dir)
    if pid and is_process_running(pid):
        print(f"RF Mapper is already running (PID: {pid})")
        print(f"Use 'rf-mapper stop' to stop it, or 'rf-mapper restart' to restart")
        sys.exit(1)

    # Run in background (daemon mode): build command for subprocess
    cmd = [sys.executable, "-m", "rf_mapper"]

    # Forward an explicit config file so the child loads the same
    # configuration (and therefore the same data directory) as this process.
    config_path = getattr(args, 'config', None)
    if config_path:
        cmd += ["-c", str(config_path)]

    cmd += ["start", "-H", options['host'], "-p", str(options['port']), "--foreground"]
    if options['debug']:
        cmd.append("--debug")
    if options['profile_requests']:
        cmd.append("--profile-requests")
    if options['log_requests']:
        cmd.append("--log-requests")

    # Start process in background
    log_file = data_dir / "rf-mapper.log"
    with open(log_file, 'a') as log:
        process = subprocess.Popen(
            cmd,
            stdout=log,
            stderr=log,
            start_new_session=True
        )

    # Wait briefly for process to start and write PID file
    time.sleep(1)

    # Verify it started successfully. poll() is used rather than signal 0:
    # a child that has exited but is not yet reaped still answers signal 0.
    if process.poll() is not None:
        print("RF Mapper failed to start. Check log:")
        print(f"  {log_file}")
        sys.exit(1)

    print(f"RF Mapper started (PID: {process.pid})")
    print(f"  URL: http://{options['host']}:{options['port']}")
    print(f"  Log: {log_file}")
    print(f"\nUse 'rf-mapper stop' to stop the server")


def _remove_state_files(data_dir: Path) -> None:
    """Remove the PID and start-time files if they exist."""
    get_pid_file(data_dir).unlink(missing_ok=True)
    get_start_time_file(data_dir).unlink(missing_ok=True)


def run_stop(data_dir: Path):
    """Stop background web server by sending SIGTERM to the recorded PID.

    Exits with status 1 when no server is recorded, the PID is stale, or the
    signal cannot be delivered.
    """
    pid = read_pid(data_dir)

    if not pid:
        print("RF Mapper is not running (no PID file)")
        sys.exit(1)

    if not is_process_running(pid):
        print(f"RF Mapper is not running (stale PID: {pid})")
        _remove_state_files(data_dir)
        sys.exit(1)

    # Send SIGTERM
    try:
        os.kill(pid, signal.SIGTERM)
    except OSError as e:
        print(f"Failed to stop RF Mapper: {e}")
        sys.exit(1)

    print(f"RF Mapper stopped (PID: {pid})")
    _remove_state_files(data_dir)


def _wait_for_exit(pid: int, timeout: float = 10.0, interval: float = 0.1) -> bool:
    """Poll until *pid* is gone; return False if it is still alive after *timeout* seconds."""
    deadline = time.monotonic() + timeout
    while is_process_running(pid):
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
    return True


def run_restart(args, config: Config, data_dir: Path):
    """Restart background web server.

    Stops the running server (if any), waits for it to exit so it no longer
    holds the port or removes the new PID file during its own cleanup, then
    starts a new one.
    """
    pid = read_pid(data_dir)
    if pid and is_process_running(pid):
        print("Stopping RF Mapper...")
        run_stop(data_dir)
        if not _wait_for_exit(pid):
            print(f"Warning: previous process (PID: {pid}) is still shutting down")

    print("Starting RF Mapper...")
    run_start(args, config, data_dir)


def run_status(data_dir: Path):
    """Check if web server is running; exit 0 if so, 1 otherwise."""
    pid = read_pid(data_dir)

    if not pid:
        print("RF Mapper is not running (no PID file)")
        sys.exit(1)

    if not is_process_running(pid):
        print(f"RF Mapper is not running (stale PID: {pid})")
        _remove_state_files(data_dir)
        sys.exit(1)

    print(f"RF Mapper is running (PID: {pid})")

    # Show uptime
    start_time = read_start_time(data_dir)
    if start_time:
        uptime_secs = time.time() - start_time
        print(f"  Uptime: {format_uptime(uptime_secs)}")

    log_file = data_dir / "rf-mapper.log"
    if log_file.exists():
        print(f"  Log: {log_file}")
    sys.exit(0)


if __name__ == '__main__':
    main()