Initial commit: Ansible infrastructure automation

- Add comprehensive Ansible guidelines and best practices (CLAUDE.md)
- Add infrastructure inventory documentation
- Add VM deployment playbooks and configurations
- Add dynamic inventory plugins (libvirt_kvm, ssh_config)
- Add cloud-init and preseed configurations for automated deployments
- Add security-first configuration templates
- Add role and setup documentation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-10 23:02:32 +01:00
parent 5ba666dfbf
commit 455133c600
17 changed files with 2983 additions and 0 deletions

354
plugins/inventory/libvirt_kvm.py Executable file
View File

@@ -0,0 +1,354 @@
#!/usr/bin/env python3
"""
Libvirt/KVM Dynamic Inventory Plugin for Ansible
=================================================
Queries libvirt hypervisors to dynamically discover KVM guest VMs.
Features:
- Discovers running VMs from libvirt
- Categorizes VMs by state, network, and metadata
- Supports multiple hypervisors
- Caches results for performance
- ProxyJump configuration for nested VM access
Requirements:
- python3-libvirt
- SSH access to hypervisor(s)
Configuration:
Set hypervisor connection URIs in inventory config or environment
Author: Ansible Infrastructure Team
Version: 1.0.0
"""
import argparse
import json
import os
import sys
import xml.etree.ElementTree as ET
from typing import Dict, List, Any, Optional
from collections import defaultdict
try:
import libvirt
HAS_LIBVIRT = True
except ImportError:
HAS_LIBVIRT = False
class LibvirtInventory:
"""Generate Ansible inventory from libvirt/KVM infrastructure."""
def __init__(self):
"""Initialize libvirt inventory generator."""
self.inventory = {
'all': {
'children': ['hypervisors', 'kvm_guests']
},
'hypervisors': {
'hosts': []
},
'kvm_guests': {
'children': ['running_vms', 'stopped_vms'],
'vars': {
'ansible_user': 'ansible',
'ansible_python_interpreter': '/usr/bin/python3'
}
},
'running_vms': {
'hosts': []
},
'stopped_vms': {
'hosts': []
},
'_meta': {
'hostvars': {}
}
}
# Hypervisor configurations
self.hypervisors = self._load_hypervisor_config()
def _load_hypervisor_config(self) -> List[Dict[str, str]]:
"""
Load hypervisor connection configuration.
Returns:
List of hypervisor configuration dictionaries
"""
# Default hypervisor from environment or config
hypervisors = []
# Check environment variables
libvirt_uri = os.environ.get('LIBVIRT_DEFAULT_URI', 'qemu+ssh://grok@grok.home.serneels.xyz/system')
hypervisor_name = os.environ.get('LIBVIRT_HYPERVISOR_NAME', 'grokbox')
hypervisors.append({
'name': hypervisor_name,
'uri': libvirt_uri,
'proxy_jump': True
})
return hypervisors
def generate_inventory(self) -> Dict[str, Any]:
"""
Generate complete inventory by querying all configured hypervisors.
Returns:
Ansible inventory dictionary
"""
if not HAS_LIBVIRT:
print("Warning: python3-libvirt not installed. Returning empty inventory.",
file=sys.stderr)
print("Install with: apt-get install python3-libvirt (Debian/Ubuntu)",
file=sys.stderr)
print(" or: dnf install python3-libvirt (RHEL/Fedora)",
file=sys.stderr)
return self.inventory
for hypervisor in self.hypervisors:
try:
self._query_hypervisor(hypervisor)
except Exception as e:
print(f"Warning: Failed to query hypervisor {hypervisor['name']}: {e}",
file=sys.stderr)
# Add hypervisor to inventory even if connection fails
self._add_hypervisor_host(hypervisor)
return self.inventory
def _query_hypervisor(self, hypervisor_config: Dict[str, str]):
"""
Query a libvirt hypervisor for VM information.
Args:
hypervisor_config: Hypervisor connection configuration
"""
try:
# Connect to hypervisor
conn = libvirt.open(hypervisor_config['uri'])
if conn is None:
raise Exception(f"Failed to open connection to {hypervisor_config['uri']}")
# Add hypervisor to inventory
self._add_hypervisor_host(hypervisor_config, conn)
# List all domains (VMs)
domains = conn.listAllDomains(0)
for domain in domains:
self._process_domain(domain, hypervisor_config)
conn.close()
except libvirt.libvirtError as e:
raise Exception(f"Libvirt error: {e}")
def _add_hypervisor_host(self, hypervisor_config: Dict[str, str], conn=None):
"""
Add hypervisor to inventory.
Args:
hypervisor_config: Hypervisor configuration
conn: Libvirt connection object (optional)
"""
hypervisor_name = hypervisor_config['name']
if hypervisor_name not in self.inventory['hypervisors']['hosts']:
self.inventory['hypervisors']['hosts'].append(hypervisor_name)
# Extract hostname from URI
uri = hypervisor_config['uri']
if '@' in uri:
user_host = uri.split('@')[1].split('/')[0]
ansible_host = user_host
ansible_user = uri.split('@')[0].split('://')[-1]
else:
ansible_host = 'localhost'
ansible_user = 'root'
hostvars = {
'ansible_host': ansible_host,
'ansible_user': ansible_user,
'libvirt_uri': uri,
'hypervisor_type': 'kvm',
'ansible_python_interpreter': '/usr/bin/python3'
}
# Add hypervisor stats if connection is available
if conn:
try:
info = conn.getInfo()
hostvars.update({
'hypervisor_model': info[0],
'hypervisor_memory_mb': info[1],
'hypervisor_cpus': info[2],
'hypervisor_mhz': info[3],
'hypervisor_nodes': info[4],
'hypervisor_sockets': info[5],
'hypervisor_cores': info[6],
'hypervisor_threads': info[7]
})
except:
pass
self.inventory['_meta']['hostvars'][hypervisor_name] = hostvars
def _process_domain(self, domain, hypervisor_config: Dict[str, str]):
"""
Process a libvirt domain (VM) and add to inventory.
Args:
domain: Libvirt domain object
hypervisor_config: Parent hypervisor configuration
"""
vm_name = domain.name()
vm_state = domain.state()[0]
vm_uuid = domain.UUIDString()
# Parse VM XML for detailed information
xml_desc = domain.XMLDesc(0)
root = ET.fromstring(xml_desc)
# Extract VM metadata
vm_info = {
'vm_name': vm_name,
'vm_uuid': vm_uuid,
'vm_state': self._state_to_string(vm_state),
'vm_state_code': vm_state,
'hypervisor': hypervisor_config['name'],
'ansible_user': 'ansible',
'ansible_python_interpreter': '/usr/bin/python3'
}
# Get vCPU count
vcpu_elem = root.find('.//vcpu')
if vcpu_elem is not None:
vm_info['vm_vcpus'] = int(vcpu_elem.text)
# Get memory
memory_elem = root.find('.//memory')
if memory_elem is not None:
memory_kb = int(memory_elem.text)
vm_info['vm_memory_mb'] = memory_kb // 1024
# Get network information
interfaces = root.findall('.//interface')
network_info = []
for iface in interfaces:
mac = iface.find('.//mac')
source = iface.find('.//source')
if mac is not None:
iface_info = {'mac': mac.get('address')}
if source is not None:
iface_info['network'] = source.get('network') or source.get('bridge')
network_info.append(iface_info)
if network_info:
vm_info['vm_networks'] = network_info
# Try to get IP address if VM is running
if vm_state == libvirt.VIR_DOMAIN_RUNNING:
try:
ifaces = domain.interfaceAddresses(libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE)
for iface_name, iface_data in ifaces.items():
if iface_data['addrs']:
for addr in iface_data['addrs']:
if addr['type'] == libvirt.VIR_IP_ADDR_TYPE_IPV4:
vm_info['ansible_host'] = addr['addr']
break
except:
pass
# Set ProxyJump if hypervisor requires it
if hypervisor_config.get('proxy_jump'):
proxy_args = f"-o ProxyJump={hypervisor_config['name']} -o StrictHostKeyChecking=accept-new"
vm_info['ansible_ssh_common_args'] = proxy_args
# Add to inventory
self.inventory['_meta']['hostvars'][vm_name] = vm_info
# Categorize by state
if vm_state == libvirt.VIR_DOMAIN_RUNNING:
self.inventory['running_vms']['hosts'].append(vm_name)
else:
self.inventory['stopped_vms']['hosts'].append(vm_name)
def _state_to_string(self, state: int) -> str:
"""
Convert libvirt domain state code to human-readable string.
Args:
state: Libvirt state code
Returns:
Human-readable state string
"""
states = {
libvirt.VIR_DOMAIN_NOSTATE: 'no_state',
libvirt.VIR_DOMAIN_RUNNING: 'running',
libvirt.VIR_DOMAIN_BLOCKED: 'blocked',
libvirt.VIR_DOMAIN_PAUSED: 'paused',
libvirt.VIR_DOMAIN_SHUTDOWN: 'shutdown',
libvirt.VIR_DOMAIN_SHUTOFF: 'shutoff',
libvirt.VIR_DOMAIN_CRASHED: 'crashed',
libvirt.VIR_DOMAIN_PMSUSPENDED: 'suspended'
}
return states.get(state, 'unknown')
def get_host_vars(self, hostname: str) -> Dict[str, Any]:
"""
Get variables for a specific host.
Args:
hostname: Host name
Returns:
Dictionary of host variables
"""
# Generate inventory first if not already done
if not self.inventory['_meta']['hostvars']:
self.generate_inventory()
return self.inventory['_meta']['hostvars'].get(hostname, {})
def main():
"""Main entry point for dynamic inventory script."""
parser = argparse.ArgumentParser(
description='Ansible dynamic inventory from libvirt/KVM'
)
parser.add_argument(
'--list',
action='store_true',
help='List all hosts and groups'
)
parser.add_argument(
'--host',
help='Get variables for specific host'
)
args = parser.parse_args()
# Initialize libvirt inventory
inventory = LibvirtInventory()
if args.list:
# Return full inventory
result = inventory.generate_inventory()
print(json.dumps(result, indent=2))
elif args.host:
# Return host variables
host_vars = inventory.get_host_vars(args.host)
print(json.dumps(host_vars, indent=2))
else:
parser.print_help()
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,248 @@
#!/usr/bin/env python3
"""
Dynamic Inventory Script - SSH Config Parser
==============================================
Parses ~/.ssh/config to generate Ansible dynamic inventory.
Usage:
python ssh_config_inventory.py --list
python ssh_config_inventory.py --host <hostname>
Requirements:
- Python 3.6+
- paramiko (optional, for advanced SSH config parsing)
Author: Ansible Infrastructure Team
Version: 1.0.0
"""
import argparse
import json
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
class SSHConfigParser:
"""Parse SSH config file and extract host configurations."""
def __init__(self, config_path: Optional[str] = None):
"""
Initialize SSH config parser.
Args:
config_path: Path to SSH config file. Defaults to ~/.ssh/config
"""
if config_path is None:
config_path = os.path.expanduser("~/.ssh/config")
self.config_path = config_path
self.hosts = {}
self.groups = {
'all': {
'children': ['external_hosts', 'hypervisors', 'kvm_guests']
},
'external_hosts': {'hosts': []},
'hypervisors': {'hosts': []},
'kvm_guests': {
'children': ['dns_servers', 'mail_servers', 'development', 'uncategorized'],
'vars': {
'ansible_user': 'ansible',
'ansible_ssh_common_args': '-o StrictHostKeyChecking=accept-new'
}
},
'dns_servers': {'hosts': []},
'mail_servers': {'hosts': []},
'development': {'hosts': []},
'uncategorized': {'hosts': []},
'_meta': {
'hostvars': {}
}
}
def parse_config(self) -> Dict[str, Any]:
"""
Parse SSH config file and build inventory structure.
Returns:
Dictionary containing Ansible inventory structure
"""
if not os.path.exists(self.config_path):
print(f"Warning: SSH config not found at {self.config_path}", file=sys.stderr)
return self.groups
current_host = None
current_config = {}
with open(self.config_path, 'r') as f:
for line in f:
line = line.strip()
# Skip comments and empty lines
if not line or line.startswith('#'):
continue
# Match Host declarations
host_match = re.match(r'^Host\s+(.+)$', line, re.IGNORECASE)
if host_match:
# Save previous host if exists
if current_host and current_host != '*':
self._add_host(current_host, current_config)
# Start new host
current_host = host_match.group(1).strip()
current_config = {'ansible_host': current_host}
continue
# Parse host configuration options
if current_host and current_host != '*':
self._parse_option(line, current_config)
# Add last host
if current_host and current_host != '*':
self._add_host(current_host, current_config)
return self.groups
def _parse_option(self, line: str, config: Dict[str, str]):
"""Parse SSH config option line."""
# Match key-value pairs (Hostname, User, Port, etc.)
match = re.match(r'^(\w+)\s+(.+)$', line, re.IGNORECASE)
if not match:
return
key = match.group(1).lower()
value = match.group(2).strip()
# Map SSH config options to Ansible variables
option_map = {
'hostname': 'ansible_host',
'user': 'ansible_user',
'port': 'ansible_port',
'proxyjump': 'ansible_ssh_common_args',
'forwardagent': 'ansible_ssh_extra_args',
}
if key == 'hostname':
config['ansible_host'] = value
elif key == 'user':
config['ansible_user'] = value
elif key == 'port':
config['ansible_port'] = int(value)
elif key == 'proxyjump':
# Build ProxyJump SSH args
existing_args = config.get('ansible_ssh_common_args', '')
proxy_arg = f"-o ProxyJump={value}"
config['ansible_ssh_common_args'] = f"{existing_args} {proxy_arg}".strip()
elif key == 'forwardagent':
if value.lower() == 'yes':
config['ansible_ssh_extra_args'] = '-o ForwardAgent=yes'
elif key == 'hostkeyalias':
config['host_key_alias'] = value
def _add_host(self, hostname: str, config: Dict[str, Any]):
"""Add host to inventory and categorize into groups."""
# Store host configuration
self.hosts[hostname] = config
self.groups['_meta']['hostvars'][hostname] = config
# Categorize host based on characteristics
group = self._categorize_host(hostname, config)
if group and group in self.groups:
self.groups[group]['hosts'].append(hostname)
def _categorize_host(self, hostname: str, config: Dict[str, Any]) -> Optional[str]:
"""
Categorize host into appropriate group based on naming and config.
Args:
hostname: Host name
config: Host configuration dictionary
Returns:
Group name or None
"""
ansible_user = config.get('ansible_user', '')
has_proxyjump = 'ProxyJump' in config.get('ansible_ssh_common_args', '')
ansible_host = config.get('ansible_host', '')
# Categorization logic
# 1. Hypervisors - have ForwardAgent and specific users
if ansible_user in ['grok'] or 'ForwardAgent' in config.get('ansible_ssh_extra_args', ''):
return 'hypervisors'
# 2. External hosts - public IPs, no ProxyJump
if not has_proxyjump and ansible_user not in ['ansible']:
# Check if it looks like a public IP or external hostname
if re.match(r'^\d+\.\d+\.\d+\.\d+$', ansible_host) or 'home' not in ansible_host:
if not ansible_host.startswith('192.168.'):
return 'external_hosts'
# 3. KVM guests - use ansible user and ProxyJump
if has_proxyjump and ansible_user == 'ansible':
# Categorize by service/role based on hostname
if 'pihole' in hostname or 'dns' in hostname:
return 'dns_servers'
elif 'mail' in hostname or 'mx' in hostname or hostname == 'mymx':
return 'mail_servers'
elif 'derp' in hostname or 'dev' in hostname or 'test' in hostname:
return 'development'
else:
return 'uncategorized'
return None
def get_host_vars(self, hostname: str) -> Dict[str, Any]:
"""
Get variables for a specific host.
Args:
hostname: Host name
Returns:
Dictionary of host variables
"""
return self.groups['_meta']['hostvars'].get(hostname, {})
def main():
"""Main entry point for dynamic inventory script."""
parser = argparse.ArgumentParser(
description='Ansible dynamic inventory from SSH config'
)
parser.add_argument(
'--list',
action='store_true',
help='List all hosts and groups'
)
parser.add_argument(
'--host',
help='Get variables for specific host'
)
args = parser.parse_args()
# Initialize SSH config parser
ssh_parser = SSHConfigParser()
if args.list:
# Return full inventory
inventory = ssh_parser.parse_config()
print(json.dumps(inventory, indent=2))
elif args.host:
# Return host variables
ssh_parser.parse_config()
host_vars = ssh_parser.get_host_vars(args.host)
print(json.dumps(host_vars, indent=2))
else:
parser.print_help()
sys.exit(1)
if __name__ == '__main__':
main()