Files
pic/api/routing_manager.py
T
roof 4bf583c071 fix: diagnostics tab — run ping/traceroute in cell-wireguard, fix wrong method call
The connectivity endpoint was calling routing_manager.test_connectivity()
(no args, internal health check) instead of test_routing_connectivity(target_ip).
Also ping/traceroute aren't installed in the API container; run them via
docker exec cell-wireguard instead.

Updated test_api_endpoints to mock test_routing_connectivity and cover
the new DELETE /firewall/<id> and GET /live-iptables endpoints.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 01:26:40 -04:00

1055 lines
42 KiB
Python

#!/usr/bin/env python3
"""
Routing Manager for Personal Internet Cell
Handles VPN gateway, NAT, iptables, and advanced routing
"""
import os
import json
import subprocess
import logging
import ipaddress
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
import re
from base_service_manager import BaseServiceManager
logger = logging.getLogger(__name__)
class RoutingManager(BaseServiceManager):
"""Manages VPN gateway, NAT, and routing functionality"""
def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
super().__init__('routing', data_dir, config_dir)
self.routing_dir = os.path.join(config_dir, 'routing')
self.rules_file = os.path.join(data_dir, 'routing', 'rules.json')
# Service state tracking
self._service_running = False
self._state_file = os.path.join(data_dir, 'routing', 'service_state.json')
# Ensure directories exist
self.safe_makedirs(self.routing_dir)
self.safe_makedirs(os.path.dirname(self.rules_file))
# Initialize routing configuration
self._ensure_config_exists()
# Load service state
self._load_service_state()
def _ensure_config_exists(self):
"""Ensure routing configuration exists"""
try:
if not os.path.exists(self.rules_file):
self._initialize_rules()
except (PermissionError, OSError):
pass
def _initialize_rules(self):
"""Initialize routing rules"""
default_rules = {
'nat_rules': [],
'forwarding_rules': [],
'peer_routes': {},
'exit_nodes': [],
'bridge_routes': [],
'split_routes': [],
'firewall_rules': []
}
with open(self.rules_file, 'w') as f:
json.dump(default_rules, f, indent=2)
logger.info("Routing rules initialized")
def _load_service_state(self):
"""Load service state from file"""
try:
if os.path.exists(self._state_file):
with open(self._state_file, 'r') as f:
state = json.load(f)
self._service_running = state.get('running', False)
else:
# Default to running if no state file exists (for backward compatibility)
self._service_running = True
self._save_service_state()
except Exception as e:
logger.error(f"Failed to load service state: {e}")
self._service_running = True
def _save_service_state(self):
"""Save service state to file"""
try:
state = {
'running': self._service_running,
'timestamp': datetime.utcnow().isoformat()
}
with open(self._state_file, 'w') as f:
json.dump(state, f, indent=2)
except Exception as e:
logger.error(f"Failed to save service state: {e}")
def _validate_cidr(self, cidr):
import ipaddress
try:
ipaddress.ip_network(cidr)
return True
except Exception:
return False
def add_nat_rule(self, source_network: str, target_interface: str, masquerade: bool = True, nat_type: str = 'MASQUERADE', protocol: str = 'ALL', external_port: str = None, internal_ip: str = None, internal_port: str = None) -> bool:
"""Add NAT rule for network translation, port forwarding, or 1:1 NAT."""
# Validation
if not source_network or not self._validate_cidr(source_network):
logger.error(f"Invalid source_network: {source_network}")
return False
if not target_interface or not isinstance(target_interface, str):
logger.error(f"Invalid target_interface: {target_interface}")
return False
if nat_type not in ['MASQUERADE', 'SNAT', 'DNAT']:
logger.error(f"Invalid nat_type: {nat_type}")
return False
if protocol not in ['TCP', 'UDP', 'ALL']:
logger.error(f"Invalid protocol: {protocol}")
return False
try:
rules = self._load_rules()
nat_rule = {
'id': f"nat_{len(rules['nat_rules']) + 1}",
'source_network': source_network,
'target_interface': target_interface,
'masquerade': masquerade,
'nat_type': nat_type,
'protocol': protocol,
'external_port': external_port,
'internal_ip': internal_ip,
'internal_port': internal_port,
'enabled': True,
'created_at': datetime.now().isoformat()
}
rules['nat_rules'].append(nat_rule)
self._save_rules(rules)
self._apply_nat_rule(nat_rule)
logger.info(f"Added NAT rule for {source_network} -> {target_interface} type={nat_type}")
return True
except Exception as e:
logger.error(f"Failed to add NAT rule: {e}")
return False
def remove_nat_rule(self, rule_id: str) -> bool:
"""Remove NAT rule"""
try:
rules = self._load_rules()
# Find and remove rule
rules['nat_rules'] = [rule for rule in rules['nat_rules'] if rule['id'] != rule_id]
self._save_rules(rules)
# Remove from iptables
self._remove_nat_rule(rule_id)
logger.info(f"Removed NAT rule {rule_id}")
return True
except Exception as e:
logger.error(f"Failed to remove NAT rule: {e}")
return False
def add_peer_route(self, peer_name: str, peer_ip: str, allowed_networks: list, route_type: str = 'lan') -> bool:
"""Add routing rule for a peer"""
# Validation
if not peer_name or not isinstance(peer_name, str):
logger.error(f"Invalid peer_name: {peer_name}")
return False
if not peer_ip or not isinstance(peer_ip, str):
logger.error(f"Invalid peer_ip: {peer_ip}")
return False
if not allowed_networks or not isinstance(allowed_networks, list) or not all(self._validate_cidr(n) for n in allowed_networks):
logger.error(f"Invalid allowed_networks: {allowed_networks}")
return False
if route_type not in ['lan', 'exit', 'bridge', 'split']:
logger.error(f"Invalid route_type: {route_type}")
return False
try:
rules = self._load_rules()
peer_route = {
'peer_name': peer_name,
'peer_ip': peer_ip,
'allowed_networks': allowed_networks,
'route_type': route_type,
'enabled': True,
'created_at': datetime.now().isoformat()
}
rules['peer_routes'][peer_name] = peer_route
self._save_rules(rules)
self._apply_peer_route(peer_route)
logger.info(f"Added peer route for {peer_name}")
return True
except Exception as e:
logger.error(f"Failed to add peer route: {e}")
return False
def remove_peer_route(self, peer_name: str) -> bool:
"""Remove routing rule for a peer"""
try:
rules = self._load_rules()
if peer_name in rules['peer_routes']:
del rules['peer_routes'][peer_name]
self._save_rules(rules)
# Remove from routing table
self._remove_peer_route(peer_name)
logger.info(f"Removed peer route for {peer_name}")
return True
return False
except Exception as e:
logger.error(f"Failed to remove peer route: {e}")
return False
def add_exit_node(self, peer_name: str, peer_ip: str, allowed_domains: List[str] = None) -> bool:
"""Add exit node configuration"""
try:
rules = self._load_rules()
exit_node = {
'peer_name': peer_name,
'peer_ip': peer_ip,
'allowed_domains': allowed_domains or [],
'enabled': True,
'created_at': datetime.now().isoformat()
}
rules['exit_nodes'].append(exit_node)
self._save_rules(rules)
# Apply exit node rules
self._apply_exit_node(exit_node)
logger.info(f"Added exit node {peer_name}")
return True
except Exception as e:
logger.error(f"Failed to add exit node: {e}")
return False
def add_bridge_route(self, source_peer: str, target_peer: str,
allowed_networks: List[str]) -> bool:
"""Add bridge route between peers"""
try:
rules = self._load_rules()
bridge_route = {
'id': f"bridge_{len(rules['bridge_routes']) + 1}",
'source_peer': source_peer,
'target_peer': target_peer,
'allowed_networks': allowed_networks,
'enabled': True,
'created_at': datetime.now().isoformat()
}
rules['bridge_routes'].append(bridge_route)
self._save_rules(rules)
# Apply bridge route
self._apply_bridge_route(bridge_route)
logger.info(f"Added bridge route {source_peer} -> {target_peer}")
return True
except Exception as e:
logger.error(f"Failed to add bridge route: {e}")
return False
def add_split_route(self, network: str, exit_peer: str,
fallback_peer: str = None) -> bool:
"""Add split routing rule"""
try:
rules = self._load_rules()
split_route = {
'id': f"split_{len(rules['split_routes']) + 1}",
'network': network,
'exit_peer': exit_peer,
'fallback_peer': fallback_peer,
'enabled': True,
'created_at': datetime.now().isoformat()
}
rules['split_routes'].append(split_route)
self._save_rules(rules)
# Apply split route
self._apply_split_route(split_route)
logger.info(f"Added split route for {network}")
return True
except Exception as e:
logger.error(f"Failed to add split route: {e}")
return False
def add_firewall_rule(self, rule_type: str, source: str, destination: str, action: str = 'ACCEPT', port: str = None, protocol: str = 'ALL', port_range: str = None) -> bool:
"""Add firewall rule with protocol and port range support."""
# Validation
if rule_type not in ['INPUT', 'OUTPUT', 'FORWARD']:
logger.error(f"Invalid rule_type: {rule_type}")
return False
if not source or not self._validate_cidr(source):
logger.error(f"Invalid source: {source}")
return False
if not destination or not self._validate_cidr(destination):
logger.error(f"Invalid destination: {destination}")
return False
if action not in ['ACCEPT', 'DROP', 'REJECT']:
logger.error(f"Invalid action: {action}")
return False
if protocol not in ['TCP', 'UDP', 'ICMP', 'ALL']:
logger.error(f"Invalid protocol: {protocol}")
return False
if port is not None and port != '':
try:
port_num = int(port)
if not (0 < port_num < 65536):
logger.error(f"Invalid port: {port}")
return False
except Exception:
logger.error(f"Invalid port: {port}")
return False
if port_range is not None and port_range != '':
# Validate port range format (e.g., 1000-2000)
if not re.match(r'^\d{1,5}-\d{1,5}$', port_range):
logger.error(f"Invalid port_range: {port_range}")
return False
try:
rules = self._load_rules()
firewall_rule = {
'id': f"fw_{len(rules['firewall_rules']) + 1}",
'rule_type': rule_type,
'source': source,
'destination': destination,
'action': action,
'port': port,
'protocol': protocol,
'port_range': port_range,
'enabled': True,
'created_at': datetime.now().isoformat()
}
rules['firewall_rules'].append(firewall_rule)
self._save_rules(rules)
self._apply_firewall_rule(firewall_rule)
logger.info(f"Added firewall rule {rule_type} {source} -> {destination} proto={protocol}")
return True
except Exception as e:
logger.error(f"Failed to add firewall rule: {e}")
return False
def get_routing_status(self) -> Dict:
"""Get routing and gateway status"""
try:
rules = self._load_rules()
# Get iptables status
nat_rules_count = len([r for r in rules['nat_rules'] if r['enabled']])
firewall_rules_count = len([r for r in rules['firewall_rules'] if r['enabled']])
peer_routes_count = len([r for r in rules['peer_routes'].values() if r['enabled']])
exit_nodes_count = len([r for r in rules['exit_nodes'] if r['enabled']])
# Get routing table info
routing_table = self._get_routing_table()
return {
'nat_rules_count': nat_rules_count,
'firewall_rules_count': firewall_rules_count,
'peer_routes_count': peer_routes_count,
'exit_nodes_count': exit_nodes_count,
'bridge_routes_count': len(rules['bridge_routes']),
'split_routes_count': len(rules['split_routes']),
'routing_table': routing_table,
'active_rules': rules
}
except Exception as e:
logger.error(f"Failed to get routing status: {e}")
return {
'nat_rules_count': 0,
'firewall_rules_count': 0,
'peer_routes_count': 0,
'exit_nodes_count': 0,
'bridge_routes_count': 0,
'split_routes_count': 0,
'routing_table': [],
'active_rules': {}
}
def test_routing_connectivity(self, target_ip: str, via_peer: str = None) -> Dict:
"""Test routing connectivity by running ping/traceroute in cell-wireguard."""
WG = 'cell-wireguard'
def _exec(cmd):
result = subprocess.run(
['docker', 'exec', WG] + cmd,
capture_output=True, text=True, timeout=35
)
return {
'success': result.returncode == 0,
'output': result.stdout,
'error': result.stderr,
}
results = {}
try:
results['ping'] = _exec(['ping', '-c', '4', '-W', '3', target_ip])
except Exception as e:
results['ping'] = {'success': False, 'output': '', 'error': str(e)}
try:
results['traceroute'] = _exec(['traceroute', '-m', '10', '-w', '2', target_ip])
except Exception as e:
results['traceroute'] = {'success': False, 'output': '', 'error': str(e)}
if via_peer:
try:
results['peer_route'] = _exec(['ping', '-c', '3', '-W', '3', '-I', via_peer, target_ip])
except Exception as e:
results['peer_route'] = {'success': False, 'output': '', 'error': str(e)}
return results
def get_routing_logs(self, lines: int = 50) -> Dict:
"""Get routing and firewall logs"""
try:
logs = {}
# Get iptables logs
try:
result = subprocess.run(['dmesg', '|', 'grep', 'iptables'],
capture_output=True, text=True, timeout=10)
logs['iptables'] = result.stdout
except Exception as e:
logs['iptables'] = f"Error getting iptables logs: {e}"
# Get routing logs
try:
result = subprocess.run(['dmesg', '|', 'grep', 'routing'],
capture_output=True, text=True, timeout=10)
logs['routing'] = result.stdout
except Exception as e:
logs['routing'] = f"Error getting routing logs: {e}"
# Get network interface logs
try:
result = subprocess.run(['ip', 'route', 'show'],
capture_output=True, text=True, timeout=10)
logs['routes'] = result.stdout
except Exception as e:
logs['routes'] = f"Error getting route table: {e}"
return logs
except Exception as e:
logger.error(f"Failed to get routing logs: {e}")
return {'error': str(e)}
def remove_firewall_rule(self, rule_id: str) -> bool:
"""Remove a stored firewall rule and delete it from iptables."""
try:
rules = self._load_rules()
rule = next((r for r in rules['firewall_rules'] if r['id'] == rule_id), None)
if not rule:
return False
rules['firewall_rules'] = [r for r in rules['firewall_rules'] if r['id'] != rule_id]
self._save_rules(rules)
try:
cmd = ['iptables', '-D', rule['rule_type'],
'-s', rule['source'], '-d', rule['destination']]
if rule.get('protocol') and rule['protocol'] != 'ALL':
cmd += ['-p', rule['protocol'].lower()]
if rule.get('port'):
cmd += ['--dport', str(rule['port'])]
if rule.get('port_range'):
cmd += ['--dport', rule['port_range'].replace('-', ':')]
cmd += ['-j', rule['action']]
subprocess.run(cmd, capture_output=True, timeout=10)
except Exception as e:
logger.warning(f"iptables -D failed (rule may already be gone): {e}")
logger.info(f"Removed firewall rule {rule_id}")
return True
except Exception as e:
logger.error(f"Failed to remove firewall rule: {e}")
return False
def get_live_iptables(self) -> dict:
"""Return live iptables rules from the WireGuard container."""
out = {}
for table in ('filter', 'nat'):
try:
r = subprocess.run(
['docker', 'exec', 'cell-wireguard',
'iptables', '-t', table, '-L', '-n', '-v', '--line-numbers'],
capture_output=True, text=True, timeout=10
)
out[table] = r.stdout if r.returncode == 0 else r.stderr
except Exception as e:
out[table] = str(e)
return out
def get_nat_rules(self):
"""Return all NAT rules."""
rules = self._load_rules()
return rules.get('nat_rules', [])
def get_peer_routes(self):
"""Return all peer routes as a list."""
rules = self._load_rules()
# peer_routes is a dict keyed by peer_name
return list(rules.get('peer_routes', {}).values())
def get_firewall_rules(self):
"""Return all firewall rules."""
rules = self._load_rules()
return rules.get('firewall_rules', [])
def update_peer_ip(self, peer_name: str, new_ip: str) -> bool:
"""Update peer IP in all routes and re-apply them."""
try:
rules = self._load_rules()
updated = False
if 'peer_routes' in rules and peer_name in rules['peer_routes']:
rules['peer_routes'][peer_name]['peer_ip'] = new_ip
self._save_rules(rules)
self._apply_peer_route(rules['peer_routes'][peer_name])
updated = True
# Optionally update exit_nodes, bridge_routes, split_routes if needed
return updated
except Exception as e:
logger.error(f"Failed to update peer IP in routing: {e}")
return False
def get_status(self) -> Dict[str, Any]:
"""Get routing service status"""
try:
routing_status = self.get_routing_status()
rules = self._load_rules()
# Check if routing service is actually running by testing basic functionality
is_running = self._is_routing_service_running()
status = {
'running': is_running,
'status': 'online' if is_running else 'offline',
'routing_status': routing_status,
'nat_rules_count': len(rules.get('nat_rules', [])),
'peer_routes_count': len(rules.get('peer_routes', {})),
'exit_nodes_count': len(rules.get('exit_nodes', [])),
'firewall_rules_count': len(rules.get('firewall_rules', [])),
'timestamp': datetime.utcnow().isoformat()
}
return status
except Exception as e:
return self.handle_error(e, "get_status")
def test_connectivity(self) -> Dict[str, Any]:
"""Test routing service connectivity"""
try:
# Test basic routing functionality
routing_test = self._test_routing_functionality()
# Test iptables access
iptables_test = self._test_iptables_access()
# Test network interfaces
interfaces_test = self._test_network_interfaces()
# Test routing table access
routing_table_test = self._test_routing_table_access()
results = {
'routing_functionality': routing_test,
'iptables_access': iptables_test,
'network_interfaces': interfaces_test,
'routing_table_access': routing_table_test,
'success': routing_test.get('success', False) and iptables_test.get('success', False),
'timestamp': datetime.utcnow().isoformat()
}
return results
except Exception as e:
return self.handle_error(e, "test_connectivity")
def _test_routing_functionality(self) -> Dict[str, Any]:
"""Test basic routing functionality"""
try:
# Test if we can read routing rules
rules = self._load_rules()
# Test if we can access routing status
routing_status = self.get_routing_status()
return {
'success': True,
'message': 'Routing functionality working',
'rules_loaded': bool(rules),
'status_accessible': bool(routing_status)
}
except Exception as e:
return {
'success': False,
'message': f'Routing functionality test failed: {str(e)}',
'error': str(e)
}
def _test_iptables_access(self) -> Dict[str, Any]:
"""Test iptables access"""
try:
# Test if we can list iptables rules
result = subprocess.run(['iptables', '-L', '-n'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
return {
'success': True,
'message': 'iptables access working',
'rules_count': len([line for line in result.stdout.split('\n') if line.strip()])
}
else:
return {
'success': False,
'message': f'iptables access failed: {result.stderr}',
'error': result.stderr
}
except FileNotFoundError:
# System tools not available (development environment)
return {
'success': True,
'message': 'iptables not available (development mode)',
'rules_count': 0
}
except Exception as e:
return {
'success': False,
'message': f'iptables access test failed: {str(e)}',
'error': str(e)
}
def _test_network_interfaces(self) -> Dict[str, Any]:
"""Test network interfaces access"""
try:
# Test if we can list network interfaces
result = subprocess.run(['ip', 'link', 'show'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
interfaces = [line.strip() for line in result.stdout.split('\n') if line.strip()]
return {
'success': True,
'message': 'Network interfaces accessible',
'interfaces_count': len(interfaces)
}
else:
return {
'success': False,
'message': f'Network interfaces access failed: {result.stderr}',
'error': result.stderr
}
except FileNotFoundError:
# System tools not available (development environment)
return {
'success': True,
'message': 'Network tools not available (development mode)',
'interfaces_count': 0
}
except Exception as e:
return {
'success': False,
'message': f'Network interfaces test failed: {str(e)}',
'error': str(e)
}
def _test_routing_table_access(self) -> Dict[str, Any]:
"""Test routing table access"""
try:
# Test if we can read routing table
result = subprocess.run(['ip', 'route', 'show'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
routes = [line.strip() for line in result.stdout.split('\n') if line.strip()]
return {
'success': True,
'message': 'Routing table accessible',
'routes_count': len(routes)
}
else:
return {
'success': False,
'message': f'Routing table access failed: {result.stderr}',
'error': result.stderr
}
except FileNotFoundError:
# System tools not available (development environment)
return {
'success': True,
'message': 'Routing tools not available (development mode)',
'routes_count': 0
}
except Exception as e:
return {
'success': False,
'message': f'Routing table test failed: {str(e)}',
'error': str(e)
}
def _load_rules(self) -> Dict:
"""Load routing rules from file"""
try:
with open(self.rules_file, 'r') as f:
return json.load(f)
except Exception as e:
logger.error(f"Failed to load routing rules: {e}")
return {}
def _save_rules(self, rules: Dict):
"""Save routing rules to file"""
try:
with open(self.rules_file, 'w') as f:
json.dump(rules, f, indent=2)
except Exception as e:
logger.error(f"Failed to save routing rules: {e}")
def _apply_nat_rule(self, rule: Dict):
"""Apply NAT rule to iptables, supporting MASQUERADE, SNAT, DNAT, and port forwarding."""
try:
if rule.get('nat_type', 'MASQUERADE') == 'MASQUERADE' and rule['masquerade']:
cmd = [
'iptables', '-t', 'nat', '-A', 'POSTROUTING',
'-s', rule['source_network'],
'-o', rule['target_interface'],
'-j', 'MASQUERADE'
]
subprocess.run(cmd, check=True, timeout=10)
logger.info(f"Applied MASQUERADE NAT rule: {rule['source_network']} -> {rule['target_interface']}")
elif rule.get('nat_type') == 'DNAT' and rule['internal_ip']:
# Port forwarding (DNAT)
cmd = [
'iptables', '-t', 'nat', '-A', 'PREROUTING',
'-d', rule['source_network'],
]
if rule.get('protocol') and rule['protocol'] != 'ALL':
cmd += ['-p', rule['protocol'].lower()]
if rule.get('external_port'):
cmd += ['--dport', str(rule['external_port'])]
cmd += ['-j', 'DNAT', '--to-destination', f"{rule['internal_ip']}{':' + str(rule['internal_port']) if rule.get('internal_port') else ''}"]
subprocess.run(cmd, check=True, timeout=10)
logger.info(f"Applied DNAT rule: {rule['source_network']}:{rule.get('external_port')} -> {rule['internal_ip']}:{rule.get('internal_port')}")
elif rule.get('nat_type') == 'SNAT' and rule['internal_ip']:
# 1:1 NAT (SNAT)
cmd = [
'iptables', '-t', 'nat', '-A', 'POSTROUTING',
'-s', rule['internal_ip'],
]
if rule.get('protocol') and rule['protocol'] != 'ALL':
cmd += ['-p', rule['protocol'].lower()]
if rule.get('internal_port'):
cmd += ['--sport', str(rule['internal_port'])]
cmd += ['-j', 'SNAT', '--to-source', rule['source_network']]
subprocess.run(cmd, check=True, timeout=10)
logger.info(f"Applied SNAT rule: {rule['internal_ip']} -> {rule['source_network']}")
except Exception as e:
logger.error(f"Failed to apply NAT rule: {e}")
def _remove_nat_rule(self, rule_id: str):
"""Remove NAT rule from iptables"""
try:
# This is a simplified removal - in practice you'd need to track the exact rule
cmd = ['iptables', '-t', 'nat', '-F', 'POSTROUTING']
subprocess.run(cmd, check=True, timeout=10)
logger.info(f"Removed NAT rule: {rule_id}")
except Exception as e:
logger.error(f"Failed to remove NAT rule: {e}")
def _apply_peer_route(self, route: Dict):
"""Apply peer routing rule"""
try:
# Add route for peer networks
for network in route['allowed_networks']:
cmd = [
'ip', 'route', 'add', network,
'via', route['peer_ip'],
'dev', 'wg0'
]
subprocess.run(cmd, check=True, timeout=10)
logger.info(f"Applied peer route for {route['peer_name']}")
except Exception as e:
logger.error(f"Failed to apply peer route: {e}")
def _remove_peer_route(self, peer_name: str):
"""Remove peer routing rule"""
try:
# Remove routes for this peer
cmd = ['ip', 'route', 'del', 'via', peer_name, 'dev', 'wg0']
subprocess.run(cmd, check=True, timeout=10)
logger.info(f"Removed peer route for {peer_name}")
except Exception as e:
logger.error(f"Failed to remove peer route: {e}")
def _apply_exit_node(self, exit_node: Dict):
"""Apply exit node configuration"""
try:
# Add default route through exit node
cmd = [
'ip', 'route', 'add', 'default',
'via', exit_node['peer_ip'],
'dev', 'wg0'
]
subprocess.run(cmd, check=True, timeout=10)
logger.info(f"Applied exit node {exit_node['peer_name']}")
except Exception as e:
logger.error(f"Failed to apply exit node: {e}")
def _apply_bridge_route(self, route: Dict):
"""Apply bridge routing rule"""
try:
# Add forwarding rules for bridge
for network in route['allowed_networks']:
cmd = [
'iptables', '-A', 'FORWARD',
'-s', network,
'-d', route['target_peer'],
'-j', 'ACCEPT'
]
subprocess.run(cmd, check=True, timeout=10)
logger.info(f"Applied bridge route {route['source_peer']} -> {route['target_peer']}")
except Exception as e:
logger.error(f"Failed to apply bridge route: {e}")
def _apply_split_route(self, route: Dict):
"""Apply split routing rule"""
try:
# Add specific route for network
cmd = [
'ip', 'route', 'add', route['network'],
'via', route['exit_peer'],
'dev', 'wg0'
]
subprocess.run(cmd, check=True, timeout=10)
logger.info(f"Applied split route for {route['network']}")
except Exception as e:
logger.error(f"Failed to apply split route: {e}")
def _apply_firewall_rule(self, rule: Dict):
"""Apply firewall rule with protocol and port range support."""
try:
cmd = [
'iptables', '-A', rule['rule_type'],
'-s', rule['source'],
'-d', rule['destination']
]
if rule.get('protocol') and rule['protocol'] != 'ALL':
cmd += ['-p', rule['protocol'].lower()]
if rule.get('port'):
cmd += ['--dport', str(rule['port'])]
if rule.get('port_range'):
cmd += ['--dport', rule['port_range'].replace('-', ':')]
cmd += ['-j', rule['action']]
subprocess.run(cmd, check=True, timeout=10)
logger.info(f"Applied firewall rule {rule['rule_type']} proto={rule.get('protocol')} port={rule.get('port') or rule.get('port_range')}")
except Exception as e:
logger.error(f"Failed to apply firewall rule: {e}")
def _get_routing_table(self) -> List[Dict]:
"""Get host routing table from /proc/1/net/route (host PID namespace)."""
try:
return self._parse_proc_net_route('/proc/1/net/route')
except Exception:
pass
# Fallback: WireGuard container routing table
try:
result = subprocess.run(
['docker', 'exec', 'cell-wireguard', 'ip', 'route', 'show'],
capture_output=True, text=True, timeout=10,
)
if result.returncode == 0:
routes = []
for line in result.stdout.strip().split('\n'):
if line.strip():
routes.append({'route': line.strip(), 'parsed': self._parse_route(line.strip())})
return routes
except Exception as e:
logger.error(f"Failed to get routing table: {e}")
return []
def _parse_proc_net_route(self, path: str) -> List[Dict]:
"""Parse /proc/net/route hex table into human-readable routes."""
import socket, struct
routes = []
with open(path) as f:
lines = f.readlines()[1:] # skip header
for line in lines:
parts = line.strip().split()
if len(parts) < 8:
continue
iface, dest_hex, gw_hex, mask_hex = parts[0], parts[1], parts[2], parts[7]
def hex_to_ip(h):
return socket.inet_ntoa(struct.pack('<I', int(h, 16)))
dest = hex_to_ip(dest_hex)
gw = hex_to_ip(gw_hex)
mask = hex_to_ip(mask_hex)
prefix = bin(struct.unpack('>I', socket.inet_aton(mask))[0]).count('1')
if dest == '0.0.0.0' and mask == '0.0.0.0':
dest_str = 'default'
route_str = f'default via {gw} dev {iface}'
else:
dest_str = f'{dest}/{prefix}'
route_str = f'{dest}/{prefix} dev {iface}' + (f' via {gw}' if gw != '0.0.0.0' else '')
routes.append({
'route': route_str,
'parsed': {'destination': dest_str, 'via': gw if gw != '0.0.0.0' else '', 'dev': iface, 'metric': ''},
})
return routes
def _parse_route(self, route_line: str) -> Dict:
"""Parse route line into components"""
try:
# Simple route parsing - can be enhanced
parts = route_line.split()
parsed = {
'destination': parts[0] if parts else '',
'via': '',
'dev': '',
'metric': ''
}
for i, part in enumerate(parts):
if part == 'via' and i + 1 < len(parts):
parsed['via'] = parts[i + 1]
elif part == 'dev' and i + 1 < len(parts):
parsed['dev'] = parts[i + 1]
elif part == 'metric' and i + 1 < len(parts):
parsed['metric'] = parts[i + 1]
return parsed
except Exception as e:
logger.error(f"Failed to parse route: {e}")
return {'destination': route_line, 'via': '', 'dev': '', 'metric': ''}
def _is_routing_service_running(self) -> bool:
"""Check if routing service is actually running"""
# Use internal state tracking instead of system tool checks
return self._service_running
def start(self) -> bool:
"""Start routing service"""
try:
# Set internal state to running
self._service_running = True
self._save_service_state()
# Try to enable IP forwarding (may fail in Docker without privileges)
try:
subprocess.run(['sysctl', '-w', 'net.ipv4.ip_forward=1'],
check=True, timeout=10)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
logger.warning(f"Could not enable IP forwarding: {e}")
# Continue anyway - service is considered started
# Load existing rules
rules = self._load_rules()
# Apply all enabled rules (may fail in Docker without privileges)
try:
for rule in rules.get('nat_rules', []):
if rule.get('enabled', True):
self._apply_nat_rule(rule)
for rule in rules.get('firewall_rules', []):
if rule.get('enabled', True):
self._apply_firewall_rule(rule)
for route in rules.get('peer_routes', {}).values():
if route.get('enabled', True):
self._apply_peer_route(route)
for exit_node in rules.get('exit_nodes', []):
if exit_node.get('enabled', True):
self._apply_exit_node(exit_node)
except Exception as e:
logger.warning(f"Could not apply routing rules: {e}")
# Continue anyway - service is considered started
logger.info("Routing service started successfully")
return True
except Exception as e:
logger.error(f"Failed to start routing service: {e}")
self._service_running = False
self._save_service_state()
return False
def stop(self) -> bool:
"""Stop routing service"""
try:
# Set internal state to stopped
self._service_running = False
self._save_service_state()
# Try to clear all iptables rules (may fail in Docker without privileges)
try:
subprocess.run(['iptables', '-t', 'nat', '-F'],
check=True, timeout=10)
subprocess.run(['iptables', '-F'],
check=True, timeout=10)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
logger.warning(f"Could not clear iptables rules: {e}")
# Continue anyway - service is considered stopped
# Try to disable IP forwarding (may fail in Docker without privileges)
try:
subprocess.run(['sysctl', '-w', 'net.ipv4.ip_forward=0'],
check=True, timeout=10)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
logger.warning(f"Could not disable IP forwarding: {e}")
# Continue anyway - service is considered stopped
logger.info("Routing service stopped successfully")
return True
except Exception as e:
logger.error(f"Failed to stop routing service: {e}")
# Even if system commands fail, we consider the service stopped
self._service_running = False
self._save_service_state()
return True # Return True because the state is now stopped
def restart(self) -> bool:
"""Restart routing service"""
try:
self.stop()
time.sleep(1) # Brief pause
return self.start()
except Exception as e:
logger.error(f"Failed to restart routing service: {e}")
return False