Files
pic/tests/test_routing_manager.py
T
roof aa1e5c41ec
Unit Tests / test (push) Successful in 12m6s
test: raise coverage 68.7% -> ~80.4%; add ~250 tests for new egress/DDNS/network paths
Coverage was below acceptable levels and several newly-added code paths
(sshuttle egress, proxy egress, DDNS provider stubs, DNS overview route,
peer-registry provisioning) had zero test coverage.

~250 new unit tests are added across 16 new test files. Existing test files
are updated to match refactored interfaces (DHCP removed, constants
introduced, network_manager restructured). .coveragerc is added to pin the
source mapping and the 70% floor so regressions are caught at commit time.

tests/test_enhanced_api.py was previously living in api/ (wrong location)
and is moved to tests/ where it belongs.

Integration test files are updated to remove references to DHCP endpoints
and add coverage for the new DNS overview and DDNS sync endpoints.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 09:03:39 -04:00

925 lines
41 KiB
Python

import sys
import subprocess
from pathlib import Path
# Put the ``api`` directory (a sibling of this tests directory) at the front
# of sys.path so the ``routing_manager`` import below can be resolved.
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
import unittest
import tempfile
import shutil
import os
from unittest.mock import patch, MagicMock, call
from routing_manager import RoutingManager
import json
class TestRoutingManager(unittest.TestCase):
    """Exercise RoutingManager against a throwaway data/config directory.

    Most tests patch ``subprocess.run`` or the manager's ``_apply_*`` /
    ``_remove_*`` helpers, and then check the value returned by the public
    method together with what ended up in the persisted rules file.
    """

    def setUp(self):
        """Create an isolated data/config tree and a manager bound to it."""
        self.test_dir = tempfile.mkdtemp()
        # Register the removal right away: cleanups run even when a later
        # line of setUp raises, whereas tearDown is skipped in that case and
        # the temporary directory would be leaked.
        self.addCleanup(shutil.rmtree, self.test_dir, ignore_errors=True)
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)
        self.manager = RoutingManager(data_dir=self.data_dir, config_dir=self.config_dir)

    def _read_rules(self):
        """Return the rules file content parsed from JSON."""
        with open(self.manager.rules_file) as f:
            return json.load(f)

    def test_initialization(self):
        """Constructing the manager creates the routing dir and a default rules file."""
        self.assertTrue(os.path.exists(self.manager.routing_dir))
        self.assertTrue(os.path.exists(self.manager.rules_file))
        # Check that rules file contains default structure
        rules = self._read_rules()
        self.assertIn('nat_rules', rules)
        self.assertIn('peer_routes', rules)
        self.assertIn('exit_nodes', rules)
        self.assertIn('bridge_routes', rules)
        self.assertIn('split_routes', rules)
        self.assertIn('firewall_rules', rules)
        self.assertIsInstance(rules['nat_rules'], list)
        self.assertIsInstance(rules['peer_routes'], dict)
        self.assertIsInstance(rules['exit_nodes'], list)
        self.assertIsInstance(rules['bridge_routes'], list)
        self.assertIsInstance(rules['split_routes'], list)
        self.assertIsInstance(rules['firewall_rules'], list)

    @patch.object(RoutingManager, '_apply_nat_rule', return_value=True)
    @patch.object(RoutingManager, '_remove_nat_rule', return_value=True)
    def test_add_and_remove_nat_rule(self, mock_remove_nat, mock_apply_nat):
        """Round-trip a NAT rule through add/remove, then reject malformed input."""
        # Add a valid NAT rule
        result = self.manager.add_nat_rule('10.0.0.0/24', 'eth0')
        self.assertTrue(result)
        # Check that the rule is persisted
        rules = self._read_rules()
        self.assertEqual(len(rules['nat_rules']), 1)
        rule = rules['nat_rules'][0]
        self.assertEqual(rule['source_network'], '10.0.0.0/24')
        self.assertEqual(rule['target_interface'], 'eth0')
        self.assertEqual(rule['nat_type'], 'MASQUERADE')
        self.assertTrue(rule['enabled'])
        # Remove the NAT rule
        rule_id = rule['id']
        result = self.manager.remove_nat_rule(rule_id)
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['nat_rules']), 0)
        # Test invalid NAT rule (bad CIDR)
        result = self.manager.add_nat_rule('bad-cidr', 'eth0')
        self.assertFalse(result)
        # Test invalid NAT rule (bad interface)
        result = self.manager.add_nat_rule('10.0.0.0/24', '')
        self.assertFalse(result)
        # Test invalid NAT rule (bad nat_type)
        result = self.manager.add_nat_rule('10.0.0.0/24', 'eth0', nat_type='INVALID')
        self.assertFalse(result)
        # Test invalid NAT rule (bad protocol)
        result = self.manager.add_nat_rule('10.0.0.0/24', 'eth0', protocol='INVALID')
        self.assertFalse(result)

    @patch.object(RoutingManager, '_apply_peer_route', return_value=True)
    @patch.object(RoutingManager, '_remove_peer_route', return_value=True)
    def test_add_and_remove_peer_route(self, mock_remove_peer, mock_apply_peer):
        """Round-trip a peer route through add/remove, then reject malformed input."""
        # Add a valid peer route
        allowed_networks = ['10.0.0.0/24']
        result = self.manager.add_peer_route('peer1', '10.0.0.2', allowed_networks)
        self.assertTrue(result)
        # Check that the route is persisted
        rules = self._read_rules()
        self.assertIn('peer1', rules['peer_routes'])
        route = rules['peer_routes']['peer1']
        self.assertEqual(route['peer_name'], 'peer1')
        self.assertEqual(route['peer_ip'], '10.0.0.2')
        self.assertEqual(route['allowed_networks'], allowed_networks)
        self.assertEqual(route['route_type'], 'lan')
        self.assertTrue(route['enabled'])
        # Remove the peer route
        result = self.manager.remove_peer_route('peer1')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertNotIn('peer1', rules['peer_routes'])
        # Test invalid peer route (bad peer_name)
        result = self.manager.add_peer_route('', '10.0.0.2', allowed_networks)
        self.assertFalse(result)
        # Test invalid peer route (bad peer_ip)
        result = self.manager.add_peer_route('peer2', '', allowed_networks)
        self.assertFalse(result)
        # Test invalid peer route (bad allowed_networks)
        result = self.manager.add_peer_route('peer3', '10.0.0.3', ['bad-cidr'])
        self.assertFalse(result)
        # Test invalid peer route (bad route_type)
        result = self.manager.add_peer_route(
            'peer4', '10.0.0.4', allowed_networks, route_type='invalid')
        self.assertFalse(result)

    @patch.object(RoutingManager, '_apply_exit_node')
    def test_add_exit_node_valid(self, mock_apply):
        """A valid exit node is persisted as enabled and applied exactly once."""
        result = self.manager.add_exit_node('peer1', '10.0.0.2')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['exit_nodes']), 1)
        node = rules['exit_nodes'][0]
        self.assertEqual(node['peer_name'], 'peer1')
        self.assertEqual(node['peer_ip'], '10.0.0.2')
        self.assertTrue(node['enabled'])
        mock_apply.assert_called_once()

    @patch.object(RoutingManager, '_apply_exit_node')
    def test_add_exit_node_with_allowed_domains(self, mock_apply):
        """The allowed_domains list is stored verbatim on the exit node."""
        result = self.manager.add_exit_node('peer1', '10.0.0.2', allowed_domains=['example.com'])
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(rules['exit_nodes'][0]['allowed_domains'], ['example.com'])

    def test_add_exit_node_invalid_peer_name(self):
        """A peer name with disallowed characters yields a failure dict."""
        result = self.manager.add_exit_node('bad name!', '10.0.0.2')
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))

    def test_add_exit_node_invalid_ip(self):
        """A non-IP peer address yields a failure dict."""
        result = self.manager.add_exit_node('peer1', 'not-an-ip')
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))

    def test_add_exit_node_invalid_domains(self):
        """Passing a string instead of a list for allowed_domains yields a failure dict."""
        result = self.manager.add_exit_node('peer1', '10.0.0.2', allowed_domains='not-a-list')
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))

    def test_add_exit_node_invalid_domain_format(self):
        """A malformed domain inside allowed_domains yields a failure dict."""
        result = self.manager.add_exit_node('peer1', '10.0.0.2', allowed_domains=['bad domain!'])
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))

    @patch.object(RoutingManager, '_apply_bridge_route')
    def test_add_bridge_route_valid(self, mock_apply):
        """A valid bridge route is persisted and applied exactly once."""
        result = self.manager.add_bridge_route('src-peer', '192.168.1.0/24', ['10.0.0.0/24'])
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['bridge_routes']), 1)
        route = rules['bridge_routes'][0]
        self.assertEqual(route['source_peer'], 'src-peer')
        self.assertEqual(route['target_peer'], '192.168.1.0/24')
        mock_apply.assert_called_once()

    def test_add_bridge_route_invalid_source(self):
        """A malformed source peer name yields a failure dict."""
        result = self.manager.add_bridge_route('bad name!', '192.168.1.0/24', ['10.0.0.0/24'])
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))

    def test_add_bridge_route_invalid_target(self):
        """A target that is not a network yields a failure dict."""
        result = self.manager.add_bridge_route('src-peer', 'not-a-network', ['10.0.0.0/24'])
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))

    def test_add_bridge_route_empty_allowed_networks(self):
        """An empty allowed_networks list yields a failure dict."""
        result = self.manager.add_bridge_route('src-peer', '192.168.1.0/24', [])
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))

    def test_add_bridge_route_invalid_network_in_list(self):
        """A non-CIDR entry in allowed_networks yields a failure dict."""
        result = self.manager.add_bridge_route('src-peer', '192.168.1.0/24', ['not-a-cidr'])
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))

    @patch.object(RoutingManager, '_apply_split_route')
    def test_add_split_route_valid(self, mock_apply):
        """A valid split route is persisted and applied exactly once."""
        result = self.manager.add_split_route('10.0.0.0/24', '10.0.0.1')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['split_routes']), 1)
        route = rules['split_routes'][0]
        self.assertEqual(route['network'], '10.0.0.0/24')
        self.assertEqual(route['exit_peer'], '10.0.0.1')
        mock_apply.assert_called_once()

    @patch.object(RoutingManager, '_apply_split_route')
    def test_add_split_route_with_fallback(self, mock_apply):
        """The fallback_peer argument is stored on the persisted split route."""
        result = self.manager.add_split_route('10.0.0.0/24', '10.0.0.1', fallback_peer='10.0.0.2')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(rules['split_routes'][0]['fallback_peer'], '10.0.0.2')

    def test_add_split_route_invalid_network(self):
        """A non-CIDR network yields a failure dict."""
        result = self.manager.add_split_route('not-a-cidr', '10.0.0.1')
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))

    def test_add_split_route_invalid_exit_peer(self):
        """A non-IP exit peer yields a failure dict."""
        result = self.manager.add_split_route('10.0.0.0/24', 'not-an-ip')
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))

    def test_add_split_route_invalid_fallback(self):
        """A non-IP fallback peer yields a failure dict."""
        result = self.manager.add_split_route('10.0.0.0/24', '10.0.0.1', fallback_peer='not-ip')
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))

    @patch.object(RoutingManager, '_apply_firewall_rule')
    def test_add_firewall_rule_valid(self, mock_apply):
        """A valid firewall rule is persisted with action ACCEPT and applied once."""
        result = self.manager.add_firewall_rule('FORWARD', '10.0.0.0/24', '192.168.1.0/24')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['firewall_rules']), 1)
        rule = rules['firewall_rules'][0]
        self.assertEqual(rule['rule_type'], 'FORWARD')
        self.assertEqual(rule['source'], '10.0.0.0/24')
        self.assertEqual(rule['destination'], '192.168.1.0/24')
        self.assertEqual(rule['action'], 'ACCEPT')
        mock_apply.assert_called_once()

    @patch.object(RoutingManager, '_apply_firewall_rule')
    def test_add_firewall_rule_with_port(self, mock_apply):
        """Protocol and port arguments are stored on the persisted rule."""
        result = self.manager.add_firewall_rule(
            'INPUT', '10.0.0.0/24', '192.168.1.0/24', protocol='TCP', port='80')
        self.assertTrue(result)
        rules = self._read_rules()
        rule = rules['firewall_rules'][0]
        self.assertEqual(rule['protocol'], 'TCP')
        self.assertEqual(rule['port'], '80')

    @patch.object(RoutingManager, '_apply_firewall_rule')
    def test_add_firewall_rule_with_port_range(self, mock_apply):
        """The port_range argument is stored on the persisted rule."""
        result = self.manager.add_firewall_rule(
            'INPUT', '10.0.0.0/24', '192.168.1.0/24', port_range='1000-2000')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(rules['firewall_rules'][0]['port_range'], '1000-2000')

    def test_add_firewall_rule_invalid_type(self):
        """An unknown rule type is rejected with a falsy result."""
        result = self.manager.add_firewall_rule('BADCHAIN', '10.0.0.0/24', '192.168.1.0/24')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_source(self):
        """A non-CIDR source is rejected with a falsy result."""
        result = self.manager.add_firewall_rule('FORWARD', 'not-cidr', '192.168.1.0/24')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_destination(self):
        """A non-CIDR destination is rejected with a falsy result."""
        result = self.manager.add_firewall_rule('FORWARD', '10.0.0.0/24', 'not-cidr')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_action(self):
        """An unsupported action is rejected with a falsy result."""
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', action='JUMP')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_protocol(self):
        """An unsupported protocol is rejected with a falsy result."""
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', protocol='SCTP')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_port_value(self):
        """The port value '99999' is rejected with a falsy result."""
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', port='99999')
        self.assertFalse(result)

    def test_add_firewall_rule_port_not_number(self):
        """A non-numeric port is rejected with a falsy result."""
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', port='abc')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_port_range_format(self):
        """A non-numeric port range is rejected with a falsy result."""
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', port_range='abc-def')
        self.assertFalse(result)

    @patch.object(RoutingManager, '_apply_firewall_rule')
    @patch('subprocess.run')
    def test_remove_firewall_rule(self, mock_sub, mock_apply):
        """Removing an existing firewall rule returns truthy and empties the list."""
        mock_proc = MagicMock()
        mock_proc.returncode = 0
        mock_sub.return_value = mock_proc
        self.manager.add_firewall_rule('FORWARD', '10.0.0.0/24', '192.168.1.0/24')
        rules = self._read_rules()
        rule_id = rules['firewall_rules'][0]['id']
        result = self.manager.remove_firewall_rule(rule_id)
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['firewall_rules']), 0)

    @patch.object(RoutingManager, '_apply_firewall_rule')
    def test_remove_firewall_rule_not_found(self, mock_apply):
        """Removing an unknown rule id returns a falsy result."""
        result = self.manager.remove_firewall_rule('nonexistent_id')
        self.assertFalse(result)

    @patch.object(RoutingManager, '_apply_firewall_rule')
    @patch('subprocess.run')
    def test_remove_firewall_rule_with_tcp_port(self, mock_sub, mock_apply):
        """remove_firewall_rule builds correct iptables -D cmd for TCP+port rule."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager.add_firewall_rule(
            'INPUT', '10.0.0.0/24', '192.168.1.0/24', protocol='TCP', port='443')
        rules = self._read_rules()
        rule_id = rules['firewall_rules'][0]['id']
        result = self.manager.remove_firewall_rule(rule_id)
        self.assertTrue(result)
        # Check subprocess was called with iptables -D
        call_args = mock_sub.call_args[0][0]
        self.assertIn('iptables', call_args)
        self.assertIn('-D', call_args)

    def test_get_routing_status_empty(self):
        """A fresh manager reports every status key and zero NAT rules."""
        status = self.manager.get_routing_status()
        self.assertIn('nat_rules_count', status)
        self.assertEqual(status['nat_rules_count'], 0)
        self.assertIn('firewall_rules_count', status)
        self.assertIn('peer_routes_count', status)
        self.assertIn('exit_nodes_count', status)
        self.assertIn('bridge_routes_count', status)
        self.assertIn('split_routes_count', status)
        self.assertIn('routing_table', status)
        self.assertIn('active_rules', status)

    @patch.object(RoutingManager, '_apply_nat_rule')
    @patch.object(RoutingManager, '_apply_firewall_rule')
    @patch.object(RoutingManager, '_apply_peer_route')
    def test_get_routing_status_counts(self, mock_peer, mock_fw, mock_nat):
        """Status counts reflect one rule added of each kind."""
        self.manager.add_nat_rule('10.0.0.0/24', 'eth0')
        self.manager.add_firewall_rule('FORWARD', '10.0.0.0/24', '192.168.1.0/24')
        self.manager.add_peer_route('p1', '10.0.0.2', ['10.0.0.0/24'])
        status = self.manager.get_routing_status()
        self.assertEqual(status['nat_rules_count'], 1)
        self.assertEqual(status['firewall_rules_count'], 1)
        self.assertEqual(status['peer_routes_count'], 1)

    def test_get_routing_status_corrupted_file(self):
        """An unparsable rules file still yields a status with zero NAT rules."""
        # Corrupt the rules file to trigger exception
        with open(self.manager.rules_file, 'w') as f:
            f.write('{corrupt')
        status = self.manager.get_routing_status()
        # Should return safe defaults
        self.assertEqual(status['nat_rules_count'], 0)

    def test_get_nat_rules_empty(self):
        """A fresh manager has no NAT rules."""
        rules = self.manager.get_nat_rules()
        self.assertEqual(rules, [])

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_get_nat_rules_returns_list(self, mock_apply):
        """get_nat_rules returns the rule that was just added."""
        self.manager.add_nat_rule('10.0.0.0/24', 'eth0')
        rules = self.manager.get_nat_rules()
        self.assertEqual(len(rules), 1)
        self.assertEqual(rules[0]['source_network'], '10.0.0.0/24')

    def test_get_peer_routes_empty(self):
        """A fresh manager has no peer routes."""
        routes = self.manager.get_peer_routes()
        self.assertEqual(routes, [])

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_get_peer_routes_returns_list(self, mock_apply):
        """get_peer_routes returns the route that was just added."""
        self.manager.add_peer_route('peer1', '10.0.0.2', ['10.0.0.0/24'])
        routes = self.manager.get_peer_routes()
        self.assertEqual(len(routes), 1)
        self.assertEqual(routes[0]['peer_name'], 'peer1')

    def test_get_firewall_rules_empty(self):
        """A fresh manager has no firewall rules."""
        rules = self.manager.get_firewall_rules()
        self.assertEqual(rules, [])

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_update_peer_ip_updates_and_reapplies(self, mock_apply):
        """update_peer_ip persists the new address for an existing peer."""
        self.manager.add_peer_route('peer1', '10.0.0.2', ['10.0.0.0/24'])
        result = self.manager.update_peer_ip('peer1', '10.0.0.99')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(rules['peer_routes']['peer1']['peer_ip'], '10.0.0.99')

    def test_update_peer_ip_nonexistent_peer(self):
        """update_peer_ip on an unknown peer returns a falsy result."""
        result = self.manager.update_peer_ip('nobody', '10.0.0.99')
        self.assertFalse(result)

    def test_remove_peer_route_nonexistent(self):
        """remove_peer_route on an unknown peer returns a falsy result."""
        result = self.manager.remove_peer_route('nobody')
        self.assertFalse(result)

    def test_get_status_returns_dict(self):
        """get_status returns a dict carrying the expected summary keys."""
        status = self.manager.get_status()
        self.assertIsInstance(status, dict)
        self.assertIn('running', status)
        self.assertIn('status', status)
        self.assertIn('nat_rules_count', status)

    def test_start_and_stop_service(self):
        """start() and stop() both succeed and toggle _service_running."""
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0)
            result = self.manager.start()
            self.assertTrue(result)
            self.assertTrue(self.manager._service_running)
            result = self.manager.stop()
            self.assertTrue(result)
            self.assertFalse(self.manager._service_running)

    def test_start_persists_service_state(self):
        """start() writes a state file recording running=True."""
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0)
            self.manager.start()
        # State file should exist now
        self.assertTrue(os.path.exists(self.manager._state_file))
        with open(self.manager._state_file) as f:
            state = json.load(f)
        self.assertTrue(state['running'])

    def test_stop_persists_service_state(self):
        """stop() writes a state file recording running=False."""
        self.manager.stop()
        with open(self.manager._state_file) as f:
            state = json.load(f)
        self.assertFalse(state['running'])

    def test_restart_service(self):
        """restart() succeeds with subprocess.run and time.sleep patched out."""
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0)
            with patch('time.sleep'):
                result = self.manager.restart()
            self.assertTrue(result)

    def test_test_connectivity_returns_dict(self):
        """test_connectivity returns a dict with one entry per probe."""
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='', stderr='')
            result = self.manager.test_connectivity()
        self.assertIsInstance(result, dict)
        self.assertIn('routing_functionality', result)
        self.assertIn('iptables_access', result)
        self.assertIn('network_interfaces', result)
        self.assertIn('routing_table_access', result)

    def test_test_routing_connectivity_returns_results(self):
        """test_routing_connectivity reports ping and traceroute results."""
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='OK', stderr='')
            results = self.manager.test_routing_connectivity('8.8.8.8')
        self.assertIn('ping', results)
        self.assertIn('traceroute', results)

    def test_test_routing_connectivity_with_via_peer(self):
        """Passing via_peer adds a 'peer_route' entry to the results."""
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='OK', stderr='')
            results = self.manager.test_routing_connectivity('8.8.8.8', via_peer='10.0.0.1')
        self.assertIn('peer_route', results)

    def test_test_routing_connectivity_subprocess_exception(self):
        """A raising subprocess.run is reported as an unsuccessful ping."""
        with patch('subprocess.run', side_effect=Exception('timeout')):
            results = self.manager.test_routing_connectivity('8.8.8.8')
        self.assertIn('ping', results)
        self.assertFalse(results['ping']['success'])

    def test_get_routing_logs(self):
        """get_routing_logs returns a dict containing a 'routes' entry."""
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='log line', stderr='')
            logs = self.manager.get_routing_logs()
        self.assertIsInstance(logs, dict)
        self.assertIn('routes', logs)

    def test_parse_route_basic(self):
        """_parse_route extracts destination, via, dev and metric from a route line."""
        parsed = self.manager._parse_route('10.0.0.0/24 via 172.20.0.1 dev eth0 metric 100')
        self.assertEqual(parsed['destination'], '10.0.0.0/24')
        self.assertEqual(parsed['via'], '172.20.0.1')
        self.assertEqual(parsed['dev'], 'eth0')
        self.assertEqual(parsed['metric'], '100')

    def test_parse_route_no_via(self):
        """_parse_route leaves 'via' empty when the line has no gateway."""
        parsed = self.manager._parse_route('10.0.0.0/24 dev eth0')
        self.assertEqual(parsed['destination'], '10.0.0.0/24')
        self.assertEqual(parsed['dev'], 'eth0')
        self.assertEqual(parsed['via'], '')

    def test_parse_route_empty_string(self):
        """_parse_route on an empty line yields an empty destination."""
        parsed = self.manager._parse_route('')
        self.assertEqual(parsed['destination'], '')

    def test_validate_cidr_valid(self):
        """_validate_cidr accepts well-formed IPv4 networks."""
        self.assertTrue(self.manager._validate_cidr('10.0.0.0/24'))
        self.assertTrue(self.manager._validate_cidr('192.168.1.0/24'))
        self.assertTrue(self.manager._validate_cidr('172.16.0.0/12'))
        self.assertTrue(self.manager._validate_cidr('0.0.0.0/0'))

    def test_validate_cidr_invalid(self):
        """_validate_cidr rejects garbage, the empty string and out-of-range octets."""
        self.assertFalse(self.manager._validate_cidr('not-a-cidr'))
        self.assertFalse(self.manager._validate_cidr(''))
        self.assertFalse(self.manager._validate_cidr('10.0.0.300/24'))

    def test_load_service_state_from_file(self):
        """State is restored from the file on the next instantiation."""
        self.manager._service_running = True
        self.manager._save_service_state()
        new_manager = RoutingManager(data_dir=self.data_dir, config_dir=self.config_dir)
        self.assertTrue(new_manager._service_running)

    def test_load_service_state_no_file(self):
        """Without a state file, service defaults to running=True."""
        if os.path.exists(self.manager._state_file):
            os.remove(self.manager._state_file)
        new_manager = RoutingManager(data_dir=self.data_dir, config_dir=self.config_dir)
        self.assertTrue(new_manager._service_running)

    def test_get_live_iptables(self):
        """get_live_iptables returns entries for the filter and nat tables."""
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='Chain INPUT', stderr='')
            result = self.manager.get_live_iptables()
        self.assertIn('filter', result)
        self.assertIn('nat', result)

    def test_get_live_iptables_subprocess_exception(self):
        """Both table keys are still present when subprocess.run raises."""
        with patch('subprocess.run', side_effect=Exception('no docker')):
            result = self.manager.get_live_iptables()
        self.assertIn('filter', result)
        self.assertIn('nat', result)

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_nat_rule_snat_type(self, mock_apply):
        """An SNAT rule is accepted and persisted with nat_type 'SNAT'."""
        result = self.manager.add_nat_rule(
            '10.0.0.0/24', 'eth0', nat_type='SNAT',
            internal_ip='10.0.0.5', internal_port='8080')
        self.assertTrue(result)
        rules = self._read_rules()
        rule = rules['nat_rules'][0]
        self.assertEqual(rule['nat_type'], 'SNAT')

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_nat_rule_dnat_type(self, mock_apply):
        """A DNAT rule is accepted and persisted with nat_type 'DNAT'."""
        result = self.manager.add_nat_rule(
            '10.0.0.0/24', 'eth0', nat_type='DNAT',
            internal_ip='10.0.0.5', external_port='80', internal_port='8080')
        self.assertTrue(result)
        rules = self._read_rules()
        rule = rules['nat_rules'][0]
        self.assertEqual(rule['nat_type'], 'DNAT')

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_nat_rule_tcp_protocol(self, mock_apply):
        """protocol='TCP' is accepted for a NAT rule."""
        result = self.manager.add_nat_rule('10.0.0.0/24', 'eth0', protocol='TCP')
        self.assertTrue(result)

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_nat_rule_udp_protocol(self, mock_apply):
        """protocol='UDP' is accepted for a NAT rule."""
        result = self.manager.add_nat_rule('10.0.0.0/24', 'eth0', protocol='UDP')
        self.assertTrue(result)

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_peer_route_exit_type(self, mock_apply):
        """route_type='exit' is accepted for a peer route."""
        result = self.manager.add_peer_route(
            'p1', '10.0.0.2', ['10.0.0.0/24'], route_type='exit')
        self.assertTrue(result)

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_peer_route_bridge_type(self, mock_apply):
        """route_type='bridge' is accepted for a peer route."""
        result = self.manager.add_peer_route(
            'p1', '10.0.0.2', ['10.0.0.0/24'], route_type='bridge')
        self.assertTrue(result)

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_peer_route_split_type(self, mock_apply):
        """route_type='split' is accepted for a peer route."""
        result = self.manager.add_peer_route(
            'p1', '10.0.0.2', ['10.0.0.0/24'], route_type='split')
        self.assertTrue(result)

    def test_ensure_config_exists_is_idempotent(self):
        """Calling _ensure_config_exists twice does not raise."""
        self.manager._ensure_config_exists()
        self.manager._ensure_config_exists()
        self.assertTrue(os.path.exists(self.manager.rules_file))

    def test_save_rules_failure_is_silent(self):
        """_save_rules failure (e.g. permission error) doesn't raise."""
        with patch('builtins.open', side_effect=OSError('disk full')):
            # Should not raise
            self.manager._save_rules({'nat_rules': [], 'peer_routes': {}})

    def test_load_rules_failure_returns_empty_dict(self):
        """_load_rules failure returns {}."""
        with open(self.manager.rules_file, 'w') as f:
            f.write('{corrupt')
        result = self.manager._load_rules()
        self.assertEqual(result, {})

    def test_test_iptables_access_not_found(self):
        """FileNotFoundError from iptables returns success=True (dev mode)."""
        with patch('subprocess.run', side_effect=FileNotFoundError()):
            result = self.manager._test_iptables_access()
        self.assertTrue(result['success'])

    def test_test_network_interfaces_not_found(self):
        """FileNotFoundError from ip link show returns success=True (dev mode)."""
        with patch('subprocess.run', side_effect=FileNotFoundError()):
            result = self.manager._test_network_interfaces()
        self.assertTrue(result['success'])

    def test_test_routing_table_not_found(self):
        """FileNotFoundError from ip route show returns success=True (dev mode)."""
        with patch('subprocess.run', side_effect=FileNotFoundError()):
            result = self.manager._test_routing_table_access()
        self.assertTrue(result['success'])

    def test_is_routing_service_running_reflects_state(self):
        """_is_routing_service_running mirrors the _service_running flag."""
        self.manager._service_running = True
        self.assertTrue(self.manager._is_routing_service_running())
        self.manager._service_running = False
        self.assertFalse(self.manager._is_routing_service_running())

    @patch('subprocess.run')
    def test_apply_nat_rule_masquerade(self, mock_sub):
        """_apply_nat_rule issues one iptables command containing MASQUERADE."""
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'source_network': '10.0.0.0/24',
            'target_interface': 'eth0',
            'masquerade': True,
            'nat_type': 'MASQUERADE',
            'protocol': 'ALL',
            'internal_ip': None,
            'external_port': None,
            'internal_port': None,
        }
        self.manager._apply_nat_rule(rule)
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('iptables', call_args)
        self.assertIn('MASQUERADE', call_args)

    @patch('subprocess.run')
    def test_apply_nat_rule_dnat(self, mock_sub):
        """_apply_nat_rule issues one command containing DNAT for a DNAT rule."""
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'source_network': '10.0.0.0/24',
            'target_interface': 'eth0',
            'masquerade': False,
            'nat_type': 'DNAT',
            'protocol': 'TCP',
            'internal_ip': '192.168.1.10',
            'external_port': '80',
            'internal_port': '8080',
        }
        self.manager._apply_nat_rule(rule)
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('DNAT', call_args)

    @patch('subprocess.run')
    def test_apply_nat_rule_snat(self, mock_sub):
        """_apply_nat_rule issues one command containing SNAT for an SNAT rule."""
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'source_network': '10.0.0.0/24',
            'target_interface': 'eth0',
            'masquerade': False,
            'nat_type': 'SNAT',
            'protocol': 'TCP',
            'internal_ip': '192.168.1.10',
            'external_port': None,
            'internal_port': '8080',
        }
        self.manager._apply_nat_rule(rule)
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('SNAT', call_args)

    @patch('subprocess.run')
    def test_apply_firewall_rule_with_port(self, mock_sub):
        """_apply_firewall_rule passes --dport and the port for a TCP+port rule."""
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'rule_type': 'INPUT',
            'source': '10.0.0.0/24',
            'destination': '192.168.1.0/24',
            'action': 'ACCEPT',
            'protocol': 'TCP',
            'port': '443',
            'port_range': None,
        }
        self.manager._apply_firewall_rule(rule)
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('--dport', call_args)
        self.assertIn('443', call_args)

    @patch('subprocess.run')
    def test_apply_firewall_rule_with_port_range(self, mock_sub):
        """_apply_firewall_rule rewrites '1000-2000' as '1000:2000' in the command."""
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'rule_type': 'INPUT',
            'source': '10.0.0.0/24',
            'destination': '192.168.1.0/24',
            'action': 'ACCEPT',
            'protocol': 'ALL',
            'port': None,
            'port_range': '1000-2000',
        }
        self.manager._apply_firewall_rule(rule)
        call_args = mock_sub.call_args[0][0]
        # Port range should be passed as 1000:2000 for iptables
        self.assertIn('1000:2000', call_args)

    def test_parse_proc_net_route_valid_data(self):
        """_parse_proc_net_route parses /proc/net/route format correctly."""
        route_content = (
            "Iface\tDestination\tGateway\tFlags\tRefCnt\tUse\tMetric\tMask\tMTU\tWindow\tIRTT\n"
            "eth0\t00000000\t0101A8C0\t0003\t0\t0\t100\t00000000\t0\t0\t0\n"
            "eth0\t000011AC\t00000000\t0001\t0\t0\t0\tF0FFFFFF\t0\t0\t0\n"
        )
        # delete=False so the file survives the ``with`` and can be re-read
        # by path; it is removed explicitly in the ``finally`` below.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            f.write(route_content)
            fname = f.name
        try:
            routes = self.manager._parse_proc_net_route(fname)
            self.assertIsInstance(routes, list)
            # Should have parsed the two non-header lines
            self.assertEqual(len(routes), 2)
            # First entry should be a default route
            self.assertIn('route', routes[0])
        finally:
            os.unlink(fname)
class TestRoutingManagerInternalMethods(unittest.TestCase):
    """Tests for internal methods that are harder to reach via public API."""

    def setUp(self):
        """Create an isolated data/config tree and a manager bound to it."""
        self.test_dir = tempfile.mkdtemp()
        # Register the removal right away: cleanups run even when a later
        # line of setUp raises, whereas tearDown is skipped in that case and
        # the temporary directory would be leaked.
        self.addCleanup(shutil.rmtree, self.test_dir, ignore_errors=True)
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)
        self.manager = RoutingManager(self.data_dir, self.config_dir)

    # ── _apply_peer_route ─────────────────────────────────────────────────
    @patch('subprocess.run')
    def test_apply_peer_route_calls_ip_route(self, mock_sub):
        """_apply_peer_route runs one command per allowed network (two here)."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._apply_peer_route({
            'peer_name': 'alice',
            'peer_ip': '10.0.0.2',
            'allowed_networks': ['192.168.100.0/24', '10.1.0.0/16']
        })
        self.assertEqual(mock_sub.call_count, 2)

    @patch('subprocess.run', side_effect=subprocess.CalledProcessError(1, 'ip'))
    def test_apply_peer_route_exception_is_silent(self, _mock):
        """_apply_peer_route does not propagate CalledProcessError."""
        # Should not raise
        self.manager._apply_peer_route({
            'peer_name': 'alice',
            'peer_ip': '10.0.0.2',
            'allowed_networks': ['192.168.100.0/24']
        })

    # ── _remove_peer_route ────────────────────────────────────────────────
    @patch('subprocess.run')
    def test_remove_peer_route_calls_ip_route_del(self, mock_sub):
        """_remove_peer_route runs exactly one command containing 'del'."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._remove_peer_route('alice')
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('del', call_args)

    @patch('subprocess.run', side_effect=FileNotFoundError('ip not found'))
    def test_remove_peer_route_exception_is_silent(self, _mock):
        """_remove_peer_route does not propagate FileNotFoundError."""
        self.manager._remove_peer_route('alice')  # Should not raise

    # ── _apply_exit_node ──────────────────────────────────────────────────
    @patch('subprocess.run')
    def test_apply_exit_node_adds_default_route(self, mock_sub):
        """The last command issued by _apply_exit_node contains 'default'."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._apply_exit_node({
            'peer_name': 'exitnode',
            'peer_ip': '10.0.0.2'
        })
        call_args = mock_sub.call_args[0][0]
        self.assertIn('default', call_args)

    @patch('subprocess.run', side_effect=Exception('permission denied'))
    def test_apply_exit_node_exception_is_silent(self, _mock):
        """_apply_exit_node does not propagate an exception from subprocess.run."""
        self.manager._apply_exit_node({'peer_name': 'exit', 'peer_ip': '10.0.0.2'})

    # ── _apply_bridge_route ───────────────────────────────────────────────
    @patch('subprocess.run')
    def test_apply_bridge_route_adds_forward_rules(self, mock_sub):
        """_apply_bridge_route runs one command per allowed network (two here)."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._apply_bridge_route({
            'source_peer': 'src',
            'target_peer': 'dst',
            'allowed_networks': ['10.1.0.0/16', '10.2.0.0/16']
        })
        self.assertEqual(mock_sub.call_count, 2)

    @patch('subprocess.run', side_effect=Exception('fail'))
    def test_apply_bridge_route_exception_is_silent(self, _mock):
        """_apply_bridge_route does not propagate an exception from subprocess.run."""
        self.manager._apply_bridge_route({
            'source_peer': 'src', 'target_peer': 'dst',
            'allowed_networks': ['10.1.0.0/16']
        })

    # ── _apply_split_route ────────────────────────────────────────────────
    @patch('subprocess.run')
    def test_apply_split_route_adds_specific_route(self, mock_sub):
        """_apply_split_route runs a single command that names the network."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._apply_split_route({
            'network': '10.100.0.0/16',
            'exit_peer': 'exit_wg',
            'priority': 100
        })
        self.assertEqual(mock_sub.call_count, 1)
        call_args = mock_sub.call_args[0][0]
        self.assertIn('10.100.0.0/16', call_args)

    @patch('subprocess.run', side_effect=Exception('fail'))
    def test_apply_split_route_exception_is_silent(self, _mock):
        """_apply_split_route does not propagate an exception from subprocess.run."""
        self.manager._apply_split_route({
            'network': '10.100.0.0/16', 'exit_peer': 'wg0', 'priority': 100
        })

    # ── _remove_nat_rule ──────────────────────────────────────────────────
    @patch('subprocess.run')
    def test_remove_nat_rule_calls_iptables_D(self, mock_sub):
        """_remove_nat_rule issues a command containing -D and POSTROUTING."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._remove_nat_rule('rule_abc')
        call_args = mock_sub.call_args[0][0]
        self.assertIn('-D', call_args)
        self.assertIn('POSTROUTING', call_args)

    @patch('subprocess.run')
    def test_remove_nat_rule_not_found_is_silent(self, mock_sub):
        """_remove_nat_rule does not raise when the command exits with status 1."""
        mock_sub.return_value = MagicMock(returncode=1)
        self.manager._remove_nat_rule('nonexistent_rule')  # Should not raise

    @patch('subprocess.run', side_effect=FileNotFoundError('iptables'))
    def test_remove_nat_rule_exception_is_silent(self, _mock):
        """_remove_nat_rule does not propagate FileNotFoundError."""
        self.manager._remove_nat_rule('rule_x')

    # ── get_routing_logs ──────────────────────────────────────────────────
    @patch('subprocess.run')
    def test_get_routing_logs_returns_dict(self, mock_sub):
        """get_routing_logs returns a dict containing a 'routes' entry."""
        mock_sub.return_value = MagicMock(returncode=0, stdout='some output\n')
        result = self.manager.get_routing_logs()
        self.assertIsInstance(result, dict)
        self.assertIn('routes', result)

    @patch('subprocess.run', side_effect=Exception('system error'))
    def test_get_routing_logs_outer_exception_returns_error_dict(self, _mock):
        """get_routing_logs still returns a dict when subprocess.run raises."""
        # The outer try will succeed but inner dmesg calls will fail
        result = self.manager.get_routing_logs()
        self.assertIsInstance(result, dict)

    # ── remove_firewall_rule exception path ───────────────────────────────
    @patch('subprocess.run')
    def test_remove_firewall_rule_exception_returns_false(self, mock_sub):
        """An unknown rule id yields a falsy result even if subprocess.run would raise.

        NOTE(review): despite its name this only exercises the not-found
        path; reaching the exception path would need an existing rule so
        that subprocess.run is actually invoked.
        """
        mock_sub.side_effect = Exception('unexpected error')
        # The outer try in remove_firewall_rule should catch and return False
        # This requires the rules file to have a matching rule that triggers subprocess
        result = self.manager.remove_firewall_rule('nonexistent_rule_id')
        # nonexistent rule -> returns False (not found path)
        self.assertFalse(result)

    # ── _get_routing_table ────────────────────────────────────────────────
    @patch('subprocess.run')
    def test_get_routing_table_returns_list(self, mock_sub):
        """_get_routing_table returns a list."""
        mock_sub.return_value = MagicMock(
            returncode=0,
            stdout='default via 192.168.1.1 dev eth0\n10.0.0.0/24 dev wg0 proto kernel\n'
        )
        result = self.manager._get_routing_table()
        self.assertIsInstance(result, list)

    @patch('subprocess.run', side_effect=FileNotFoundError('docker'))
    def test_get_routing_table_fallback_exception_returns_empty(self, _mock):
        """When both /proc/1/net/route and docker exec fail, return empty list."""
        with patch('builtins.open', side_effect=FileNotFoundError('/proc/1/net/route')):
            result = self.manager._get_routing_table()
        self.assertEqual(result, [])

    # ── start with sysctl exception ───────────────────────────────────────
    @patch('subprocess.run', side_effect=FileNotFoundError('sysctl'))
    def test_start_continues_when_sysctl_not_found(self, _mock):
        """start() still succeeds and marks the service running on FileNotFoundError."""
        result = self.manager.start()
        self.assertTrue(result)
        self.assertTrue(self.manager._service_running)

    @patch('subprocess.run', side_effect=subprocess.CalledProcessError(1, 'sysctl'))
    def test_start_continues_when_sysctl_fails(self, _mock):
        """start() still succeeds when subprocess.run raises CalledProcessError."""
        result = self.manager.start()
        self.assertTrue(result)
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()