aa1e5c41ec
Unit Tests / test (push) Successful in 12m6s
Coverage was below acceptable levels and several newly-added code paths (sshuttle egress, proxy egress, DDNS provider stubs, DNS overview route, peer-registry provisioning) had zero test coverage. ~250 new unit tests are added across 16 new test files. Existing test files are updated to match refactored interfaces (DHCP removed, constants introduced, network_manager restructured). .coveragerc is added to pin the source mapping and the 70% floor so regressions are caught at commit time. tests/test_enhanced_api.py was previously living in api/ (wrong location) and is moved to tests/ where it belongs. Integration test files are updated to remove references to DHCP endpoints and add coverage for the new DNS overview and DDNS sync endpoints. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
925 lines
41 KiB
Python
925 lines
41 KiB
Python
import sys
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
# Add api directory to path
|
|
api_dir = Path(__file__).parent.parent / 'api'
|
|
sys.path.insert(0, str(api_dir))
|
|
import unittest
|
|
import tempfile
|
|
import shutil
|
|
import os
|
|
from unittest.mock import patch, MagicMock, call
|
|
from routing_manager import RoutingManager
|
|
import json
|
|
|
|
class TestRoutingManager(unittest.TestCase):
    """Unit tests for RoutingManager rule management, status and helpers.

    Every test runs against a RoutingManager whose data and config
    directories live in a private temporary tree.  Calls that would touch
    the host (``subprocess.run`` or the manager's ``_apply_*`` /
    ``_remove_*`` hooks) are replaced with mocks in the tests that exercise
    them.
    """

    # ── fixtures and helpers ──────────────────────────────────────────────

    def setUp(self):
        """Create an isolated data/config tree and a manager bound to it."""
        self.test_dir = tempfile.mkdtemp()
        # Registered right after creation: unittest does not run tearDown()
        # when setUp() raises, but it still runs registered cleanups, so the
        # temporary tree is removed even if the constructor below fails.
        self.addCleanup(shutil.rmtree, self.test_dir, ignore_errors=True)
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)
        self.manager = RoutingManager(data_dir=self.data_dir, config_dir=self.config_dir)

    def _read_rules(self):
        """Return the manager's rules file parsed as JSON."""
        with open(self.manager.rules_file) as f:
            return json.load(f)

    def _read_state(self):
        """Return the manager's service-state file parsed as JSON."""
        with open(self.manager._state_file) as f:
            return json.load(f)

    def _assert_rejected(self, result):
        """Assert *result* is a dict whose 'success' entry is falsy."""
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))

    # ── initialization ────────────────────────────────────────────────────

    def test_initialization(self):
        """A new manager creates its routing dir and a default rules file."""
        self.assertTrue(os.path.exists(self.manager.routing_dir))
        self.assertTrue(os.path.exists(self.manager.rules_file))
        # Check that rules file contains default structure
        rules = self._read_rules()
        expected_types = {
            'nat_rules': list,
            'peer_routes': dict,
            'exit_nodes': list,
            'bridge_routes': list,
            'split_routes': list,
            'firewall_rules': list,
        }
        for key, expected_type in expected_types.items():
            with self.subTest(key=key):
                self.assertIn(key, rules)
                self.assertIsInstance(rules[key], expected_type)

    # ── NAT rules ─────────────────────────────────────────────────────────

    @patch.object(RoutingManager, '_apply_nat_rule', return_value=True)
    @patch.object(RoutingManager, '_remove_nat_rule', return_value=True)
    def test_add_and_remove_nat_rule(self, mock_remove_nat, mock_apply_nat):
        """NAT rules are persisted on add, dropped on remove, and validated."""
        # Add a valid NAT rule
        result = self.manager.add_nat_rule('10.0.0.0/24', 'eth0')
        self.assertTrue(result)
        # Check that the rule is persisted
        rules = self._read_rules()
        self.assertEqual(len(rules['nat_rules']), 1)
        rule = rules['nat_rules'][0]
        self.assertEqual(rule['source_network'], '10.0.0.0/24')
        self.assertEqual(rule['target_interface'], 'eth0')
        self.assertEqual(rule['nat_type'], 'MASQUERADE')
        self.assertTrue(rule['enabled'])
        # Remove the NAT rule
        result = self.manager.remove_nat_rule(rule['id'])
        self.assertTrue(result)
        self.assertEqual(len(self._read_rules()['nat_rules']), 0)
        # Test invalid NAT rule (bad CIDR)
        self.assertFalse(self.manager.add_nat_rule('bad-cidr', 'eth0'))
        # Test invalid NAT rule (bad interface)
        self.assertFalse(self.manager.add_nat_rule('10.0.0.0/24', ''))
        # Test invalid NAT rule (bad nat_type)
        self.assertFalse(self.manager.add_nat_rule('10.0.0.0/24', 'eth0', nat_type='INVALID'))
        # Test invalid NAT rule (bad protocol)
        self.assertFalse(self.manager.add_nat_rule('10.0.0.0/24', 'eth0', protocol='INVALID'))

    # ── peer routes ───────────────────────────────────────────────────────

    @patch.object(RoutingManager, '_apply_peer_route', return_value=True)
    @patch.object(RoutingManager, '_remove_peer_route', return_value=True)
    def test_add_and_remove_peer_route(self, mock_remove_peer, mock_apply_peer):
        """Peer routes are persisted on add, dropped on remove, and validated."""
        # Add a valid peer route
        allowed_networks = ['10.0.0.0/24']
        result = self.manager.add_peer_route('peer1', '10.0.0.2', allowed_networks)
        self.assertTrue(result)
        # Check that the route is persisted
        rules = self._read_rules()
        self.assertIn('peer1', rules['peer_routes'])
        route = rules['peer_routes']['peer1']
        self.assertEqual(route['peer_name'], 'peer1')
        self.assertEqual(route['peer_ip'], '10.0.0.2')
        self.assertEqual(route['allowed_networks'], allowed_networks)
        self.assertEqual(route['route_type'], 'lan')
        self.assertTrue(route['enabled'])
        # Remove the peer route
        result = self.manager.remove_peer_route('peer1')
        self.assertTrue(result)
        self.assertNotIn('peer1', self._read_rules()['peer_routes'])
        # Test invalid peer route (bad peer_name)
        self.assertFalse(self.manager.add_peer_route('', '10.0.0.2', allowed_networks))
        # Test invalid peer route (bad peer_ip)
        self.assertFalse(self.manager.add_peer_route('peer2', '', allowed_networks))
        # Test invalid peer route (bad allowed_networks)
        self.assertFalse(self.manager.add_peer_route('peer3', '10.0.0.3', ['bad-cidr']))
        # Test invalid peer route (bad route_type)
        self.assertFalse(
            self.manager.add_peer_route('peer4', '10.0.0.4', allowed_networks, route_type='invalid'))

    # ── exit nodes ────────────────────────────────────────────────────────

    @patch.object(RoutingManager, '_apply_exit_node')
    def test_add_exit_node_valid(self, mock_apply):
        """A valid exit node is persisted and applied exactly once."""
        result = self.manager.add_exit_node('peer1', '10.0.0.2')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['exit_nodes']), 1)
        node = rules['exit_nodes'][0]
        self.assertEqual(node['peer_name'], 'peer1')
        self.assertEqual(node['peer_ip'], '10.0.0.2')
        self.assertTrue(node['enabled'])
        mock_apply.assert_called_once()

    @patch.object(RoutingManager, '_apply_exit_node')
    def test_add_exit_node_with_allowed_domains(self, mock_apply):
        """allowed_domains passed to add_exit_node are stored with the node."""
        result = self.manager.add_exit_node('peer1', '10.0.0.2', allowed_domains=['example.com'])
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(rules['exit_nodes'][0]['allowed_domains'], ['example.com'])

    def test_add_exit_node_invalid_peer_name(self):
        self._assert_rejected(self.manager.add_exit_node('bad name!', '10.0.0.2'))

    def test_add_exit_node_invalid_ip(self):
        self._assert_rejected(self.manager.add_exit_node('peer1', 'not-an-ip'))

    def test_add_exit_node_invalid_domains(self):
        self._assert_rejected(
            self.manager.add_exit_node('peer1', '10.0.0.2', allowed_domains='not-a-list'))

    def test_add_exit_node_invalid_domain_format(self):
        self._assert_rejected(
            self.manager.add_exit_node('peer1', '10.0.0.2', allowed_domains=['bad domain!']))

    # ── bridge routes ─────────────────────────────────────────────────────

    @patch.object(RoutingManager, '_apply_bridge_route')
    def test_add_bridge_route_valid(self, mock_apply):
        """A valid bridge route is persisted and applied exactly once."""
        result = self.manager.add_bridge_route('src-peer', '192.168.1.0/24', ['10.0.0.0/24'])
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['bridge_routes']), 1)
        route = rules['bridge_routes'][0]
        self.assertEqual(route['source_peer'], 'src-peer')
        self.assertEqual(route['target_peer'], '192.168.1.0/24')
        mock_apply.assert_called_once()

    def test_add_bridge_route_invalid_source(self):
        self._assert_rejected(
            self.manager.add_bridge_route('bad name!', '192.168.1.0/24', ['10.0.0.0/24']))

    def test_add_bridge_route_invalid_target(self):
        self._assert_rejected(
            self.manager.add_bridge_route('src-peer', 'not-a-network', ['10.0.0.0/24']))

    def test_add_bridge_route_empty_allowed_networks(self):
        self._assert_rejected(self.manager.add_bridge_route('src-peer', '192.168.1.0/24', []))

    def test_add_bridge_route_invalid_network_in_list(self):
        self._assert_rejected(
            self.manager.add_bridge_route('src-peer', '192.168.1.0/24', ['not-a-cidr']))

    # ── split routes ──────────────────────────────────────────────────────

    @patch.object(RoutingManager, '_apply_split_route')
    def test_add_split_route_valid(self, mock_apply):
        """A valid split route is persisted and applied exactly once."""
        result = self.manager.add_split_route('10.0.0.0/24', '10.0.0.1')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['split_routes']), 1)
        route = rules['split_routes'][0]
        self.assertEqual(route['network'], '10.0.0.0/24')
        self.assertEqual(route['exit_peer'], '10.0.0.1')
        mock_apply.assert_called_once()

    @patch.object(RoutingManager, '_apply_split_route')
    def test_add_split_route_with_fallback(self, mock_apply):
        """fallback_peer passed to add_split_route is stored with the route."""
        result = self.manager.add_split_route('10.0.0.0/24', '10.0.0.1', fallback_peer='10.0.0.2')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(rules['split_routes'][0]['fallback_peer'], '10.0.0.2')

    def test_add_split_route_invalid_network(self):
        self._assert_rejected(self.manager.add_split_route('not-a-cidr', '10.0.0.1'))

    def test_add_split_route_invalid_exit_peer(self):
        self._assert_rejected(self.manager.add_split_route('10.0.0.0/24', 'not-an-ip'))

    def test_add_split_route_invalid_fallback(self):
        self._assert_rejected(
            self.manager.add_split_route('10.0.0.0/24', '10.0.0.1', fallback_peer='not-ip'))

    # ── firewall rules ────────────────────────────────────────────────────

    @patch.object(RoutingManager, '_apply_firewall_rule')
    def test_add_firewall_rule_valid(self, mock_apply):
        """A valid firewall rule is persisted (action ACCEPT) and applied once."""
        result = self.manager.add_firewall_rule('FORWARD', '10.0.0.0/24', '192.168.1.0/24')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['firewall_rules']), 1)
        rule = rules['firewall_rules'][0]
        self.assertEqual(rule['rule_type'], 'FORWARD')
        self.assertEqual(rule['source'], '10.0.0.0/24')
        self.assertEqual(rule['destination'], '192.168.1.0/24')
        self.assertEqual(rule['action'], 'ACCEPT')
        mock_apply.assert_called_once()

    @patch.object(RoutingManager, '_apply_firewall_rule')
    def test_add_firewall_rule_with_port(self, mock_apply):
        """protocol and port passed to add_firewall_rule are stored."""
        result = self.manager.add_firewall_rule(
            'INPUT', '10.0.0.0/24', '192.168.1.0/24', protocol='TCP', port='80')
        self.assertTrue(result)
        rule = self._read_rules()['firewall_rules'][0]
        self.assertEqual(rule['protocol'], 'TCP')
        self.assertEqual(rule['port'], '80')

    @patch.object(RoutingManager, '_apply_firewall_rule')
    def test_add_firewall_rule_with_port_range(self, mock_apply):
        """port_range passed to add_firewall_rule is stored."""
        result = self.manager.add_firewall_rule(
            'INPUT', '10.0.0.0/24', '192.168.1.0/24', port_range='1000-2000')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(rules['firewall_rules'][0]['port_range'], '1000-2000')

    def test_add_firewall_rule_invalid_type(self):
        result = self.manager.add_firewall_rule('BADCHAIN', '10.0.0.0/24', '192.168.1.0/24')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_source(self):
        result = self.manager.add_firewall_rule('FORWARD', 'not-cidr', '192.168.1.0/24')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_destination(self):
        result = self.manager.add_firewall_rule('FORWARD', '10.0.0.0/24', 'not-cidr')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_action(self):
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', action='JUMP')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_protocol(self):
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', protocol='SCTP')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_port_value(self):
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', port='99999')
        self.assertFalse(result)

    def test_add_firewall_rule_port_not_number(self):
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', port='abc')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_port_range_format(self):
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', port_range='abc-def')
        self.assertFalse(result)

    @patch.object(RoutingManager, '_apply_firewall_rule')
    @patch('subprocess.run')
    def test_remove_firewall_rule(self, mock_sub, mock_apply):
        """Removing a stored firewall rule by id empties the persisted list."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager.add_firewall_rule('FORWARD', '10.0.0.0/24', '192.168.1.0/24')
        rule_id = self._read_rules()['firewall_rules'][0]['id']
        result = self.manager.remove_firewall_rule(rule_id)
        self.assertTrue(result)
        self.assertEqual(len(self._read_rules()['firewall_rules']), 0)

    @patch.object(RoutingManager, '_apply_firewall_rule')
    def test_remove_firewall_rule_not_found(self, mock_apply):
        result = self.manager.remove_firewall_rule('nonexistent_id')
        self.assertFalse(result)

    @patch.object(RoutingManager, '_apply_firewall_rule')
    @patch('subprocess.run')
    def test_remove_firewall_rule_with_tcp_port(self, mock_sub, mock_apply):
        """remove_firewall_rule invokes iptables -D for a TCP+port rule."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager.add_firewall_rule(
            'INPUT', '10.0.0.0/24', '192.168.1.0/24', protocol='TCP', port='443')
        rule_id = self._read_rules()['firewall_rules'][0]['id']
        result = self.manager.remove_firewall_rule(rule_id)
        self.assertTrue(result)
        # Check subprocess was called with iptables -D.  Assert the call
        # first so a missing call fails as an assertion, not a TypeError on
        # call_args being None.
        mock_sub.assert_called()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('iptables', call_args)
        self.assertIn('-D', call_args)

    # ── status and listings ───────────────────────────────────────────────

    def test_get_routing_status_empty(self):
        """get_routing_status exposes all counters and tables on a fresh manager."""
        status = self.manager.get_routing_status()
        expected_keys = (
            'nat_rules_count',
            'firewall_rules_count',
            'peer_routes_count',
            'exit_nodes_count',
            'bridge_routes_count',
            'split_routes_count',
            'routing_table',
            'active_rules',
        )
        for key in expected_keys:
            with self.subTest(key=key):
                self.assertIn(key, status)
        self.assertEqual(status['nat_rules_count'], 0)

    @patch.object(RoutingManager, '_apply_nat_rule')
    @patch.object(RoutingManager, '_apply_firewall_rule')
    @patch.object(RoutingManager, '_apply_peer_route')
    def test_get_routing_status_counts(self, mock_peer, mock_fw, mock_nat):
        """Counters reflect one NAT rule, one firewall rule and one peer route."""
        self.manager.add_nat_rule('10.0.0.0/24', 'eth0')
        self.manager.add_firewall_rule('FORWARD', '10.0.0.0/24', '192.168.1.0/24')
        self.manager.add_peer_route('p1', '10.0.0.2', ['10.0.0.0/24'])
        status = self.manager.get_routing_status()
        self.assertEqual(status['nat_rules_count'], 1)
        self.assertEqual(status['firewall_rules_count'], 1)
        self.assertEqual(status['peer_routes_count'], 1)

    def test_get_routing_status_corrupted_file(self):
        """A corrupt rules file yields safe defaults instead of an exception."""
        # Corrupt the rules file to trigger exception
        with open(self.manager.rules_file, 'w') as f:
            f.write('{corrupt')
        status = self.manager.get_routing_status()
        # Should return safe defaults
        self.assertEqual(status['nat_rules_count'], 0)

    def test_get_nat_rules_empty(self):
        self.assertEqual(self.manager.get_nat_rules(), [])

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_get_nat_rules_returns_list(self, mock_apply):
        self.manager.add_nat_rule('10.0.0.0/24', 'eth0')
        rules = self.manager.get_nat_rules()
        self.assertEqual(len(rules), 1)
        self.assertEqual(rules[0]['source_network'], '10.0.0.0/24')

    def test_get_peer_routes_empty(self):
        self.assertEqual(self.manager.get_peer_routes(), [])

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_get_peer_routes_returns_list(self, mock_apply):
        self.manager.add_peer_route('peer1', '10.0.0.2', ['10.0.0.0/24'])
        routes = self.manager.get_peer_routes()
        self.assertEqual(len(routes), 1)
        self.assertEqual(routes[0]['peer_name'], 'peer1')

    def test_get_firewall_rules_empty(self):
        self.assertEqual(self.manager.get_firewall_rules(), [])

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_update_peer_ip_updates_and_reapplies(self, mock_apply):
        """update_peer_ip rewrites the persisted peer_ip of an existing route."""
        self.manager.add_peer_route('peer1', '10.0.0.2', ['10.0.0.0/24'])
        result = self.manager.update_peer_ip('peer1', '10.0.0.99')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(rules['peer_routes']['peer1']['peer_ip'], '10.0.0.99')

    def test_update_peer_ip_nonexistent_peer(self):
        self.assertFalse(self.manager.update_peer_ip('nobody', '10.0.0.99'))

    def test_remove_peer_route_nonexistent(self):
        self.assertFalse(self.manager.remove_peer_route('nobody'))

    def test_get_status_returns_dict(self):
        status = self.manager.get_status()
        self.assertIsInstance(status, dict)
        for key in ('running', 'status', 'nat_rules_count'):
            with self.subTest(key=key):
                self.assertIn(key, status)

    # ── service lifecycle ─────────────────────────────────────────────────

    def test_start_and_stop_service(self):
        """start() and stop() succeed and flip the in-memory running flag."""
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0)
            result = self.manager.start()
            self.assertTrue(result)
            self.assertTrue(self.manager._service_running)
            result = self.manager.stop()
            self.assertTrue(result)
            self.assertFalse(self.manager._service_running)

    def test_start_persists_service_state(self):
        """start() writes a state file recording running=True."""
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0)
            self.manager.start()
        # State file should exist now
        self.assertTrue(os.path.exists(self.manager._state_file))
        self.assertTrue(self._read_state()['running'])

    def test_stop_persists_service_state(self):
        """stop() writes a state file recording running=False."""
        # Mocked like the other start/stop tests so this test can never
        # shell out on the machine running the suite.
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0)
            self.manager.stop()
        self.assertFalse(self._read_state()['running'])

    def test_restart_service(self):
        with patch('subprocess.run') as mock_sub, patch('time.sleep'):
            mock_sub.return_value = MagicMock(returncode=0)
            result = self.manager.restart()
        self.assertTrue(result)

    # ── connectivity and logs ─────────────────────────────────────────────

    def test_test_connectivity_returns_dict(self):
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='', stderr='')
            result = self.manager.test_connectivity()
        self.assertIsInstance(result, dict)
        expected_keys = (
            'routing_functionality',
            'iptables_access',
            'network_interfaces',
            'routing_table_access',
        )
        for key in expected_keys:
            with self.subTest(key=key):
                self.assertIn(key, result)

    def test_test_routing_connectivity_returns_results(self):
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='OK', stderr='')
            results = self.manager.test_routing_connectivity('8.8.8.8')
        self.assertIn('ping', results)
        self.assertIn('traceroute', results)

    def test_test_routing_connectivity_with_via_peer(self):
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='OK', stderr='')
            results = self.manager.test_routing_connectivity('8.8.8.8', via_peer='10.0.0.1')
        self.assertIn('peer_route', results)

    def test_test_routing_connectivity_subprocess_exception(self):
        """A subprocess failure is reported as an unsuccessful ping result."""
        with patch('subprocess.run', side_effect=Exception('timeout')):
            results = self.manager.test_routing_connectivity('8.8.8.8')
        self.assertIn('ping', results)
        self.assertFalse(results['ping']['success'])

    def test_get_routing_logs(self):
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='log line', stderr='')
            logs = self.manager.get_routing_logs()
        self.assertIsInstance(logs, dict)
        self.assertIn('routes', logs)

    # ── parsing and validation helpers ────────────────────────────────────

    def test_parse_route_basic(self):
        parsed = self.manager._parse_route('10.0.0.0/24 via 172.20.0.1 dev eth0 metric 100')
        self.assertEqual(parsed['destination'], '10.0.0.0/24')
        self.assertEqual(parsed['via'], '172.20.0.1')
        self.assertEqual(parsed['dev'], 'eth0')
        self.assertEqual(parsed['metric'], '100')

    def test_parse_route_no_via(self):
        parsed = self.manager._parse_route('10.0.0.0/24 dev eth0')
        self.assertEqual(parsed['destination'], '10.0.0.0/24')
        self.assertEqual(parsed['dev'], 'eth0')
        self.assertEqual(parsed['via'], '')

    def test_parse_route_empty_string(self):
        parsed = self.manager._parse_route('')
        self.assertEqual(parsed['destination'], '')

    def test_validate_cidr_valid(self):
        for cidr in ('10.0.0.0/24', '192.168.1.0/24', '172.16.0.0/12', '0.0.0.0/0'):
            with self.subTest(cidr=cidr):
                self.assertTrue(self.manager._validate_cidr(cidr))

    def test_validate_cidr_invalid(self):
        for cidr in ('not-a-cidr', '', '10.0.0.300/24'):
            with self.subTest(cidr=cidr):
                self.assertFalse(self.manager._validate_cidr(cidr))

    def test_load_service_state_from_file(self):
        """State is restored from the file on the next instantiation."""
        # False is the value that matters: it differs from the no-file
        # default (True, see test_load_service_state_no_file), so it proves
        # the file is actually read.  Checking only True would pass even if
        # loading were broken.
        for running in (False, True):
            with self.subTest(running=running):
                self.manager._service_running = running
                self.manager._save_service_state()
                reloaded = RoutingManager(data_dir=self.data_dir, config_dir=self.config_dir)
                self.assertEqual(reloaded._service_running, running)

    def test_load_service_state_no_file(self):
        """Without a state file, service defaults to running=True."""
        if os.path.exists(self.manager._state_file):
            os.remove(self.manager._state_file)
        new_manager = RoutingManager(data_dir=self.data_dir, config_dir=self.config_dir)
        self.assertTrue(new_manager._service_running)

    def test_get_live_iptables(self):
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='Chain INPUT', stderr='')
            result = self.manager.get_live_iptables()
        self.assertIn('filter', result)
        self.assertIn('nat', result)

    def test_get_live_iptables_subprocess_exception(self):
        """Both tables are still reported when subprocess.run raises."""
        with patch('subprocess.run', side_effect=Exception('no docker')):
            result = self.manager.get_live_iptables()
        self.assertIn('filter', result)
        self.assertIn('nat', result)

    # ── NAT / peer-route variants ─────────────────────────────────────────

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_nat_rule_snat_type(self, mock_apply):
        result = self.manager.add_nat_rule(
            '10.0.0.0/24', 'eth0', nat_type='SNAT',
            internal_ip='10.0.0.5', internal_port='8080')
        self.assertTrue(result)
        rule = self._read_rules()['nat_rules'][0]
        self.assertEqual(rule['nat_type'], 'SNAT')

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_nat_rule_dnat_type(self, mock_apply):
        result = self.manager.add_nat_rule(
            '10.0.0.0/24', 'eth0', nat_type='DNAT',
            internal_ip='10.0.0.5', external_port='80', internal_port='8080')
        self.assertTrue(result)
        rule = self._read_rules()['nat_rules'][0]
        self.assertEqual(rule['nat_type'], 'DNAT')

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_nat_rule_tcp_protocol(self, mock_apply):
        self.assertTrue(self.manager.add_nat_rule('10.0.0.0/24', 'eth0', protocol='TCP'))

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_nat_rule_udp_protocol(self, mock_apply):
        self.assertTrue(self.manager.add_nat_rule('10.0.0.0/24', 'eth0', protocol='UDP'))

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_peer_route_exit_type(self, mock_apply):
        result = self.manager.add_peer_route('p1', '10.0.0.2', ['10.0.0.0/24'], route_type='exit')
        self.assertTrue(result)

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_peer_route_bridge_type(self, mock_apply):
        result = self.manager.add_peer_route('p1', '10.0.0.2', ['10.0.0.0/24'], route_type='bridge')
        self.assertTrue(result)

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_peer_route_split_type(self, mock_apply):
        result = self.manager.add_peer_route('p1', '10.0.0.2', ['10.0.0.0/24'], route_type='split')
        self.assertTrue(result)

    # ── persistence edge cases ────────────────────────────────────────────

    def test_ensure_config_exists_is_idempotent(self):
        """Calling _ensure_config_exists twice does not raise."""
        self.manager._ensure_config_exists()
        self.manager._ensure_config_exists()
        self.assertTrue(os.path.exists(self.manager.rules_file))

    def test_save_rules_failure_is_silent(self):
        """_save_rules failure (e.g. permission error) doesn't raise."""
        with patch('builtins.open', side_effect=OSError('disk full')):
            # Should not raise
            self.manager._save_rules({'nat_rules': [], 'peer_routes': {}})

    def test_load_rules_failure_returns_empty_dict(self):
        """_load_rules failure returns {}."""
        with open(self.manager.rules_file, 'w') as f:
            f.write('{corrupt')
        self.assertEqual(self.manager._load_rules(), {})

    # ── missing-binary (dev mode) behaviour ───────────────────────────────

    def test_test_iptables_access_not_found(self):
        """FileNotFoundError from iptables returns success=True (dev mode)."""
        with patch('subprocess.run', side_effect=FileNotFoundError()):
            result = self.manager._test_iptables_access()
        self.assertTrue(result['success'])

    def test_test_network_interfaces_not_found(self):
        """FileNotFoundError from ip link show returns success=True (dev mode)."""
        with patch('subprocess.run', side_effect=FileNotFoundError()):
            result = self.manager._test_network_interfaces()
        self.assertTrue(result['success'])

    def test_test_routing_table_not_found(self):
        """FileNotFoundError from ip route show returns success=True (dev mode)."""
        with patch('subprocess.run', side_effect=FileNotFoundError()):
            result = self.manager._test_routing_table_access()
        self.assertTrue(result['success'])

    def test_is_routing_service_running_reflects_state(self):
        for running in (True, False):
            with self.subTest(running=running):
                self.manager._service_running = running
                self.assertEqual(self.manager._is_routing_service_running(), running)

    # ── _apply_nat_rule / _apply_firewall_rule command construction ───────

    @patch('subprocess.run')
    def test_apply_nat_rule_masquerade(self, mock_sub):
        """A MASQUERADE rule results in one iptables call mentioning MASQUERADE."""
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'source_network': '10.0.0.0/24',
            'target_interface': 'eth0',
            'masquerade': True,
            'nat_type': 'MASQUERADE',
            'protocol': 'ALL',
            'internal_ip': None,
            'external_port': None,
            'internal_port': None,
        }
        self.manager._apply_nat_rule(rule)
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('iptables', call_args)
        self.assertIn('MASQUERADE', call_args)

    @patch('subprocess.run')
    def test_apply_nat_rule_dnat(self, mock_sub):
        """A DNAT rule results in one subprocess call mentioning DNAT."""
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'source_network': '10.0.0.0/24',
            'target_interface': 'eth0',
            'masquerade': False,
            'nat_type': 'DNAT',
            'protocol': 'TCP',
            'internal_ip': '192.168.1.10',
            'external_port': '80',
            'internal_port': '8080',
        }
        self.manager._apply_nat_rule(rule)
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('DNAT', call_args)

    @patch('subprocess.run')
    def test_apply_nat_rule_snat(self, mock_sub):
        """An SNAT rule results in one subprocess call mentioning SNAT."""
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'source_network': '10.0.0.0/24',
            'target_interface': 'eth0',
            'masquerade': False,
            'nat_type': 'SNAT',
            'protocol': 'TCP',
            'internal_ip': '192.168.1.10',
            'external_port': None,
            'internal_port': '8080',
        }
        self.manager._apply_nat_rule(rule)
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('SNAT', call_args)

    @patch('subprocess.run')
    def test_apply_firewall_rule_with_port(self, mock_sub):
        """A rule with a port produces a command containing --dport and the port."""
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'rule_type': 'INPUT',
            'source': '10.0.0.0/24',
            'destination': '192.168.1.0/24',
            'action': 'ACCEPT',
            'protocol': 'TCP',
            'port': '443',
            'port_range': None,
        }
        self.manager._apply_firewall_rule(rule)
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('--dport', call_args)
        self.assertIn('443', call_args)

    @patch('subprocess.run')
    def test_apply_firewall_rule_with_port_range(self, mock_sub):
        """A rule with a port range passes it to the command as 'low:high'."""
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'rule_type': 'INPUT',
            'source': '10.0.0.0/24',
            'destination': '192.168.1.0/24',
            'action': 'ACCEPT',
            'protocol': 'ALL',
            'port': None,
            'port_range': '1000-2000',
        }
        self.manager._apply_firewall_rule(rule)
        # Assert the call first so a missing call fails as an assertion,
        # not a TypeError on call_args being None.
        mock_sub.assert_called()
        call_args = mock_sub.call_args[0][0]
        # Port range should be passed as 1000:2000 for iptables
        self.assertIn('1000:2000', call_args)

    def test_parse_proc_net_route_valid_data(self):
        """_parse_proc_net_route parses /proc/net/route format correctly."""
        route_content = (
            "Iface\tDestination\tGateway\tFlags\tRefCnt\tUse\tMetric\tMask\tMTU\tWindow\tIRTT\n"
            "eth0\t00000000\t0101A8C0\t0003\t0\t0\t100\t00000000\t0\t0\t0\n"
            "eth0\t000011AC\t00000000\t0001\t0\t0\t0\tF0FFFFFF\t0\t0\t0\n"
        )
        # The fixture lives under self.test_dir so it is removed together
        # with the rest of the scratch tree, even when an assertion fails.
        fname = os.path.join(self.test_dir, 'proc_net_route.txt')
        with open(fname, 'w') as f:
            f.write(route_content)
        routes = self.manager._parse_proc_net_route(fname)
        self.assertIsInstance(routes, list)
        # Should have parsed the two non-header lines
        self.assertEqual(len(routes), 2)
        # The first parsed entry must expose a 'route' member
        self.assertIn('route', routes[0])
|
|
|
|
|
|
class TestRoutingManagerInternalMethods(unittest.TestCase):
    """Tests for internal methods that are harder to reach via public API.

    Each test works on a RoutingManager rooted in a throwaway temporary
    directory and patches ``subprocess.run``, so no real system command is
    executed. The "exception_is_silent" tests contain no assertions on
    purpose: they pass as long as the call under test does not raise.
    """

    def setUp(self):
        """Build an isolated data/config tree and a manager that uses it."""
        self.test_dir = tempfile.mkdtemp()
        # Registered immediately: cleanups run even when a later step of
        # setUp raises (tearDown would not), so a failing RoutingManager
        # constructor can no longer leak the temporary directory.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)
        self.manager = RoutingManager(data_dir=self.data_dir, config_dir=self.config_dir)

    # ── _apply_peer_route ─────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_apply_peer_route_calls_ip_route(self, mock_sub):
        """Two allowed networks must result in two subprocess invocations."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._apply_peer_route({
            'peer_name': 'alice',
            'peer_ip': '10.0.0.2',
            'allowed_networks': ['192.168.100.0/24', '10.1.0.0/16'],
        })
        self.assertEqual(mock_sub.call_count, 2)

    @patch('subprocess.run', side_effect=subprocess.CalledProcessError(1, 'ip'))
    def test_apply_peer_route_exception_is_silent(self, _mock):
        """A CalledProcessError from the command must not propagate."""
        # Should not raise
        self.manager._apply_peer_route({
            'peer_name': 'alice',
            'peer_ip': '10.0.0.2',
            'allowed_networks': ['192.168.100.0/24'],
        })

    # ── _remove_peer_route ────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_remove_peer_route_calls_ip_route_del(self, mock_sub):
        """Removing a peer route issues exactly one command containing ``del``."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._remove_peer_route('alice')
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('del', call_args)

    @patch('subprocess.run', side_effect=FileNotFoundError('ip not found'))
    def test_remove_peer_route_exception_is_silent(self, _mock):
        """A missing binary (FileNotFoundError) must not propagate."""
        self.manager._remove_peer_route('alice')  # Should not raise

    # ── _apply_exit_node ──────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_apply_exit_node_adds_default_route(self, mock_sub):
        """The last command issued for an exit node must mention ``default``."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._apply_exit_node({
            'peer_name': 'exitnode',
            'peer_ip': '10.0.0.2',
        })
        # call_args reflects the most recent call only.
        call_args = mock_sub.call_args[0][0]
        self.assertIn('default', call_args)

    @patch('subprocess.run', side_effect=Exception('permission denied'))
    def test_apply_exit_node_exception_is_silent(self, _mock):
        """A generic exception from the command must not propagate."""
        self.manager._apply_exit_node({'peer_name': 'exit', 'peer_ip': '10.0.0.2'})

    # ── _apply_bridge_route ───────────────────────────────────────────────

    @patch('subprocess.run')
    def test_apply_bridge_route_adds_forward_rules(self, mock_sub):
        """Two allowed networks must result in two subprocess invocations."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._apply_bridge_route({
            'source_peer': 'src',
            'target_peer': 'dst',
            'allowed_networks': ['10.1.0.0/16', '10.2.0.0/16'],
        })
        self.assertEqual(mock_sub.call_count, 2)

    @patch('subprocess.run', side_effect=Exception('fail'))
    def test_apply_bridge_route_exception_is_silent(self, _mock):
        """A generic exception from the command must not propagate."""
        self.manager._apply_bridge_route({
            'source_peer': 'src', 'target_peer': 'dst',
            'allowed_networks': ['10.1.0.0/16'],
        })

    # ── _apply_split_route ────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_apply_split_route_adds_specific_route(self, mock_sub):
        """A split route issues one command that contains the target network."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._apply_split_route({
            'network': '10.100.0.0/16',
            'exit_peer': 'exit_wg',
            'priority': 100,
        })
        self.assertEqual(mock_sub.call_count, 1)
        call_args = mock_sub.call_args[0][0]
        self.assertIn('10.100.0.0/16', call_args)

    @patch('subprocess.run', side_effect=Exception('fail'))
    def test_apply_split_route_exception_is_silent(self, _mock):
        """A generic exception from the command must not propagate."""
        self.manager._apply_split_route({
            'network': '10.100.0.0/16', 'exit_peer': 'wg0', 'priority': 100,
        })

    # ── _remove_nat_rule ──────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_remove_nat_rule_calls_iptables_D(self, mock_sub):
        """NAT rule removal must issue a ``-D`` on the ``POSTROUTING`` chain."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._remove_nat_rule('rule_abc')
        call_args = mock_sub.call_args[0][0]
        self.assertIn('-D', call_args)
        self.assertIn('POSTROUTING', call_args)

    @patch('subprocess.run')
    def test_remove_nat_rule_not_found_is_silent(self, mock_sub):
        """A non-zero return code from the command must not raise."""
        mock_sub.return_value = MagicMock(returncode=1)
        self.manager._remove_nat_rule('nonexistent_rule')  # Should not raise

    @patch('subprocess.run', side_effect=FileNotFoundError('iptables'))
    def test_remove_nat_rule_exception_is_silent(self, _mock):
        """A missing binary (FileNotFoundError) must not propagate."""
        self.manager._remove_nat_rule('rule_x')

    # ── get_routing_logs ──────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_get_routing_logs_returns_dict(self, mock_sub):
        """With commands succeeding, the result is a dict with a ``'routes'`` key."""
        mock_sub.return_value = MagicMock(returncode=0, stdout='some output\n')
        result = self.manager.get_routing_logs()
        self.assertIsInstance(result, dict)
        self.assertIn('routes', result)

    @patch('subprocess.run', side_effect=Exception('system error'))
    def test_get_routing_logs_outer_exception_returns_error_dict(self, _mock):
        """With every subprocess call raising, a dict is still returned.

        NOTE(review): only the return type is checked, not any error content.
        The original author's note says the outer try succeeds and the inner
        dmesg calls are what fail, so the test name overstates what is
        covered — confirm against the implementation.
        """
        result = self.manager.get_routing_logs()
        self.assertIsInstance(result, dict)

    # ── remove_firewall_rule exception path ───────────────────────────────

    @patch('subprocess.run')
    def test_remove_firewall_rule_exception_returns_false(self, mock_sub):
        """Removing an unknown rule id returns a falsy result.

        NOTE(review): despite the name, no rule with this id is stored in
        the fresh manager, so this most likely exercises the not-found path.
        Reaching the exception path would require first persisting a rule
        that makes remove_firewall_rule invoke subprocess.
        """
        mock_sub.side_effect = Exception('unexpected error')
        result = self.manager.remove_firewall_rule('nonexistent_rule_id')
        # nonexistent rule -> returns False (not found path)
        self.assertFalse(result)

    # ── _get_routing_table ────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_get_routing_table_returns_list(self, mock_sub):
        """With the command succeeding, the routing table comes back as a list."""
        mock_sub.return_value = MagicMock(
            returncode=0,
            stdout='default via 192.168.1.1 dev eth0\n10.0.0.0/24 dev wg0 proto kernel\n',
        )
        result = self.manager._get_routing_table()
        self.assertIsInstance(result, list)

    @patch('subprocess.run', side_effect=FileNotFoundError('docker'))
    def test_get_routing_table_fallback_exception_returns_empty(self, _mock):
        """When both /proc/1/net/route and docker exec fail, return empty list."""
        # Every open() fails too, so no file-based source can succeed either.
        with patch('builtins.open', side_effect=FileNotFoundError('/proc/1/net/route')):
            result = self.manager._get_routing_table()
        self.assertEqual(result, [])

    # ── start with sysctl exception ───────────────────────────────────────

    @patch('subprocess.run', side_effect=FileNotFoundError('sysctl'))
    def test_start_continues_when_sysctl_not_found(self, _mock):
        """start() still succeeds and marks the service running without sysctl."""
        result = self.manager.start()
        self.assertTrue(result)
        self.assertTrue(self.manager._service_running)

    @patch('subprocess.run', side_effect=subprocess.CalledProcessError(1, 'sysctl'))
    def test_start_continues_when_sysctl_fails(self, _mock):
        """start() still reports success when the command exits non-zero."""
        result = self.manager.start()
        self.assertTrue(result)
|
|
|
|
|
# Executing this file directly hands control to unittest's command-line runner.
if __name__ == "__main__":
    unittest.main()
|