"""Unit tests for ``RoutingManager``.

Every test runs against a manager rooted in a throwaway temporary directory.
Most tests patch ``subprocess.run`` or the manager's ``_apply_*`` /
``_remove_*`` hooks so that the assertions focus on validation and on what is
persisted to the rules/state files.
"""
import json
import os
import shutil
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock, call

# Add api directory to path
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))

from routing_manager import RoutingManager  # noqa: E402  (needs the path tweak above)


class _RoutingManagerTestBase(unittest.TestCase):
    """Shared fixture: a ``RoutingManager`` backed by a per-test temp directory.

    Attributes set for each test:
        test_dir:   root temporary directory (removed after the test).
        data_dir:   ``<test_dir>/data``, passed to the manager as ``data_dir``.
        config_dir: ``<test_dir>/config``, passed as ``config_dir``.
        manager:    the ``RoutingManager`` under test.
    """

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        # Register the cleanup right away: unittest skips tearDown when setUp
        # raises, so a tearDown-based rmtree would leak the directory if
        # anything below (e.g. the RoutingManager constructor) fails.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)
        self.manager = RoutingManager(data_dir=self.data_dir, config_dir=self.config_dir)

    @staticmethod
    def _read_json(path):
        """Return the parsed JSON content of *path*."""
        with open(path, encoding='utf-8') as f:
            return json.load(f)

    def _read_rules(self):
        """Return the rules currently persisted in the manager's rules file."""
        return self._read_json(self.manager.rules_file)

    def _read_state(self):
        """Return the service state currently persisted in the state file."""
        return self._read_json(self.manager._state_file)

    def _assert_rejected(self, result):
        """Assert *result* is a dict carrying a falsy ``'success'`` entry.

        A dict without a ``'success'`` key fails the check (the lookup
        defaults to ``True``).
        """
        self.assertIsInstance(result, dict)
        self.assertFalse(result.get('success', True))


class TestRoutingManager(_RoutingManagerTestBase):
    """Tests for the public API plus a few directly-called private helpers."""

    def test_initialization(self):
        # Test RoutingManager initialization and config creation
        self.assertTrue(os.path.exists(self.manager.routing_dir))
        self.assertTrue(os.path.exists(self.manager.rules_file))

        # Check that rules file contains default structure
        rules = self._read_rules()
        expected_types = {
            'nat_rules': list,
            'peer_routes': dict,
            'exit_nodes': list,
            'bridge_routes': list,
            'split_routes': list,
            'firewall_rules': list,
        }
        for key, expected_type in expected_types.items():
            with self.subTest(key=key):
                self.assertIn(key, rules)
                self.assertIsInstance(rules[key], expected_type)

    @patch.object(RoutingManager, '_apply_nat_rule', return_value=True)
    @patch.object(RoutingManager, '_remove_nat_rule', return_value=True)
    def test_add_and_remove_nat_rule(self, mock_remove_nat, mock_apply_nat):
        # Add a valid NAT rule
        result = self.manager.add_nat_rule('10.0.0.0/24', 'eth0')
        self.assertTrue(result)

        # Check that the rule is persisted
        rules = self._read_rules()
        self.assertEqual(len(rules['nat_rules']), 1)
        rule = rules['nat_rules'][0]
        self.assertEqual(rule['source_network'], '10.0.0.0/24')
        self.assertEqual(rule['target_interface'], 'eth0')
        self.assertEqual(rule['nat_type'], 'MASQUERADE')
        self.assertTrue(rule['enabled'])

        # Remove the NAT rule
        rule_id = rule['id']
        result = self.manager.remove_nat_rule(rule_id)
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['nat_rules']), 0)

        # Test invalid NAT rule (bad CIDR)
        result = self.manager.add_nat_rule('bad-cidr', 'eth0')
        self.assertFalse(result)

        # Test invalid NAT rule (bad interface)
        result = self.manager.add_nat_rule('10.0.0.0/24', '')
        self.assertFalse(result)

        # Test invalid NAT rule (bad nat_type)
        result = self.manager.add_nat_rule('10.0.0.0/24', 'eth0', nat_type='INVALID')
        self.assertFalse(result)

        # Test invalid NAT rule (bad protocol)
        result = self.manager.add_nat_rule('10.0.0.0/24', 'eth0', protocol='INVALID')
        self.assertFalse(result)

    @patch.object(RoutingManager, '_apply_peer_route', return_value=True)
    @patch.object(RoutingManager, '_remove_peer_route', return_value=True)
    def test_add_and_remove_peer_route(self, mock_remove_peer, mock_apply_peer):
        # Add a valid peer route
        allowed_networks = ['10.0.0.0/24']
        result = self.manager.add_peer_route('peer1', '10.0.0.2', allowed_networks)
        self.assertTrue(result)

        # Check that the route is persisted
        rules = self._read_rules()
        self.assertIn('peer1', rules['peer_routes'])
        route = rules['peer_routes']['peer1']
        self.assertEqual(route['peer_name'], 'peer1')
        self.assertEqual(route['peer_ip'], '10.0.0.2')
        self.assertEqual(route['allowed_networks'], allowed_networks)
        self.assertEqual(route['route_type'], 'lan')
        self.assertTrue(route['enabled'])

        # Remove the peer route
        result = self.manager.remove_peer_route('peer1')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertNotIn('peer1', rules['peer_routes'])

        # Test invalid peer route (bad peer_name)
        result = self.manager.add_peer_route('', '10.0.0.2', allowed_networks)
        self.assertFalse(result)

        # Test invalid peer route (bad peer_ip)
        result = self.manager.add_peer_route('peer2', '', allowed_networks)
        self.assertFalse(result)

        # Test invalid peer route (bad allowed_networks)
        result = self.manager.add_peer_route('peer3', '10.0.0.3', ['bad-cidr'])
        self.assertFalse(result)

        # Test invalid peer route (bad route_type)
        result = self.manager.add_peer_route(
            'peer4', '10.0.0.4', allowed_networks, route_type='invalid')
        self.assertFalse(result)

    # ── exit nodes ────────────────────────────────────────────────────────

    @patch.object(RoutingManager, '_apply_exit_node')
    def test_add_exit_node_valid(self, mock_apply):
        result = self.manager.add_exit_node('peer1', '10.0.0.2')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['exit_nodes']), 1)
        node = rules['exit_nodes'][0]
        self.assertEqual(node['peer_name'], 'peer1')
        self.assertEqual(node['peer_ip'], '10.0.0.2')
        self.assertTrue(node['enabled'])
        mock_apply.assert_called_once()

    @patch.object(RoutingManager, '_apply_exit_node')
    def test_add_exit_node_with_allowed_domains(self, mock_apply):
        result = self.manager.add_exit_node(
            'peer1', '10.0.0.2', allowed_domains=['example.com'])
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(rules['exit_nodes'][0]['allowed_domains'], ['example.com'])

    def test_add_exit_node_invalid_peer_name(self):
        self._assert_rejected(self.manager.add_exit_node('bad name!', '10.0.0.2'))

    def test_add_exit_node_invalid_ip(self):
        self._assert_rejected(self.manager.add_exit_node('peer1', 'not-an-ip'))

    def test_add_exit_node_invalid_domains(self):
        self._assert_rejected(
            self.manager.add_exit_node('peer1', '10.0.0.2', allowed_domains='not-a-list'))

    def test_add_exit_node_invalid_domain_format(self):
        self._assert_rejected(
            self.manager.add_exit_node('peer1', '10.0.0.2', allowed_domains=['bad domain!']))

    # ── bridge routes ─────────────────────────────────────────────────────

    @patch.object(RoutingManager, '_apply_bridge_route')
    def test_add_bridge_route_valid(self, mock_apply):
        result = self.manager.add_bridge_route('src-peer', '192.168.1.0/24', ['10.0.0.0/24'])
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['bridge_routes']), 1)
        route = rules['bridge_routes'][0]
        self.assertEqual(route['source_peer'], 'src-peer')
        self.assertEqual(route['target_peer'], '192.168.1.0/24')
        mock_apply.assert_called_once()

    def test_add_bridge_route_invalid_source(self):
        self._assert_rejected(
            self.manager.add_bridge_route('bad name!', '192.168.1.0/24', ['10.0.0.0/24']))

    def test_add_bridge_route_invalid_target(self):
        self._assert_rejected(
            self.manager.add_bridge_route('src-peer', 'not-a-network', ['10.0.0.0/24']))

    def test_add_bridge_route_empty_allowed_networks(self):
        self._assert_rejected(
            self.manager.add_bridge_route('src-peer', '192.168.1.0/24', []))

    def test_add_bridge_route_invalid_network_in_list(self):
        self._assert_rejected(
            self.manager.add_bridge_route('src-peer', '192.168.1.0/24', ['not-a-cidr']))

    # ── split routes ──────────────────────────────────────────────────────

    @patch.object(RoutingManager, '_apply_split_route')
    def test_add_split_route_valid(self, mock_apply):
        result = self.manager.add_split_route('10.0.0.0/24', '10.0.0.1')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['split_routes']), 1)
        route = rules['split_routes'][0]
        self.assertEqual(route['network'], '10.0.0.0/24')
        self.assertEqual(route['exit_peer'], '10.0.0.1')
        mock_apply.assert_called_once()

    @patch.object(RoutingManager, '_apply_split_route')
    def test_add_split_route_with_fallback(self, mock_apply):
        result = self.manager.add_split_route(
            '10.0.0.0/24', '10.0.0.1', fallback_peer='10.0.0.2')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(rules['split_routes'][0]['fallback_peer'], '10.0.0.2')

    def test_add_split_route_invalid_network(self):
        self._assert_rejected(self.manager.add_split_route('not-a-cidr', '10.0.0.1'))

    def test_add_split_route_invalid_exit_peer(self):
        self._assert_rejected(self.manager.add_split_route('10.0.0.0/24', 'not-an-ip'))

    def test_add_split_route_invalid_fallback(self):
        self._assert_rejected(
            self.manager.add_split_route('10.0.0.0/24', '10.0.0.1', fallback_peer='not-ip'))

    # ── firewall rules ────────────────────────────────────────────────────

    @patch.object(RoutingManager, '_apply_firewall_rule')
    def test_add_firewall_rule_valid(self, mock_apply):
        result = self.manager.add_firewall_rule('FORWARD', '10.0.0.0/24', '192.168.1.0/24')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(len(rules['firewall_rules']), 1)
        rule = rules['firewall_rules'][0]
        self.assertEqual(rule['rule_type'], 'FORWARD')
        self.assertEqual(rule['source'], '10.0.0.0/24')
        self.assertEqual(rule['destination'], '192.168.1.0/24')
        self.assertEqual(rule['action'], 'ACCEPT')
        mock_apply.assert_called_once()

    @patch.object(RoutingManager, '_apply_firewall_rule')
    def test_add_firewall_rule_with_port(self, mock_apply):
        result = self.manager.add_firewall_rule(
            'INPUT', '10.0.0.0/24', '192.168.1.0/24', protocol='TCP', port='80')
        self.assertTrue(result)
        rule = self._read_rules()['firewall_rules'][0]
        self.assertEqual(rule['protocol'], 'TCP')
        self.assertEqual(rule['port'], '80')

    @patch.object(RoutingManager, '_apply_firewall_rule')
    def test_add_firewall_rule_with_port_range(self, mock_apply):
        result = self.manager.add_firewall_rule(
            'INPUT', '10.0.0.0/24', '192.168.1.0/24', port_range='1000-2000')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(rules['firewall_rules'][0]['port_range'], '1000-2000')

    def test_add_firewall_rule_invalid_type(self):
        result = self.manager.add_firewall_rule('BADCHAIN', '10.0.0.0/24', '192.168.1.0/24')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_source(self):
        result = self.manager.add_firewall_rule('FORWARD', 'not-cidr', '192.168.1.0/24')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_destination(self):
        result = self.manager.add_firewall_rule('FORWARD', '10.0.0.0/24', 'not-cidr')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_action(self):
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', action='JUMP')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_protocol(self):
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', protocol='SCTP')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_port_value(self):
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', port='99999')
        self.assertFalse(result)

    def test_add_firewall_rule_port_not_number(self):
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', port='abc')
        self.assertFalse(result)

    def test_add_firewall_rule_invalid_port_range_format(self):
        result = self.manager.add_firewall_rule(
            'FORWARD', '10.0.0.0/24', '192.168.1.0/24', port_range='abc-def')
        self.assertFalse(result)

    @patch.object(RoutingManager, '_apply_firewall_rule')
    @patch('subprocess.run')
    def test_remove_firewall_rule(self, mock_sub, mock_apply):
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager.add_firewall_rule('FORWARD', '10.0.0.0/24', '192.168.1.0/24')
        rule_id = self._read_rules()['firewall_rules'][0]['id']

        result = self.manager.remove_firewall_rule(rule_id)

        self.assertTrue(result)
        self.assertEqual(len(self._read_rules()['firewall_rules']), 0)

    @patch.object(RoutingManager, '_apply_firewall_rule')
    def test_remove_firewall_rule_not_found(self, mock_apply):
        result = self.manager.remove_firewall_rule('nonexistent_id')
        self.assertFalse(result)

    @patch.object(RoutingManager, '_apply_firewall_rule')
    @patch('subprocess.run')
    def test_remove_firewall_rule_with_tcp_port(self, mock_sub, mock_apply):
        """remove_firewall_rule builds correct iptables -D cmd for TCP+port rule."""
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager.add_firewall_rule(
            'INPUT', '10.0.0.0/24', '192.168.1.0/24', protocol='TCP', port='443')
        rule_id = self._read_rules()['firewall_rules'][0]['id']

        result = self.manager.remove_firewall_rule(rule_id)

        self.assertTrue(result)
        # Check subprocess was called with iptables -D
        call_args = mock_sub.call_args[0][0]
        self.assertIn('iptables', call_args)
        self.assertIn('-D', call_args)

    # ── status and getters ────────────────────────────────────────────────

    def test_get_routing_status_empty(self):
        status = self.manager.get_routing_status()
        self.assertIn('nat_rules_count', status)
        self.assertEqual(status['nat_rules_count'], 0)
        for key in (
            'firewall_rules_count',
            'peer_routes_count',
            'exit_nodes_count',
            'bridge_routes_count',
            'split_routes_count',
            'routing_table',
            'active_rules',
        ):
            with self.subTest(key=key):
                self.assertIn(key, status)

    @patch.object(RoutingManager, '_apply_nat_rule')
    @patch.object(RoutingManager, '_apply_firewall_rule')
    @patch.object(RoutingManager, '_apply_peer_route')
    def test_get_routing_status_counts(self, mock_peer, mock_fw, mock_nat):
        self.manager.add_nat_rule('10.0.0.0/24', 'eth0')
        self.manager.add_firewall_rule('FORWARD', '10.0.0.0/24', '192.168.1.0/24')
        self.manager.add_peer_route('p1', '10.0.0.2', ['10.0.0.0/24'])
        status = self.manager.get_routing_status()
        self.assertEqual(status['nat_rules_count'], 1)
        self.assertEqual(status['firewall_rules_count'], 1)
        self.assertEqual(status['peer_routes_count'], 1)

    def test_get_routing_status_corrupted_file(self):
        # Corrupt the rules file to trigger exception
        with open(self.manager.rules_file, 'w') as f:
            f.write('{corrupt')
        status = self.manager.get_routing_status()
        # Should return safe defaults
        self.assertEqual(status['nat_rules_count'], 0)

    def test_get_nat_rules_empty(self):
        rules = self.manager.get_nat_rules()
        self.assertEqual(rules, [])

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_get_nat_rules_returns_list(self, mock_apply):
        self.manager.add_nat_rule('10.0.0.0/24', 'eth0')
        rules = self.manager.get_nat_rules()
        self.assertEqual(len(rules), 1)
        self.assertEqual(rules[0]['source_network'], '10.0.0.0/24')

    def test_get_peer_routes_empty(self):
        routes = self.manager.get_peer_routes()
        self.assertEqual(routes, [])

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_get_peer_routes_returns_list(self, mock_apply):
        self.manager.add_peer_route('peer1', '10.0.0.2', ['10.0.0.0/24'])
        routes = self.manager.get_peer_routes()
        self.assertEqual(len(routes), 1)
        self.assertEqual(routes[0]['peer_name'], 'peer1')

    def test_get_firewall_rules_empty(self):
        rules = self.manager.get_firewall_rules()
        self.assertEqual(rules, [])

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_update_peer_ip_updates_and_reapplies(self, mock_apply):
        self.manager.add_peer_route('peer1', '10.0.0.2', ['10.0.0.0/24'])
        result = self.manager.update_peer_ip('peer1', '10.0.0.99')
        self.assertTrue(result)
        rules = self._read_rules()
        self.assertEqual(rules['peer_routes']['peer1']['peer_ip'], '10.0.0.99')

    def test_update_peer_ip_nonexistent_peer(self):
        result = self.manager.update_peer_ip('nobody', '10.0.0.99')
        self.assertFalse(result)

    def test_remove_peer_route_nonexistent(self):
        result = self.manager.remove_peer_route('nobody')
        self.assertFalse(result)

    def test_get_status_returns_dict(self):
        status = self.manager.get_status()
        self.assertIsInstance(status, dict)
        self.assertIn('running', status)
        self.assertIn('status', status)
        self.assertIn('nat_rules_count', status)

    # ── service lifecycle ─────────────────────────────────────────────────

    def test_start_and_stop_service(self):
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0)
            result = self.manager.start()
            self.assertTrue(result)
            self.assertTrue(self.manager._service_running)
            result = self.manager.stop()
            self.assertTrue(result)
            self.assertFalse(self.manager._service_running)

    def test_start_persists_service_state(self):
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0)
            self.manager.start()
            # State file should exist now
            self.assertTrue(os.path.exists(self.manager._state_file))
            state = self._read_state()
            self.assertTrue(state['running'])

    def test_stop_persists_service_state(self):
        self.manager.stop()
        state = self._read_state()
        self.assertFalse(state['running'])

    def test_restart_service(self):
        # time.sleep is patched so the test does not wait on any sleep made
        # during restart().
        with patch('subprocess.run') as mock_sub, patch('time.sleep'):
            mock_sub.return_value = MagicMock(returncode=0)
            result = self.manager.restart()
            self.assertTrue(result)

    # ── connectivity and logs ─────────────────────────────────────────────

    def test_test_connectivity_returns_dict(self):
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='', stderr='')
            result = self.manager.test_connectivity()
        self.assertIsInstance(result, dict)
        self.assertIn('routing_functionality', result)
        self.assertIn('iptables_access', result)
        self.assertIn('network_interfaces', result)
        self.assertIn('routing_table_access', result)

    def test_test_routing_connectivity_returns_results(self):
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='OK', stderr='')
            results = self.manager.test_routing_connectivity('8.8.8.8')
        self.assertIn('ping', results)
        self.assertIn('traceroute', results)

    def test_test_routing_connectivity_with_via_peer(self):
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='OK', stderr='')
            results = self.manager.test_routing_connectivity('8.8.8.8', via_peer='10.0.0.1')
        self.assertIn('peer_route', results)

    def test_test_routing_connectivity_subprocess_exception(self):
        with patch('subprocess.run', side_effect=Exception('timeout')):
            results = self.manager.test_routing_connectivity('8.8.8.8')
        self.assertIn('ping', results)
        self.assertFalse(results['ping']['success'])

    def test_get_routing_logs(self):
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='log line', stderr='')
            logs = self.manager.get_routing_logs()
        self.assertIsInstance(logs, dict)
        self.assertIn('routes', logs)

    # ── parsing and validation helpers ────────────────────────────────────

    def test_parse_route_basic(self):
        parsed = self.manager._parse_route('10.0.0.0/24 via 172.20.0.1 dev eth0 metric 100')
        self.assertEqual(parsed['destination'], '10.0.0.0/24')
        self.assertEqual(parsed['via'], '172.20.0.1')
        self.assertEqual(parsed['dev'], 'eth0')
        self.assertEqual(parsed['metric'], '100')

    def test_parse_route_no_via(self):
        parsed = self.manager._parse_route('10.0.0.0/24 dev eth0')
        self.assertEqual(parsed['destination'], '10.0.0.0/24')
        self.assertEqual(parsed['dev'], 'eth0')
        self.assertEqual(parsed['via'], '')

    def test_parse_route_empty_string(self):
        parsed = self.manager._parse_route('')
        self.assertEqual(parsed['destination'], '')

    def test_validate_cidr_valid(self):
        for cidr in ('10.0.0.0/24', '192.168.1.0/24', '172.16.0.0/12', '0.0.0.0/0'):
            with self.subTest(cidr=cidr):
                self.assertTrue(self.manager._validate_cidr(cidr))

    def test_validate_cidr_invalid(self):
        for cidr in ('not-a-cidr', '', '10.0.0.300/24'):
            with self.subTest(cidr=cidr):
                self.assertFalse(self.manager._validate_cidr(cidr))

    # ── persisted service state ───────────────────────────────────────────

    def test_load_service_state_from_file(self):
        """State is restored from the file on the next instantiation."""
        self.manager._service_running = True
        self.manager._save_service_state()
        new_manager = RoutingManager(data_dir=self.data_dir, config_dir=self.config_dir)
        self.assertTrue(new_manager._service_running)

    def test_load_service_state_no_file(self):
        """Without a state file, service defaults to running=True."""
        if os.path.exists(self.manager._state_file):
            os.remove(self.manager._state_file)
        new_manager = RoutingManager(data_dir=self.data_dir, config_dir=self.config_dir)
        self.assertTrue(new_manager._service_running)

    # ── live iptables ─────────────────────────────────────────────────────

    def test_get_live_iptables(self):
        with patch('subprocess.run') as mock_sub:
            mock_sub.return_value = MagicMock(returncode=0, stdout='Chain INPUT', stderr='')
            result = self.manager.get_live_iptables()
        self.assertIn('filter', result)
        self.assertIn('nat', result)

    def test_get_live_iptables_subprocess_exception(self):
        with patch('subprocess.run', side_effect=Exception('no docker')):
            result = self.manager.get_live_iptables()
        self.assertIn('filter', result)
        self.assertIn('nat', result)

    # ── NAT rule / peer route variants ────────────────────────────────────

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_nat_rule_snat_type(self, mock_apply):
        result = self.manager.add_nat_rule(
            '10.0.0.0/24', 'eth0', nat_type='SNAT',
            internal_ip='10.0.0.5', internal_port='8080')
        self.assertTrue(result)
        rule = self._read_rules()['nat_rules'][0]
        self.assertEqual(rule['nat_type'], 'SNAT')

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_nat_rule_dnat_type(self, mock_apply):
        result = self.manager.add_nat_rule(
            '10.0.0.0/24', 'eth0', nat_type='DNAT',
            internal_ip='10.0.0.5', external_port='80', internal_port='8080')
        self.assertTrue(result)
        rule = self._read_rules()['nat_rules'][0]
        self.assertEqual(rule['nat_type'], 'DNAT')

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_nat_rule_tcp_protocol(self, mock_apply):
        result = self.manager.add_nat_rule('10.0.0.0/24', 'eth0', protocol='TCP')
        self.assertTrue(result)

    @patch.object(RoutingManager, '_apply_nat_rule')
    def test_nat_rule_udp_protocol(self, mock_apply):
        result = self.manager.add_nat_rule('10.0.0.0/24', 'eth0', protocol='UDP')
        self.assertTrue(result)

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_peer_route_exit_type(self, mock_apply):
        result = self.manager.add_peer_route(
            'p1', '10.0.0.2', ['10.0.0.0/24'], route_type='exit')
        self.assertTrue(result)

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_peer_route_bridge_type(self, mock_apply):
        result = self.manager.add_peer_route(
            'p1', '10.0.0.2', ['10.0.0.0/24'], route_type='bridge')
        self.assertTrue(result)

    @patch.object(RoutingManager, '_apply_peer_route')
    def test_peer_route_split_type(self, mock_apply):
        result = self.manager.add_peer_route(
            'p1', '10.0.0.2', ['10.0.0.0/24'], route_type='split')
        self.assertTrue(result)

    # ── config / rules file handling ──────────────────────────────────────

    def test_ensure_config_exists_is_idempotent(self):
        """Calling _ensure_config_exists twice does not raise."""
        self.manager._ensure_config_exists()
        self.manager._ensure_config_exists()
        self.assertTrue(os.path.exists(self.manager.rules_file))

    def test_save_rules_failure_is_silent(self):
        """_save_rules failure (e.g. permission error) doesn't raise."""
        with patch('builtins.open', side_effect=OSError('disk full')):
            # Should not raise
            self.manager._save_rules({'nat_rules': [], 'peer_routes': {}})

    def test_load_rules_failure_returns_empty_dict(self):
        """_load_rules failure returns {}."""
        with open(self.manager.rules_file, 'w') as f:
            f.write('{corrupt')
        result = self.manager._load_rules()
        self.assertEqual(result, {})

    # ── connectivity probes when the tool is missing ──────────────────────

    def test_test_iptables_access_not_found(self):
        """FileNotFoundError from iptables returns success=True (dev mode)."""
        with patch('subprocess.run', side_effect=FileNotFoundError()):
            result = self.manager._test_iptables_access()
        self.assertTrue(result['success'])

    def test_test_network_interfaces_not_found(self):
        """FileNotFoundError from ip link show returns success=True (dev mode)."""
        with patch('subprocess.run', side_effect=FileNotFoundError()):
            result = self.manager._test_network_interfaces()
        self.assertTrue(result['success'])

    def test_test_routing_table_not_found(self):
        """FileNotFoundError from ip route show returns success=True (dev mode)."""
        with patch('subprocess.run', side_effect=FileNotFoundError()):
            result = self.manager._test_routing_table_access()
        self.assertTrue(result['success'])

    def test_is_routing_service_running_reflects_state(self):
        self.manager._service_running = True
        self.assertTrue(self.manager._is_routing_service_running())
        self.manager._service_running = False
        self.assertFalse(self.manager._is_routing_service_running())

    # ── _apply_nat_rule / _apply_firewall_rule command construction ───────

    @patch('subprocess.run')
    def test_apply_nat_rule_masquerade(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'source_network': '10.0.0.0/24',
            'target_interface': 'eth0',
            'masquerade': True,
            'nat_type': 'MASQUERADE',
            'protocol': 'ALL',
            'internal_ip': None,
            'external_port': None,
            'internal_port': None,
        }
        self.manager._apply_nat_rule(rule)
        mock_sub.assert_called_once()
        # First positional argument of subprocess.run is the command.
        call_args = mock_sub.call_args[0][0]
        self.assertIn('iptables', call_args)
        self.assertIn('MASQUERADE', call_args)

    @patch('subprocess.run')
    def test_apply_nat_rule_dnat(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'source_network': '10.0.0.0/24',
            'target_interface': 'eth0',
            'masquerade': False,
            'nat_type': 'DNAT',
            'protocol': 'TCP',
            'internal_ip': '192.168.1.10',
            'external_port': '80',
            'internal_port': '8080',
        }
        self.manager._apply_nat_rule(rule)
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('DNAT', call_args)

    @patch('subprocess.run')
    def test_apply_nat_rule_snat(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'source_network': '10.0.0.0/24',
            'target_interface': 'eth0',
            'masquerade': False,
            'nat_type': 'SNAT',
            'protocol': 'TCP',
            'internal_ip': '192.168.1.10',
            'external_port': None,
            'internal_port': '8080',
        }
        self.manager._apply_nat_rule(rule)
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('SNAT', call_args)

    @patch('subprocess.run')
    def test_apply_firewall_rule_with_port(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'rule_type': 'INPUT',
            'source': '10.0.0.0/24',
            'destination': '192.168.1.0/24',
            'action': 'ACCEPT',
            'protocol': 'TCP',
            'port': '443',
            'port_range': None,
        }
        self.manager._apply_firewall_rule(rule)
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('--dport', call_args)
        self.assertIn('443', call_args)

    @patch('subprocess.run')
    def test_apply_firewall_rule_with_port_range(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=0)
        rule = {
            'rule_type': 'INPUT',
            'source': '10.0.0.0/24',
            'destination': '192.168.1.0/24',
            'action': 'ACCEPT',
            'protocol': 'ALL',
            'port': None,
            'port_range': '1000-2000',
        }
        self.manager._apply_firewall_rule(rule)
        call_args = mock_sub.call_args[0][0]
        # Port range should be passed as 1000:2000 for iptables
        self.assertIn('1000:2000', call_args)

    def test_parse_proc_net_route_valid_data(self):
        """_parse_proc_net_route parses /proc/net/route format correctly."""
        route_content = (
            "Iface\tDestination\tGateway\tFlags\tRefCnt\tUse\tMetric\tMask\tMTU\tWindow\tIRTT\n"
            "eth0\t00000000\t0101A8C0\t0003\t0\t0\t100\t00000000\t0\t0\t0\n"
            "eth0\t000011AC\t00000000\t0001\t0\t0\t0\tF0FFFFFF\t0\t0\t0\n"
        )
        # Written inside the per-test directory so the fixture cleanup removes
        # it; no separate unlink is needed.
        route_file = os.path.join(self.test_dir, 'route.txt')
        with open(route_file, 'w') as f:
            f.write(route_content)

        routes = self.manager._parse_proc_net_route(route_file)

        self.assertIsInstance(routes, list)
        # Should have parsed the two non-header lines
        self.assertEqual(len(routes), 2)
        # The first parsed entry must expose a 'route' key
        self.assertIn('route', routes[0])


class TestRoutingManagerInternalMethods(_RoutingManagerTestBase):
    """Tests for internal methods that are harder to reach via public API."""

    # ── _apply_peer_route ─────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_apply_peer_route_calls_ip_route(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._apply_peer_route({
            'peer_name': 'alice',
            'peer_ip': '10.0.0.2',
            'allowed_networks': ['192.168.100.0/24', '10.1.0.0/16'],
        })
        # One subprocess call is expected per allowed network.
        self.assertEqual(mock_sub.call_count, 2)

    @patch('subprocess.run', side_effect=subprocess.CalledProcessError(1, 'ip'))
    def test_apply_peer_route_exception_is_silent(self, _mock):
        # Should not raise
        self.manager._apply_peer_route({
            'peer_name': 'alice',
            'peer_ip': '10.0.0.2',
            'allowed_networks': ['192.168.100.0/24'],
        })

    # ── _remove_peer_route ────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_remove_peer_route_calls_ip_route_del(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._remove_peer_route('alice')
        mock_sub.assert_called_once()
        call_args = mock_sub.call_args[0][0]
        self.assertIn('del', call_args)

    @patch('subprocess.run', side_effect=FileNotFoundError('ip not found'))
    def test_remove_peer_route_exception_is_silent(self, _mock):
        self.manager._remove_peer_route('alice')  # Should not raise

    # ── _apply_exit_node ──────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_apply_exit_node_adds_default_route(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._apply_exit_node({
            'peer_name': 'exitnode',
            'peer_ip': '10.0.0.2',
        })
        call_args = mock_sub.call_args[0][0]
        self.assertIn('default', call_args)

    @patch('subprocess.run', side_effect=Exception('permission denied'))
    def test_apply_exit_node_exception_is_silent(self, _mock):
        # Should not raise
        self.manager._apply_exit_node({'peer_name': 'exit', 'peer_ip': '10.0.0.2'})

    # ── _apply_bridge_route ───────────────────────────────────────────────

    @patch('subprocess.run')
    def test_apply_bridge_route_adds_forward_rules(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._apply_bridge_route({
            'source_peer': 'src',
            'target_peer': 'dst',
            'allowed_networks': ['10.1.0.0/16', '10.2.0.0/16'],
        })
        # One subprocess call is expected per allowed network.
        self.assertEqual(mock_sub.call_count, 2)

    @patch('subprocess.run', side_effect=Exception('fail'))
    def test_apply_bridge_route_exception_is_silent(self, _mock):
        # Should not raise
        self.manager._apply_bridge_route({
            'source_peer': 'src',
            'target_peer': 'dst',
            'allowed_networks': ['10.1.0.0/16'],
        })

    # ── _apply_split_route ────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_apply_split_route_adds_specific_route(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._apply_split_route({
            'network': '10.100.0.0/16',
            'exit_peer': 'exit_wg',
            'priority': 100,
        })
        self.assertEqual(mock_sub.call_count, 1)
        call_args = mock_sub.call_args[0][0]
        self.assertIn('10.100.0.0/16', call_args)

    @patch('subprocess.run', side_effect=Exception('fail'))
    def test_apply_split_route_exception_is_silent(self, _mock):
        # Should not raise
        self.manager._apply_split_route({
            'network': '10.100.0.0/16',
            'exit_peer': 'wg0',
            'priority': 100,
        })

    # ── _remove_nat_rule ──────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_remove_nat_rule_calls_iptables_D(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=0)
        self.manager._remove_nat_rule('rule_abc')
        call_args = mock_sub.call_args[0][0]
        self.assertIn('-D', call_args)
        self.assertIn('POSTROUTING', call_args)

    @patch('subprocess.run')
    def test_remove_nat_rule_not_found_is_silent(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=1)
        self.manager._remove_nat_rule('nonexistent_rule')  # Should not raise

    @patch('subprocess.run', side_effect=FileNotFoundError('iptables'))
    def test_remove_nat_rule_exception_is_silent(self, _mock):
        # Should not raise
        self.manager._remove_nat_rule('rule_x')

    # ── get_routing_logs ──────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_get_routing_logs_returns_dict(self, mock_sub):
        mock_sub.return_value = MagicMock(returncode=0, stdout='some output\n')
        result = self.manager.get_routing_logs()
        self.assertIsInstance(result, dict)
        self.assertIn('routes', result)

    @patch('subprocess.run', side_effect=Exception('system error'))
    def test_get_routing_logs_outer_exception_returns_error_dict(self, _mock):
        # Every subprocess.run call raises here; the method is still expected
        # to hand back a dict rather than propagate the exception.
        result = self.manager.get_routing_logs()
        self.assertIsInstance(result, dict)

    # ── remove_firewall_rule with a failing subprocess ────────────────────

    @patch('subprocess.run')
    def test_remove_firewall_rule_exception_returns_false(self, mock_sub):
        mock_sub.side_effect = Exception('unexpected error')
        # NOTE(review): despite its name, this test never stores a rule, so it
        # exercises the "rule not found" path and presumably never reaches
        # subprocess.run. Covering the real exception path would require
        # adding a matching rule first - confirm the intended behaviour of
        # remove_firewall_rule before tightening this.
        result = self.manager.remove_firewall_rule('nonexistent_rule_id')
        self.assertFalse(result)

    # ── _get_routing_table ────────────────────────────────────────────────

    @patch('subprocess.run')
    def test_get_routing_table_returns_list(self, mock_sub):
        mock_sub.return_value = MagicMock(
            returncode=0,
            stdout='default via 192.168.1.1 dev eth0\n10.0.0.0/24 dev wg0 proto kernel\n',
        )
        result = self.manager._get_routing_table()
        self.assertIsInstance(result, list)

    @patch('subprocess.run', side_effect=FileNotFoundError('docker'))
    def test_get_routing_table_fallback_exception_returns_empty(self, _mock):
        """When both /proc/1/net/route and docker exec fail, return empty list."""
        with patch('builtins.open', side_effect=FileNotFoundError('/proc/1/net/route')):
            result = self.manager._get_routing_table()
        self.assertEqual(result, [])

    # ── start with sysctl exception ───────────────────────────────────────

    @patch('subprocess.run', side_effect=FileNotFoundError('sysctl'))
    def test_start_continues_when_sysctl_not_found(self, _mock):
        result = self.manager.start()
        self.assertTrue(result)
        self.assertTrue(self.manager._service_running)

    @patch('subprocess.run', side_effect=subprocess.CalledProcessError(1, 'sysctl'))
    def test_start_continues_when_sysctl_fails(self, _mock):
        result = self.manager.start()
        self.assertTrue(result)


if __name__ == '__main__':
    unittest.main()