- Add comprehensive Ansible guidelines and best practices (CLAUDE.md) - Add infrastructure inventory documentation - Add VM deployment playbooks and configurations - Add dynamic inventory plugins (libvirt_kvm, ssh_config) - Add cloud-init and preseed configurations for automated deployments - Add security-first configuration templates - Add role and setup documentation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
355 lines
11 KiB
Python
Executable File
355 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Libvirt/KVM Dynamic Inventory Plugin for Ansible
|
|
=================================================
|
|
Queries libvirt hypervisors to dynamically discover KVM guest VMs.
|
|
|
|
Features:
|
|
- Discovers running VMs from libvirt
|
|
- Categorizes VMs by state, network, and metadata
|
|
- Supports multiple hypervisors
|
|
- Builds the full inventory in a single pass per invocation (results are not cached between runs)
|
|
- ProxyJump configuration for nested VM access
|
|
|
|
Requirements:
|
|
- python3-libvirt
|
|
- SSH access to hypervisor(s)
|
|
|
|
Configuration:
|
|
Set the hypervisor connection URI with the LIBVIRT_DEFAULT_URI environment variable and its inventory name with LIBVIRT_HYPERVISOR_NAME
|
|
|
|
Author: Ansible Infrastructure Team
|
|
Version: 1.0.0
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import xml.etree.ElementTree as ET
|
|
from typing import Dict, List, Any, Optional
|
|
from collections import defaultdict
|
|
|
|
try:
|
|
import libvirt
|
|
HAS_LIBVIRT = True
|
|
except ImportError:
|
|
HAS_LIBVIRT = False
|
|
|
|
|
|
class LibvirtInventory:
    """Generate an Ansible dynamic inventory from libvirt/KVM hypervisors.

    The inventory dictionary follows the JSON layout Ansible expects from an
    inventory script: group names map to ``hosts``/``children``/``vars`` and
    per-host variables live under ``_meta.hostvars``.
    """

    # Keys for the values returned by ``conn.getInfo()``, in positional order.
    _NODE_INFO_KEYS = (
        'hypervisor_model',
        'hypervisor_memory_mb',
        'hypervisor_cpus',
        'hypervisor_mhz',
        'hypervisor_nodes',
        'hypervisor_sockets',
        'hypervisor_cores',
        'hypervisor_threads',
    )

    # libvirt URI transports whose port is an SSH port (scheme "driver+transport").
    _SSH_TRANSPORTS = ('ssh', 'libssh', 'libssh2')

    def __init__(self):
        """Create the empty group skeleton and load hypervisor settings."""
        self.inventory: Dict[str, Any] = {
            'all': {
                'children': ['hypervisors', 'kvm_guests']
            },
            'hypervisors': {
                'hosts': []
            },
            'kvm_guests': {
                'children': ['running_vms', 'stopped_vms'],
                'vars': {
                    'ansible_user': 'ansible',
                    'ansible_python_interpreter': '/usr/bin/python3'
                }
            },
            'running_vms': {
                'hosts': []
            },
            'stopped_vms': {
                'hosts': []
            },
            '_meta': {
                'hostvars': {}
            }
        }

        # Hypervisor configurations
        self.hypervisors = self._load_hypervisor_config()

    def _load_hypervisor_config(self) -> List[Dict[str, Any]]:
        """
        Load hypervisor connection configuration.

        Reads ``LIBVIRT_DEFAULT_URI`` and ``LIBVIRT_HYPERVISOR_NAME`` from the
        environment, falling back to built-in defaults when they are unset.

        Returns:
            List with one configuration dictionary holding ``name``, ``uri``
            and the boolean ``proxy_jump`` flag.
        """
        libvirt_uri = os.environ.get('LIBVIRT_DEFAULT_URI', 'qemu+ssh://grok@grok.home.serneels.xyz/system')
        hypervisor_name = os.environ.get('LIBVIRT_HYPERVISOR_NAME', 'grokbox')

        return [{
            'name': hypervisor_name,
            'uri': libvirt_uri,
            'proxy_jump': True
        }]

    def generate_inventory(self) -> Dict[str, Any]:
        """
        Generate complete inventory by querying all configured hypervisors.

        A hypervisor that cannot be queried is still listed (with the
        variables derivable from its URI) and a warning is written to stderr.

        Returns:
            Ansible inventory dictionary
        """
        if not HAS_LIBVIRT:
            print("Warning: python3-libvirt not installed. Returning empty inventory.",
                  file=sys.stderr)
            print("Install with: apt-get install python3-libvirt (Debian/Ubuntu)",
                  file=sys.stderr)
            print(" or: dnf install python3-libvirt (RHEL/Fedora)",
                  file=sys.stderr)
            return self.inventory

        hostvars = self.inventory['_meta']['hostvars']
        for hypervisor in self.hypervisors:
            try:
                self._query_hypervisor(hypervisor)
            except Exception as e:
                print(f"Warning: Failed to query hypervisor {hypervisor['name']}: {e}",
                      file=sys.stderr)
                # Add hypervisor to inventory even if connection fails, but do
                # not clobber richer hostvars recorded before the failure.
                if hypervisor['name'] not in hostvars:
                    self._add_hypervisor_host(hypervisor)

        return self.inventory

    def _query_hypervisor(self, hypervisor_config: Dict[str, Any]) -> None:
        """
        Query a libvirt hypervisor for VM information.

        Args:
            hypervisor_config: Hypervisor connection configuration

        Raises:
            RuntimeError: If the connection cannot be opened or libvirt
                reports an error while listing domains.
        """
        try:
            conn = libvirt.open(hypervisor_config['uri'])
            if conn is None:
                raise RuntimeError(f"Failed to open connection to {hypervisor_config['uri']}")

            try:
                self._add_hypervisor_host(hypervisor_config, conn)

                for domain in conn.listAllDomains(0):
                    try:
                        self._process_domain(domain, hypervisor_config)
                    except (libvirt.libvirtError, ET.ParseError, ValueError) as e:
                        # One unreadable domain (e.g. undefined mid-listing)
                        # must not hide every other VM on this hypervisor.
                        print(f"Warning: Skipping a domain on {hypervisor_config['name']}: {e}",
                              file=sys.stderr)
            finally:
                # Always release the connection, including on errors.
                conn.close()

        except libvirt.libvirtError as e:
            raise RuntimeError(f"Libvirt error: {e}") from e

    @staticmethod
    def _parse_connection_uri(uri: str):
        """Split a libvirt URI into ``(host, user, ssh_port)``.

        Host defaults to ``'localhost'`` and user to ``'root'`` when the URI
        does not name them. ``ssh_port`` is ``None`` unless the URI uses an
        SSH transport and carries an explicit, valid port.
        """
        # Function-scope import keeps this class self-contained.
        from urllib.parse import urlsplit

        try:
            parts = urlsplit(uri)
            host = parts.hostname  # note: urlsplit lower-cases the host name
            user = parts.username
            port = parts.port
        except ValueError:
            # Malformed authority (bad port or IPv6 literal).
            return 'localhost', 'root', None

        transport = parts.scheme.partition('+')[2]
        if transport not in LibvirtInventory._SSH_TRANSPORTS:
            # For tcp/tls transports the port belongs to libvirtd, not sshd.
            port = None

        return host or 'localhost', user or 'root', port

    def _add_hypervisor_host(self, hypervisor_config: Dict[str, Any], conn=None) -> None:
        """
        Add hypervisor to inventory.

        Connection variables are derived from the libvirt URI; when a port is
        given on an SSH transport it is exported as ``ansible_port`` instead
        of being left glued to the host name.

        Args:
            hypervisor_config: Hypervisor configuration
            conn: Libvirt connection object (optional); when given, node
                statistics from ``conn.getInfo()`` are added best-effort.
        """
        hypervisor_name = hypervisor_config['name']

        hosts = self.inventory['hypervisors']['hosts']
        if hypervisor_name not in hosts:
            hosts.append(hypervisor_name)

        uri = hypervisor_config['uri']
        ansible_host, ansible_user, ssh_port = self._parse_connection_uri(uri)

        hostvars = {
            'ansible_host': ansible_host,
            'ansible_user': ansible_user,
            'libvirt_uri': uri,
            'hypervisor_type': 'kvm',
            'ansible_python_interpreter': '/usr/bin/python3'
        }
        if ssh_port is not None:
            hostvars['ansible_port'] = ssh_port

        # Add hypervisor stats if connection is available
        if conn:
            try:
                hostvars.update(zip(self._NODE_INFO_KEYS, conn.getInfo()))
            except Exception as e:
                # Stats are optional: report the problem but keep the host.
                print(f"Warning: Could not read node info for {hypervisor_name}: {e}",
                      file=sys.stderr)

        self.inventory['_meta']['hostvars'][hypervisor_name] = hostvars

    @staticmethod
    def _parse_domain_xml(xml_desc: str) -> Dict[str, Any]:
        """Extract vCPU count, memory and NIC details from a domain XML string.

        Returns a dict with any of ``vm_vcpus``, ``vm_memory_mb`` and
        ``vm_networks``; keys are omitted when the XML lacks the element.
        """
        root = ET.fromstring(xml_desc)
        details: Dict[str, Any] = {}

        vcpu_text = (root.findtext('.//vcpu') or '').strip()
        if vcpu_text:
            details['vm_vcpus'] = int(vcpu_text)

        memory_text = (root.findtext('.//memory') or '').strip()
        if memory_text:
            # Assumes the value is in KiB; the unit attribute is not inspected.
            details['vm_memory_mb'] = int(memory_text) // 1024

        networks = []
        for iface in root.findall('.//interface'):
            mac = iface.find('.//mac')
            if mac is None:
                continue
            entry = {'mac': mac.get('address')}
            source = iface.find('.//source')
            if source is not None:
                entry['network'] = source.get('network') or source.get('bridge')
            networks.append(entry)

        if networks:
            details['vm_networks'] = networks

        return details

    @staticmethod
    def _first_ipv4_address(ifaces, ipv4_type) -> Optional[str]:
        """Return the first address of type ``ipv4_type`` in an
        ``interfaceAddresses``-style mapping, or ``None`` if there is none."""
        for iface_data in (ifaces or {}).values():
            for addr in iface_data.get('addrs') or []:
                if addr.get('type') == ipv4_type:
                    return addr.get('addr')
        return None

    def _process_domain(self, domain, hypervisor_config: Dict[str, Any]) -> None:
        """
        Process a libvirt domain (VM) and add to inventory.

        The inventory is only modified once all information has been
        gathered, so a failure part-way leaves no partial entry behind.

        Args:
            domain: Libvirt domain object
            hypervisor_config: Parent hypervisor configuration
        """
        vm_name = domain.name()
        vm_state = domain.state()[0]
        vm_uuid = domain.UUIDString()
        is_running = vm_state == libvirt.VIR_DOMAIN_RUNNING

        vm_info = {
            'vm_name': vm_name,
            'vm_uuid': vm_uuid,
            'vm_state': self._state_to_string(vm_state),
            'vm_state_code': vm_state,
            'hypervisor': hypervisor_config['name'],
            'ansible_user': 'ansible',
            'ansible_python_interpreter': '/usr/bin/python3'
        }

        # Parse VM XML for detailed information
        vm_info.update(self._parse_domain_xml(domain.XMLDesc(0)))

        # Try to get IP address if VM is running
        if is_running:
            try:
                ifaces = domain.interfaceAddresses(
                    libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE)
            except libvirt.libvirtError:
                # Best effort: the VM stays in the inventory without an
                # ansible_host when its addresses cannot be queried.
                ifaces = {}
            address = self._first_ipv4_address(ifaces, libvirt.VIR_IP_ADDR_TYPE_IPV4)
            if address:
                vm_info['ansible_host'] = address

        # Set ProxyJump if hypervisor requires it
        # NOTE(review): the jump host is the inventory name, so it presumably
        # has to resolve through the user's SSH configuration - confirm.
        if hypervisor_config.get('proxy_jump'):
            proxy_args = f"-o ProxyJump={hypervisor_config['name']} -o StrictHostKeyChecking=accept-new"
            vm_info['ansible_ssh_common_args'] = proxy_args

        # Add to inventory
        self.inventory['_meta']['hostvars'][vm_name] = vm_info

        # Categorize by state, without listing the same name twice.
        group = 'running_vms' if is_running else 'stopped_vms'
        group_hosts = self.inventory[group]['hosts']
        if vm_name not in group_hosts:
            group_hosts.append(vm_name)

    def _state_to_string(self, state: int) -> str:
        """
        Convert libvirt domain state code to human-readable string.

        Args:
            state: Libvirt state code

        Returns:
            Human-readable state string, or ``'unknown'`` for unmapped codes
        """
        states = {
            libvirt.VIR_DOMAIN_NOSTATE: 'no_state',
            libvirt.VIR_DOMAIN_RUNNING: 'running',
            libvirt.VIR_DOMAIN_BLOCKED: 'blocked',
            libvirt.VIR_DOMAIN_PAUSED: 'paused',
            libvirt.VIR_DOMAIN_SHUTDOWN: 'shutdown',
            libvirt.VIR_DOMAIN_SHUTOFF: 'shutoff',
            libvirt.VIR_DOMAIN_CRASHED: 'crashed',
            libvirt.VIR_DOMAIN_PMSUSPENDED: 'suspended'
        }
        return states.get(state, 'unknown')

    def get_host_vars(self, hostname: str) -> Dict[str, Any]:
        """
        Get variables for a specific host.

        Args:
            hostname: Host name

        Returns:
            Dictionary of host variables (empty if the host is unknown)
        """
        # Generate inventory first if not already done
        if not self.inventory['_meta']['hostvars']:
            self.generate_inventory()

        return self.inventory['_meta']['hostvars'].get(hostname, {})
|
|
|
|
|
|
def main():
    """Run the inventory script as Ansible invokes it.

    ``--list`` prints the whole inventory as JSON, ``--host NAME`` prints the
    variables of one host, and with neither option the usage text is shown
    and the process exits with status 1.
    """
    cli = argparse.ArgumentParser(
        description='Ansible dynamic inventory from libvirt/KVM'
    )
    cli.add_argument('--list', action='store_true',
                     help='List all hosts and groups')
    cli.add_argument('--host', help='Get variables for specific host')
    options = cli.parse_args()

    # Built before dispatching, regardless of which option was given.
    generator = LibvirtInventory()

    # Guard clause: nothing was requested, so explain usage and fail.
    if not (options.list or options.host):
        cli.print_help()
        sys.exit(1)

    # --list takes precedence when both options are supplied.
    if options.list:
        payload = generator.generate_inventory()
    else:
        payload = generator.get_host_vars(options.host)

    print(json.dumps(payload, indent=2))


# Only run the CLI when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|