#!/usr/bin/env python3
"""
Libvirt/KVM Dynamic Inventory Plugin for Ansible
=================================================

Queries libvirt hypervisors to dynamically discover KVM guest VMs.

Features:
- Discovers VMs (running and stopped) from libvirt
- Groups VMs by state and records vCPU, memory and network metadata
- Iterates over a list of hypervisors (currently a single entry built
  from environment variables)
- ProxyJump configuration for nested VM access

Requirements:
- python3-libvirt
- SSH access to hypervisor(s)

Configuration:
    Set the hypervisor connection URI with LIBVIRT_DEFAULT_URI and the
    hypervisor's inventory name with LIBVIRT_HYPERVISOR_NAME.

Author: Ansible Infrastructure Team
Version: 1.0.0
"""

import argparse
import json
import os
import sys
import xml.etree.ElementTree as ET
from typing import Dict, List, Any, Optional
from collections import defaultdict
from urllib.parse import urlparse

try:
    import libvirt
    HAS_LIBVIRT = True
except ImportError:
    HAS_LIBVIRT = False


class LibvirtInventory:
    """Generate Ansible inventory from libvirt/KVM infrastructure."""

    # Hostvar names for the first eight fields returned by conn.getInfo(),
    # in positional order.
    _NODE_INFO_FIELDS = (
        'hypervisor_model',
        'hypervisor_memory_mb',
        'hypervisor_cpus',
        'hypervisor_mhz',
        'hypervisor_nodes',
        'hypervisor_sockets',
        'hypervisor_cores',
        'hypervisor_threads',
    )

    def __init__(self):
        """Initialize the inventory skeleton and load hypervisor config."""
        self.inventory = {
            'all': {
                'children': ['hypervisors', 'kvm_guests']
            },
            'hypervisors': {
                'hosts': []
            },
            'kvm_guests': {
                'children': ['running_vms', 'stopped_vms'],
                'vars': {
                    'ansible_user': 'ansible',
                    'ansible_python_interpreter': '/usr/bin/python3'
                }
            },
            'running_vms': {
                'hosts': []
            },
            'stopped_vms': {
                'hosts': []
            },
            '_meta': {
                'hostvars': {}
            }
        }

        # Hypervisor configurations
        self.hypervisors = self._load_hypervisor_config()

    def _load_hypervisor_config(self) -> List[Dict[str, Any]]:
        """
        Load hypervisor connection configuration.

        Reads LIBVIRT_DEFAULT_URI and LIBVIRT_HYPERVISOR_NAME from the
        environment, falling back to built-in defaults.

        Returns:
            List of hypervisor configuration dictionaries, each with the
            keys 'name', 'uri' and 'proxy_jump'.
        """
        libvirt_uri = os.environ.get(
            'LIBVIRT_DEFAULT_URI',
            'qemu+ssh://grok@grok.home.serneels.xyz/system'
        )
        hypervisor_name = os.environ.get('LIBVIRT_HYPERVISOR_NAME', 'grokbox')

        return [{
            'name': hypervisor_name,
            'uri': libvirt_uri,
            'proxy_jump': True
        }]

    def generate_inventory(self) -> Dict[str, Any]:
        """
        Generate complete inventory by querying all configured hypervisors.

        A hypervisor that cannot be queried is still listed in the
        'hypervisors' group; the failure is reported on stderr.

        Returns:
            Ansible inventory dictionary
        """
        if not HAS_LIBVIRT:
            print("Warning: python3-libvirt not installed. Returning empty inventory.", file=sys.stderr)
            print("Install with: apt-get install python3-libvirt (Debian/Ubuntu)", file=sys.stderr)
            print("           or: dnf install python3-libvirt (RHEL/Fedora)", file=sys.stderr)
            return self.inventory

        for hypervisor in self.hypervisors:
            try:
                self._query_hypervisor(hypervisor)
            except Exception as e:
                print(f"Warning: Failed to query hypervisor {hypervisor['name']}: {e}", file=sys.stderr)
                # Add the hypervisor even if the connection failed, but do
                # not overwrite hostvars (including node stats) that were
                # already recorded before the failure happened.
                if hypervisor['name'] not in self.inventory['_meta']['hostvars']:
                    self._add_hypervisor_host(hypervisor)

        return self.inventory

    def _query_hypervisor(self, hypervisor_config: Dict[str, Any]):
        """
        Query a libvirt hypervisor for VM information.

        Args:
            hypervisor_config: Hypervisor connection configuration

        Raises:
            Exception: If the connection cannot be opened or libvirt
                reports an error while listing domains.
        """
        try:
            conn = libvirt.open(hypervisor_config['uri'])
        except libvirt.libvirtError as e:
            raise Exception(f"Libvirt error: {e}") from e

        if conn is None:
            raise Exception(f"Failed to open connection to {hypervisor_config['uri']}")

        try:
            self._add_hypervisor_host(hypervisor_config, conn)

            # List all domains (VMs), running or not
            for domain in conn.listAllDomains(0):
                try:
                    self._process_domain(domain, hypervisor_config)
                except (libvirt.libvirtError, ET.ParseError) as e:
                    # One broken or vanished domain must not hide the rest.
                    print(
                        f"Warning: Skipping a domain on {hypervisor_config['name']}: {e}",
                        file=sys.stderr
                    )
        except libvirt.libvirtError as e:
            raise Exception(f"Libvirt error: {e}") from e
        finally:
            # Always release the connection, even when a query failed.
            conn.close()

    @staticmethod
    def _parse_libvirt_uri(uri: str) -> Dict[str, Any]:
        """
        Derive Ansible connection variables from a libvirt URI.

        Args:
            uri: Libvirt connection URI, e.g. 'qemu+ssh://user@host/system'

        Returns:
            Dictionary with 'ansible_host' and 'ansible_user', plus
            'ansible_port' when the URI carries an explicit port. A URI
            without a host maps to 'localhost'; a URI without a username
            maps to 'root'.
        """
        host = None
        user = None
        port = None
        try:
            parsed = urlparse(uri)
            host = parsed.hostname
            user = parsed.username
            port = parsed.port
        except ValueError:
            # Malformed netloc or port: keep whatever was parsed so far and
            # fall back to the defaults for the rest.
            pass

        target = {
            'ansible_host': host or 'localhost',
            'ansible_user': user or 'root',
        }
        if port is not None:
            target['ansible_port'] = port
        return target

    def _add_hypervisor_host(self, hypervisor_config: Dict[str, Any], conn=None):
        """
        Add hypervisor to inventory.

        Args:
            hypervisor_config: Hypervisor configuration
            conn: Libvirt connection object (optional). When given, node
                statistics from conn.getInfo() are added to the hostvars.
        """
        hypervisor_name = hypervisor_config['name']
        uri = hypervisor_config['uri']

        if hypervisor_name not in self.inventory['hypervisors']['hosts']:
            self.inventory['hypervisors']['hosts'].append(hypervisor_name)

        hostvars = self._parse_libvirt_uri(uri)
        hostvars.update({
            'libvirt_uri': uri,
            'hypervisor_type': 'kvm',
            'ansible_python_interpreter': '/usr/bin/python3'
        })

        # Node statistics are best-effort: a failure here must not prevent
        # the hypervisor from appearing in the inventory.
        if conn is not None:
            try:
                info = conn.getInfo()
            except libvirt.libvirtError as e:
                print(
                    f"Warning: Could not read node info for {hypervisor_name}: {e}",
                    file=sys.stderr
                )
            else:
                hostvars.update(zip(self._NODE_INFO_FIELDS, info))

        self.inventory['_meta']['hostvars'][hypervisor_name] = hostvars

    @staticmethod
    def _element_int(element) -> Optional[int]:
        """Return the integer text of an XML element, or None if unusable."""
        if element is None or element.text is None:
            return None
        try:
            return int(element.text)
        except ValueError:
            return None

    @staticmethod
    def _parse_domain_xml(xml_desc: str) -> Dict[str, Any]:
        """
        Extract sizing and network details from a domain XML description.

        Args:
            xml_desc: Domain XML as returned by domain.XMLDesc()

        Returns:
            Dictionary that may contain 'vm_vcpus', 'vm_memory_mb' (the
            <memory> value divided by 1024) and 'vm_networks' (one entry
            per interface that has a <mac> element). Keys are omitted when
            the corresponding data is absent.

        Raises:
            xml.etree.ElementTree.ParseError: If xml_desc is not valid XML.
        """
        root = ET.fromstring(xml_desc)
        details: Dict[str, Any] = {}

        vcpus = LibvirtInventory._element_int(root.find('.//vcpu'))
        if vcpus is not None:
            details['vm_vcpus'] = vcpus

        memory_kb = LibvirtInventory._element_int(root.find('.//memory'))
        if memory_kb is not None:
            details['vm_memory_mb'] = memory_kb // 1024

        networks = []
        for iface in root.findall('.//interface'):
            mac = iface.find('.//mac')
            if mac is None:
                continue
            entry = {'mac': mac.get('address')}
            source = iface.find('.//source')
            if source is not None:
                entry['network'] = source.get('network') or source.get('bridge')
            networks.append(entry)

        if networks:
            details['vm_networks'] = networks

        return details

    @staticmethod
    def _lookup_ipv4_address(domain) -> Optional[str]:
        """
        Return the first IPv4 address libvirt reports from DHCP leases.

        Args:
            domain: Libvirt domain object

        Returns:
            IPv4 address string, or None when no lease is known or the
            lookup fails (the failure is reported on stderr).
        """
        try:
            ifaces = domain.interfaceAddresses(
                libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE
            )
        except libvirt.libvirtError as e:
            print(f"Warning: Could not read interface addresses: {e}", file=sys.stderr)
            return None

        for iface_data in (ifaces or {}).values():
            for addr in iface_data.get('addrs') or []:
                if addr['type'] == libvirt.VIR_IP_ADDR_TYPE_IPV4:
                    return addr['addr']
        return None

    def _process_domain(self, domain, hypervisor_config: Dict[str, Any]):
        """
        Process a libvirt domain (VM) and add to inventory.

        Args:
            domain: Libvirt domain object
            hypervisor_config: Parent hypervisor configuration
        """
        vm_name = domain.name()
        vm_state = domain.state()[0]
        is_running = vm_state == libvirt.VIR_DOMAIN_RUNNING

        vm_info = {
            'vm_name': vm_name,
            'vm_uuid': domain.UUIDString(),
            'vm_state': self._state_to_string(vm_state),
            'vm_state_code': vm_state,
            'hypervisor': hypervisor_config['name'],
            'ansible_user': 'ansible',
            'ansible_python_interpreter': '/usr/bin/python3'
        }

        # vCPU, memory and network details come from the domain XML
        vm_info.update(self._parse_domain_xml(domain.XMLDesc(0)))

        # An address can only be looked up while the VM is running
        if is_running:
            address = self._lookup_ipv4_address(domain)
            if address:
                vm_info['ansible_host'] = address

        # Set ProxyJump if hypervisor requires it
        if hypervisor_config.get('proxy_jump'):
            proxy_args = f"-o ProxyJump={hypervisor_config['name']} -o StrictHostKeyChecking=accept-new"
            vm_info['ansible_ssh_common_args'] = proxy_args

        self.inventory['_meta']['hostvars'][vm_name] = vm_info

        # Categorize by state; everything that is not running counts as stopped
        group = 'running_vms' if is_running else 'stopped_vms'
        hosts = self.inventory[group]['hosts']
        if vm_name not in hosts:
            hosts.append(vm_name)

    def _state_to_string(self, state: int) -> str:
        """
        Convert libvirt domain state code to human-readable string.

        Args:
            state: Libvirt state code

        Returns:
            Human-readable state string, or 'unknown' for unmapped codes
        """
        states = {
            libvirt.VIR_DOMAIN_NOSTATE: 'no_state',
            libvirt.VIR_DOMAIN_RUNNING: 'running',
            libvirt.VIR_DOMAIN_BLOCKED: 'blocked',
            libvirt.VIR_DOMAIN_PAUSED: 'paused',
            libvirt.VIR_DOMAIN_SHUTDOWN: 'shutdown',
            libvirt.VIR_DOMAIN_SHUTOFF: 'shutoff',
            libvirt.VIR_DOMAIN_CRASHED: 'crashed',
            libvirt.VIR_DOMAIN_PMSUSPENDED: 'suspended'
        }
        return states.get(state, 'unknown')

    def get_host_vars(self, hostname: str) -> Dict[str, Any]:
        """
        Get variables for a specific host.

        Args:
            hostname: Host name

        Returns:
            Dictionary of host variables (empty if the host is unknown)
        """
        # Generate inventory first if not already done
        if not self.inventory['_meta']['hostvars']:
            self.generate_inventory()

        return self.inventory['_meta']['hostvars'].get(hostname, {})


def main():
    """Main entry point for dynamic inventory script."""
    parser = argparse.ArgumentParser(
        description='Ansible dynamic inventory from libvirt/KVM'
    )
    parser.add_argument(
        '--list',
        action='store_true',
        help='List all hosts and groups'
    )
    parser.add_argument(
        '--host',
        help='Get variables for specific host'
    )

    args = parser.parse_args()

    # Initialize libvirt inventory
    inventory = LibvirtInventory()

    if args.list:
        # Return full inventory
        result = inventory.generate_inventory()
        print(json.dumps(result, indent=2))
    elif args.host:
        # Return host variables
        host_vars = inventory.get_host_vars(args.host)
        print(json.dumps(host_vars, indent=2))
    else:
        parser.print_help()
        sys.exit(1)


if __name__ == '__main__':
    main()