#!/usr/bin/env python3
"""
WireGuard Manager for Personal Internet Cell
Handles WireGuard VPN configuration and peer management
"""

import os
import json
import subprocess
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any

from base_service_manager import BaseServiceManager

logger = logging.getLogger(__name__)


class WireGuardManager(BaseServiceManager):
    """Manages WireGuard VPN configuration and peers.

    Peer definitions are stored as ``<name>.conf`` files inside
    ``peers_dir``; key pairs created by :meth:`generate_peer_keys` are stored
    next to them as ``<name>_keys.json``.
    """

    # Interface queried through ``ip`` / ``wg`` / ``wg-quick``.
    WG_INTERFACE = 'wg0'
    # Name filter used when looking for the WireGuard Docker container.
    WG_CONTAINER_NAME = 'cell-wireguard'

    def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
        """Set up the manager and create its working directories.

        Args:
            data_dir: Base data directory; peers live in
                ``<data_dir>/wireguard/peers``.
            config_dir: Base configuration directory; WireGuard configuration
                lives in ``<config_dir>/wireguard``.
        """
        super().__init__('wireguard', data_dir, config_dir)
        self.wg_config_dir = os.path.join(config_dir, 'wireguard')
        self.peers_dir = os.path.join(data_dir, 'wireguard', 'peers')

        # Ensure directories exist
        os.makedirs(self.wg_config_dir, exist_ok=True)
        os.makedirs(self.peers_dir, exist_ok=True)

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same text as ``datetime.utcnow().isoformat()`` without
        calling the deprecated ``utcnow``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _peer_file(self, name: str, suffix: str) -> str:
        """Return the path of a per-peer file inside ``peers_dir``.

        Args:
            name: Peer name; used verbatim as the file name stem.
            suffix: File name suffix, e.g. ``'.conf'`` or ``'_keys.json'``.

        Raises:
            ValueError: If the name is empty, is ``.``/``..``, or contains a
                path separator or control character, i.e. if it could
                resolve to a path outside ``peers_dir``.
        """
        name = str(name)
        separators = [sep for sep in (os.sep, os.altsep) if sep]
        if (
            name in ('', '.', '..')
            or any(sep in name for sep in separators)
            or any(ord(ch) < 32 for ch in name)
        ):
            raise ValueError(f"Invalid peer name: {name!r}")
        return os.path.join(self.peers_dir, f'{name}{suffix}')

    # ------------------------------------------------------------------
    # Status / connectivity
    # ------------------------------------------------------------------

    def get_status(self) -> Dict[str, Any]:
        """Get WireGuard service status.

        Inside Docker (``/.dockerenv`` exists or ``DOCKER_CONTAINER=true``)
        the running state comes from the WireGuard container; otherwise it
        comes from the presence of the local interface.

        Returns:
            A dict with ``running``, ``status``, ``interface``,
            ``peers_count``, ``total_traffic`` and ``timestamp``; on failure,
            whatever ``self.handle_error`` returns.
        """
        try:
            is_docker = (
                os.path.exists('/.dockerenv')
                or os.environ.get('DOCKER_CONTAINER') == 'true'
            )

            if is_docker:
                # Check if WireGuard container is actually running
                running = self._check_wireguard_container_status()
                if running:
                    interface = self.WG_INTERFACE
                    peers_count = len(self._get_configured_peers())
                    traffic = self._get_traffic_stats()
                else:
                    interface = 'unknown'
                    peers_count = 0
                    traffic = {'bytes_sent': 0, 'bytes_received': 0}
            else:
                # Probe once so 'running' and 'status' can never disagree.
                running = self._check_wireguard_status()
                interface = self.WG_INTERFACE
                peers_count = len(self._get_configured_peers())
                traffic = self._get_traffic_stats()

            return {
                'running': running,
                'status': 'online' if running else 'offline',
                'interface': interface,
                'peers_count': peers_count,
                'total_traffic': traffic,
                'timestamp': self._utc_timestamp(),
            }
        except Exception as e:
            return self.handle_error(e, "get_status")

    def test_connectivity(self) -> Dict[str, Any]:
        """Test WireGuard connectivity.

        Returns:
            A dict with ``interface_up``, ``peers_connectivity`` (peer name
            to ping result), ``success`` (interface up and every peer
            reachable; trivially true for the peers part when there are no
            peers) and ``timestamp``; on failure, whatever
            ``self.handle_error`` returns.
        """
        try:
            interface_up = self._check_interface_status()
            peers_connectivity = self._test_peers_connectivity()

            return {
                'interface_up': interface_up,
                'peers_connectivity': peers_connectivity,
                'success': interface_up and all(peers_connectivity.values()),
                'timestamp': self._utc_timestamp(),
            }
        except Exception as e:
            return self.handle_error(e, "test_connectivity")

    def _check_wireguard_status(self) -> bool:
        """Return True if ``ip link show`` finds the WireGuard interface."""
        try:
            result = subprocess.run(
                ['ip', 'link', 'show', self.WG_INTERFACE],
                capture_output=True, text=True, timeout=5,
            )
            return result.returncode == 0
        except Exception:
            # Missing binary, timeout, ...: treat as "not running".
            return False

    def _check_wireguard_container_status(self) -> bool:
        """Return True if a running container matches ``WG_CONTAINER_NAME``.

        Any failure (``docker`` package missing, daemon unreachable, ...) is
        reported as "not running".
        """
        try:
            import docker
            client = docker.from_env()
            containers = client.containers.list(
                filters={'name': self.WG_CONTAINER_NAME}
            )
            return len(containers) > 0
        except Exception:
            return False

    def _check_interface_status(self) -> bool:
        """Return True if the WireGuard interface exists and is flagged UP.

        The flag list printed between ``<`` and ``>`` is inspected so that
        ``LOWER_UP`` alone is not mistaken for ``UP``; if no flag list is
        present the check falls back to a plain substring search.
        """
        try:
            result = subprocess.run(
                ['ip', 'link', 'show', self.WG_INTERFACE],
                capture_output=True, text=True, timeout=5,
            )
            if result.returncode != 0:
                return False

            output = result.stdout
            start = output.find('<')
            end = output.find('>', start + 1) if start != -1 else -1
            if start != -1 and end != -1:
                return 'UP' in output[start + 1:end].split(',')
            return 'UP' in output
        except Exception:
            return False

    # ------------------------------------------------------------------
    # Peer configuration
    # ------------------------------------------------------------------

    def _get_configured_peers(self) -> List[Dict[str, Any]]:
        """Get list of configured peers.

        Reads every ``*.conf`` file in ``peers_dir`` (in sorted order) and
        parses it. A file that cannot be read is logged and skipped; it does
        not hide the remaining peers.

        Returns:
            One dict per peer holding the parsed settings plus ``name`` (the
            file name without ``.conf``). Empty if the directory cannot be
            listed.
        """
        peers: List[Dict[str, Any]] = []
        try:
            filenames = sorted(os.listdir(self.peers_dir))
        except Exception as e:
            logger.error(f"Error reading peer configurations: {e}")
            return peers

        for filename in filenames:
            if not filename.endswith('.conf'):
                continue
            peer_file = os.path.join(self.peers_dir, filename)
            try:
                with open(peer_file, 'r') as f:
                    content = f.read()
                peer_config = self._parse_peer_config(content)
            except Exception as e:
                logger.error(f"Error reading peer configuration {peer_file}: {e}")
                continue
            peer_config['name'] = filename[:-len('.conf')]
            peers.append(peer_config)

        return peers

    def _parse_peer_config(self, content: str) -> Dict[str, Any]:
        """Parse WireGuard peer configuration text into a flat dict.

        Blank lines, ``[Section]`` headers and ``#`` comments (whole-line or
        trailing) are ignored. Each remaining ``key = value`` line is split
        on the first ``=`` only, so values that themselves contain ``=``
        (base64 padding) stay intact. If a key occurs more than once the
        last occurrence wins.
        """
        config: Dict[str, Any] = {}
        for raw_line in content.splitlines():
            line = raw_line.split('#', 1)[0].strip()
            if not line or line.startswith('['):
                continue
            key, sep, value = line.partition('=')
            if not sep:
                continue
            config[key.strip()] = value.strip()
        return config

    def _get_traffic_stats(self) -> Dict[str, int]:
        """Get WireGuard traffic statistics summed over all peers.

        Parses ``wg show <interface> transfer``; for each line the second
        and third whitespace-separated columns are added to the received and
        sent totals respectively. Lines that do not fit are skipped.

        Returns:
            ``{'bytes_received': int, 'bytes_sent': int}``; both zero when
            the command fails or cannot be run.
        """
        total_rx = 0
        total_tx = 0
        try:
            result = subprocess.run(
                ['wg', 'show', self.WG_INTERFACE, 'transfer'],
                capture_output=True, text=True, timeout=5,
            )
            if result.returncode == 0:
                for line in result.stdout.splitlines():
                    parts = line.split()
                    if len(parts) < 3:
                        continue
                    try:
                        rx = int(parts[1])
                        tx = int(parts[2])
                    except ValueError:
                        continue
                    total_rx += rx
                    total_tx += tx
        except Exception as e:
            logger.error(f"Error getting traffic stats: {e}")
            return {'bytes_received': 0, 'bytes_sent': 0}

        return {'bytes_received': total_rx, 'bytes_sent': total_tx}

    def _test_peers_connectivity(self) -> Dict[str, bool]:
        """Test connectivity to all peers.

        Each configured peer is pinged once at the first address of its
        ``AllowedIPs`` setting (prefix length removed).

        Returns:
            Mapping of peer name to ``True`` when the ping succeeded. Peers
            without a usable address, and peers whose ping could not be run,
            map to ``False``.
        """
        connectivity: Dict[str, bool] = {}

        for peer in self._get_configured_peers():
            peer_name = peer.get('name', 'unknown')
            allowed_ips = peer.get('AllowedIPs', '')

            # Extract first IP from AllowedIPs
            ip = allowed_ips.split(',')[0].split('/')[0].strip() if allowed_ips else ''

            # A leading '-' would be read by ping as an option, not a host.
            if not ip or ip.startswith('-'):
                connectivity[peer_name] = False
                continue

            try:
                result = subprocess.run(
                    ['ping', '-c', '1', '-W', '2', ip],
                    capture_output=True, text=True, timeout=5,
                )
                connectivity[peer_name] = result.returncode == 0
            except Exception:
                connectivity[peer_name] = False

        return connectivity

    def get_wireguard_status(self) -> Dict[str, Any]:
        """Get detailed WireGuard status.

        Returns:
            The result of :meth:`get_status` with an added ``peers`` list.
            Each entry is built from the stored peer configuration; keys the
            configuration does not contain fall back to ``''`` or ``0``. On
            failure, whatever ``self.handle_error`` returns.
        """
        try:
            status = self.get_status()

            status['peers'] = [
                {
                    'name': peer.get('name', 'unknown'),
                    'public_key': peer.get('PublicKey', ''),
                    'allowed_ips': peer.get('AllowedIPs', ''),
                    'endpoint': peer.get('Endpoint', ''),
                    'last_handshake': peer.get('LastHandshake', ''),
                    'transfer_rx': peer.get('TransferRx', 0),
                    'transfer_tx': peer.get('TransferTx', 0),
                }
                for peer in self._get_configured_peers()
            ]
            return status
        except Exception as e:
            return self.handle_error(e, "get_wireguard_status")

    def get_wireguard_peers(self) -> List[Dict[str, Any]]:
        """Get all WireGuard peers (empty list on error)."""
        try:
            return self._get_configured_peers()
        except Exception as e:
            logger.error(f"Error getting WireGuard peers: {e}")
            return []

    def add_wireguard_peer(self, name: str, public_key: str, allowed_ips: str,
                           endpoint: str = '', persistent_keepalive: int = 25) -> bool:
        """Add a new WireGuard peer.

        Writes ``<name>.conf`` into ``peers_dir`` (replacing an existing file
        of that name) and then calls the reload hook.

        Args:
            name: Peer name; must be a plain file name (no path separators).
            public_key: Value for ``PublicKey``.
            allowed_ips: Value for ``AllowedIPs``.
            endpoint: Optional value for ``Endpoint``; omitted when empty.
            persistent_keepalive: Value for ``PersistentKeepalive``; omitted
                when falsy.

        Returns:
            True on success; False if the name or any value is rejected or
            the file cannot be written (the reason is logged).
        """
        try:
            peer_file = self._peer_file(name, '.conf')

            fields = [('PublicKey', public_key), ('AllowedIPs', allowed_ips)]
            if endpoint:
                fields.append(('Endpoint', endpoint))
            if persistent_keepalive:
                fields.append(('PersistentKeepalive', persistent_keepalive))

            lines = ['[Peer]']
            for key, value in fields:
                text = str(value)
                # A line break inside a value would smuggle extra settings
                # or sections into the generated file.
                if '\n' in text or '\r' in text:
                    raise ValueError(f"{key} must not contain line breaks")
                lines.append(f'{key} = {text}')
            peer_config = '\n'.join(lines) + '\n'

            # Save peer configuration
            with open(peer_file, 'w') as f:
                f.write(peer_config)

            # Reload WireGuard configuration
            self._reload_wireguard_config()

            logger.info(f"Added WireGuard peer: {name}")
            return True
        except Exception as e:
            logger.error(f"Failed to add WireGuard peer {name}: {e}")
            return False

    def remove_wireguard_peer(self, name: str) -> bool:
        """Remove a WireGuard peer.

        Returns:
            True if ``<name>.conf`` existed and was deleted; False if it does
            not exist, the name is rejected, or deletion fails.
        """
        try:
            peer_file = self._peer_file(name, '.conf')

            if not os.path.exists(peer_file):
                logger.warning(f"Peer file not found: {peer_file}")
                return False

            os.remove(peer_file)

            # Reload WireGuard configuration
            self._reload_wireguard_config()

            logger.info(f"Removed WireGuard peer: {name}")
            return True
        except Exception as e:
            logger.error(f"Failed to remove WireGuard peer {name}: {e}")
            return False

    def generate_peer_keys(self, peer_name: str) -> Dict[str, str]:
        """Generate WireGuard keys for a peer.

        Runs ``wg genkey`` and feeds the result to ``wg pubkey`` on stdin,
        then stores both keys in ``<peer_name>_keys.json`` inside
        ``peers_dir``. The file is restricted to mode ``0o600`` because it
        holds the private key.

        Returns:
            Dict with ``private_key``, ``public_key`` and ``peer_name``.

        Raises:
            ValueError: If ``peer_name`` is not a plain file name.
            RuntimeError: If ``wg genkey`` or ``wg pubkey`` exits non-zero.
            Exception: Any other failure (missing ``wg`` binary, timeout,
                write error) is logged and re-raised unchanged.
        """
        try:
            # Validate the name before doing any work.
            keys_file = self._peer_file(peer_name, '_keys.json')

            # Generate private key
            private_key_result = subprocess.run(
                ['wg', 'genkey'],
                capture_output=True, text=True, timeout=10,
            )
            if private_key_result.returncode != 0:
                raise RuntimeError("Failed to generate private key")
            private_key = private_key_result.stdout.strip()

            # Generate public key from private key
            public_key_result = subprocess.run(
                ['wg', 'pubkey'],
                input=private_key, capture_output=True, text=True, timeout=10,
            )
            if public_key_result.returncode != 0:
                raise RuntimeError("Failed to generate public key")
            public_key = public_key_result.stdout.strip()

            keys_data = {
                'private_key': private_key,
                'public_key': public_key,
                'peer_name': peer_name,
                'generated_at': self._utc_timestamp(),
            }

            # Create the file owner-only; the mode passed to os.open only
            # applies to new files, so chmod also covers a pre-existing one.
            fd = os.open(keys_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w') as f:
                os.chmod(keys_file, 0o600)
                json.dump(keys_data, f, indent=2)

            logger.info(f"Generated keys for peer: {peer_name}")
            return {
                'private_key': private_key,
                'public_key': public_key,
                'peer_name': peer_name,
            }
        except Exception as e:
            logger.error(f"Failed to generate keys for peer {peer_name}: {e}")
            raise

    def _reload_wireguard_config(self):
        """Reload WireGuard configuration.

        NOTE: placeholder. No WireGuard command is executed here; the method
        only writes a log line. Peer files written or removed by this class
        are therefore not applied to a live interface by this call.
        """
        try:
            # This would typically involve restarting the WireGuard service
            # or reloading the configuration
            logger.info("WireGuard configuration reloaded")
        except Exception as e:
            logger.error(f"Failed to reload WireGuard configuration: {e}")

    def get_metrics(self) -> Dict[str, Any]:
        """Get WireGuard metrics.

        Returns:
            Dict with ``service``, ``timestamp``, ``status``,
            ``peers_count``, ``traffic_stats`` and ``interface_status``; on
            failure, whatever ``self.handle_error`` returns.
        """
        try:
            traffic_stats = self._get_traffic_stats()
            peers = self._get_configured_peers()

            return {
                'service': 'wireguard',
                'timestamp': self._utc_timestamp(),
                'status': 'online' if self._check_wireguard_status() else 'offline',
                'peers_count': len(peers),
                'traffic_stats': traffic_stats,
                'interface_status': self._check_interface_status(),
            }
        except Exception as e:
            return self.handle_error(e, "get_metrics")

    def restart_service(self) -> bool:
        """Restart WireGuard service via ``wg-quick down`` / ``wg-quick up``.

        A failing ``down`` is tolerated (the interface may simply not have
        been up); a failing ``up`` is reported.

        Returns:
            True only if ``wg-quick up`` exited with status 0; False if it
            failed or either command could not be run.
        """
        try:
            # Stop WireGuard interface
            down = subprocess.run(
                ['wg-quick', 'down', self.WG_INTERFACE],
                capture_output=True, text=True, timeout=10,
            )
            if down.returncode != 0:
                logger.debug(
                    f"wg-quick down {self.WG_INTERFACE} exited with "
                    f"{down.returncode}: {down.stderr.strip()}"
                )

            # Start WireGuard interface
            up = subprocess.run(
                ['wg-quick', 'up', self.WG_INTERFACE],
                capture_output=True, text=True, timeout=10,
            )
            if up.returncode != 0:
                logger.error(
                    f"Failed to restart WireGuard service: {up.stderr.strip()}"
                )
                return False

            logger.info("WireGuard service restarted")
            return True
        except Exception as e:
            logger.error(f"Failed to restart WireGuard service: {e}")
            return False