#!/usr/bin/env python3
"""
Dynamic Inventory Script - SSH Config Parser
==============================================

Parses ~/.ssh/config to generate Ansible dynamic inventory.

Usage:
    python ssh_config_inventory.py --list
    python ssh_config_inventory.py --host HOSTNAME

Requirements:
    - Python 3.6+ (standard library only; paramiko is not required)

Author: Ansible Infrastructure Team
Version: 1.0.0
"""

import argparse
import json
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple


class SSHConfigParser:
    """Parse SSH config file and extract host configurations.

    Only concrete host aliases become inventory hosts. Wildcard or negated
    ``Host`` patterns and ``Match`` blocks are skipped, and the options
    inside them are NOT merged into other hosts.
    """

    # Splits "Keyword value", "Keyword=value" and "Keyword = value".
    _OPTION_RE = re.compile(r'^(\w+)(?:\s*=\s*|\s+)(.+)$')

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize SSH config parser.

        Args:
            config_path: Path to SSH config file. Defaults to ~/.ssh/config
        """
        if config_path is None:
            config_path = os.path.expanduser("~/.ssh/config")

        self.config_path = config_path
        # alias -> host variables, filled in by parse_config()
        self.hosts: Dict[str, Dict[str, Any]] = {}
        # Static group skeleton; leaf groups receive hosts during parsing.
        self.groups: Dict[str, Any] = {
            'all': {
                'children': ['external_hosts', 'hypervisors', 'kvm_guests']
            },
            'external_hosts': {'hosts': []},
            'hypervisors': {'hosts': []},
            'kvm_guests': {
                'children': ['dns_servers', 'mail_servers', 'development', 'uncategorized'],
                'vars': {
                    'ansible_user': 'ansible',
                    'ansible_ssh_common_args': '-o StrictHostKeyChecking=accept-new'
                }
            },
            'dns_servers': {'hosts': []},
            'mail_servers': {'hosts': []},
            'development': {'hosts': []},
            'uncategorized': {'hosts': []},
            '_meta': {
                'hostvars': {}
            }
        }

    def parse_config(self) -> Dict[str, Any]:
        """
        Parse SSH config file and build inventory structure.

        A ``Host`` line may list several patterns; every concrete alias on
        it becomes its own inventory host with an independent copy of the
        block's options. A ``Match`` line ends the current ``Host`` block so
        that its options are not attributed to the preceding host.

        Returns:
            Dictionary containing Ansible inventory structure. When the
            config file does not exist a warning is written to stderr and
            the (empty) group skeleton is returned.
        """
        try:
            # errors='replace' keeps one stray non-UTF-8 byte (e.g. in a
            # comment) from aborting the whole inventory.
            with open(self.config_path, 'r', encoding='utf-8', errors='replace') as f:
                lines = list(f)
        except FileNotFoundError:
            print(f"Warning: SSH config not found at {self.config_path}", file=sys.stderr)
            return self.groups

        current_hosts: List[str] = []
        current_options: Dict[str, Any] = {}

        for raw_line in lines:
            line = raw_line.strip()

            # Skip comments and empty lines
            if not line or line.startswith('#'):
                continue

            parsed = self._split_option(line)
            if parsed is None:
                continue
            key, value = parsed

            if key in ('host', 'match'):
                # A new block starts: store whatever the previous one collected.
                self._flush_block(current_hosts, current_options)
                current_hosts = self._concrete_aliases(value) if key == 'host' else []
                current_options = {}
                continue

            # Options outside a concrete Host block (global, "Host *",
            # Match ...) are intentionally ignored.
            if current_hosts:
                self._apply_option(key, value, current_options)

        # Add last block
        self._flush_block(current_hosts, current_options)

        return self.groups

    @classmethod
    def _split_option(cls, line: str) -> Optional[Tuple[str, str]]:
        """Split a config line into (lower-cased keyword, argument), or None."""
        match = cls._OPTION_RE.match(line.strip())
        if not match:
            return None
        return match.group(1).lower(), match.group(2).strip()

    @staticmethod
    def _unquote(value: str) -> str:
        """Remove one pair of surrounding double quotes, if present."""
        if len(value) >= 2 and value.startswith('"') and value.endswith('"'):
            return value[1:-1]
        return value

    @classmethod
    def _concrete_aliases(cls, patterns: str) -> List[str]:
        """Return the aliases of a Host line that are not wildcard/negated patterns."""
        aliases = []
        for token in patterns.split():
            alias = cls._unquote(token)
            if not alias or alias.startswith('!') or '*' in alias or '?' in alias:
                continue
            aliases.append(alias)
        return aliases

    def _flush_block(self, hostnames: List[str], options: Dict[str, Any]) -> None:
        """Register every alias of a finished Host block with its own config dict."""
        for alias in hostnames:
            # ansible_host defaults to the alias; a Hostname option overrides it.
            host_config: Dict[str, Any] = {'ansible_host': alias}
            host_config.update(options)
            self._add_host(alias, host_config)

    def _parse_option(self, line: str, config: Dict[str, Any]):
        """
        Parse SSH config option line and store the mapped Ansible variable.

        Args:
            line: A single option line such as ``Port 2222`` or ``User=bob``
            config: Host variable dictionary, updated in place
        """
        parsed = self._split_option(line)
        if parsed is not None:
            self._apply_option(parsed[0], parsed[1], config)

    def _apply_option(self, key: str, value: str, config: Dict[str, Any]) -> None:
        """Map one lower-cased SSH keyword and its argument onto Ansible variables."""
        value = self._unquote(value)

        if key == 'hostname':
            config['ansible_host'] = value
        elif key == 'user':
            config['ansible_user'] = value
        elif key == 'port':
            try:
                config['ansible_port'] = int(value)
            except ValueError:
                # One malformed Port must not take down the whole inventory.
                print(f"Warning: ignoring invalid Port value {value!r}", file=sys.stderr)
        elif key == 'proxyjump':
            # Build ProxyJump SSH args, keeping anything already collected
            existing_args = config.get('ansible_ssh_common_args', '')
            proxy_arg = f"-o ProxyJump={value}"
            config['ansible_ssh_common_args'] = f"{existing_args} {proxy_arg}".strip()
        elif key == 'forwardagent':
            if value.lower() == 'yes':
                config['ansible_ssh_extra_args'] = '-o ForwardAgent=yes'
        elif key == 'hostkeyalias':
            config['host_key_alias'] = value

    def _add_host(self, hostname: str, config: Dict[str, Any]):
        """
        Add host to inventory and categorize into groups.

        Args:
            hostname: Host alias from the SSH config
            config: Host variables collected for that alias
        """
        # Store host configuration
        self.hosts[hostname] = config
        self.groups['_meta']['hostvars'][hostname] = config

        # Categorize host based on characteristics
        group = self._categorize_host(hostname, config)
        if group and group in self.groups:
            members = self.groups[group]['hosts']
            # Repeated aliases or a second parse_config() call must not
            # list the same host twice.
            if hostname not in members:
                members.append(hostname)

    def _categorize_host(self, hostname: str, config: Dict[str, Any]) -> Optional[str]:
        """
        Categorize host into appropriate group based on naming and config.

        Rules are evaluated in order; the first match wins.

        Args:
            hostname: Host name
            config: Host configuration dictionary

        Returns:
            Group name or None
        """
        ansible_user = config.get('ansible_user', '')
        has_proxyjump = 'ProxyJump' in config.get('ansible_ssh_common_args', '')
        ansible_host = config.get('ansible_host', '')

        # 1. Hypervisors - have ForwardAgent and specific users
        if ansible_user in ['grok'] or 'ForwardAgent' in config.get('ansible_ssh_extra_args', ''):
            return 'hypervisors'

        # 2. External hosts - no ProxyJump, not the ansible user
        if not has_proxyjump and ansible_user not in ['ansible']:
            # Dotted-quad address, or any name that does not contain "home"
            looks_external = (
                re.match(r'^\d+\.\d+\.\d+\.\d+$', ansible_host)
                or 'home' not in ansible_host
            )
            # Only 192.168.x.x is treated as internal here.
            if looks_external and not ansible_host.startswith('192.168.'):
                return 'external_hosts'

        # 3. KVM guests - use ansible user and ProxyJump
        if has_proxyjump and ansible_user == 'ansible':
            # Categorize by service/role based on hostname
            if 'pihole' in hostname or 'dns' in hostname:
                return 'dns_servers'
            if 'mail' in hostname or 'mx' in hostname:
                return 'mail_servers'
            if 'derp' in hostname or 'dev' in hostname or 'test' in hostname:
                return 'development'
            return 'uncategorized'

        return None

    def get_host_vars(self, hostname: str) -> Dict[str, Any]:
        """
        Get variables for a specific host.

        Args:
            hostname: Host name

        Returns:
            Dictionary of host variables (empty if the host is unknown)
        """
        return self.groups['_meta']['hostvars'].get(hostname, {})


def main():
    """Main entry point for dynamic inventory script."""
    parser = argparse.ArgumentParser(
        description='Ansible dynamic inventory from SSH config'
    )
    parser.add_argument(
        '--list',
        action='store_true',
        help='List all hosts and groups'
    )
    parser.add_argument(
        '--host',
        help='Get variables for specific host'
    )

    args = parser.parse_args()

    # Initialize SSH config parser
    ssh_parser = SSHConfigParser()

    if args.list:
        # Return full inventory
        inventory = ssh_parser.parse_config()
        print(json.dumps(inventory, indent=2))
    elif args.host:
        # Return host variables
        ssh_parser.parse_config()
        host_vars = ssh_parser.get_host_vars(args.host)
        print(json.dumps(host_vars, indent=2))
    else:
        parser.print_help()
        sys.exit(1)


if __name__ == '__main__':
    main()