Files
pic/tests/test_connectivity_manager.py
T
roof e38bd4e81f Phase 5: extended connectivity — WireGuard ext, OpenVPN, Tor exit routing
- ConnectivityManager: per-peer exit routing via iptables fwmark/policy tables
  (wg_ext=0x10/t110, openvpn=0x20/t120, tor=0x30/t130)
- Dedicated PIC_CONNECTIVITY chains (mangle+nat), kill-switch FORWARD DROP
- Config upload with sanitization: strips PostUp/PostDown and OVpn script dirs
- Peer exit_via field added to peer registry (backward-compat, default=default)
- 7 Flask routes at /api/connectivity/*
- Connectivity.jsx: 693-line frontend with exit cards, peer assignment table
- 72 new tests for ConnectivityManager (72 passing)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 10:48:20 -04:00

691 lines
26 KiB
Python

"""
Tests for ConnectivityManager — config validation, file upload, status,
exit listing, peer exit assignment, and route application.
All subprocess calls (docker exec iptables/ip) and filesystem paths are
isolated so these tests run without any live infrastructure.
"""
import os
import sys
import stat
import tempfile
import shutil
import unittest
from unittest.mock import MagicMock, patch, call
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
import connectivity_manager as cm_module
from connectivity_manager import ConnectivityManager
# ---------------------------------------------------------------------------
# Factory helper
# ---------------------------------------------------------------------------
_SENTINEL = object()
def _make_manager(tmp_dir=None, peer_registry=_SENTINEL, config_manager=None):
"""Build a ConnectivityManager with mocked dependencies.
Pass peer_registry=None explicitly to test the no-registry path.
Omit peer_registry (or pass _SENTINEL) to get a default MagicMock.
"""
if tmp_dir is None:
tmp_dir = tempfile.mkdtemp()
if config_manager is None:
config_manager = MagicMock()
config_manager.get_identity.return_value = {
'cell_name': 'test',
'ip_range': '172.20.0.0/16',
}
if peer_registry is _SENTINEL:
peer_registry = MagicMock()
peer_registry.list_peers.return_value = []
with patch.object(ConnectivityManager, '_subscribe_to_events', lambda self: None):
mgr = ConnectivityManager(
config_manager=config_manager,
peer_registry=peer_registry,
data_dir=tmp_dir,
config_dir=tmp_dir,
)
return mgr
def _mock_subprocess_ok():
"""Return a MagicMock mimicking a successful subprocess.run result."""
return MagicMock(returncode=0, stdout='', stderr='')
# ---------------------------------------------------------------------------
# _validate_wg_conf
# ---------------------------------------------------------------------------
class TestValidateWgConf(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.mgr = _make_manager(tmp_dir=self.tmp)
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def test_valid_config_passes_and_returns_cleaned_text(self):
conf = "[Interface]\nPrivateKey = abc123\nAddress = 10.99.0.1/24\n\n[Peer]\nPublicKey = xyz\n"
result = self.mgr._validate_wg_conf(conf)
self.assertIn('[Interface]', result)
self.assertIn('PrivateKey', result)
self.assertIn('[Peer]', result)
def test_postupdate_is_stripped_silently(self):
conf = "[Interface]\nPrivateKey = abc\nPostUp = iptables -A FORWARD -j ACCEPT\n"
result = self.mgr._validate_wg_conf(conf)
self.assertNotIn('PostUp', result)
self.assertIn('PrivateKey', result)
def test_postdown_is_stripped_silently(self):
conf = "[Interface]\nPrivateKey = abc\nPostDown = iptables -D FORWARD -j ACCEPT\n"
result = self.mgr._validate_wg_conf(conf)
self.assertNotIn('PostDown', result)
def test_preup_is_stripped_silently(self):
conf = "[Interface]\nPrivateKey = abc\nPreUp = /sbin/modprobe wireguard\n"
result = self.mgr._validate_wg_conf(conf)
self.assertNotIn('PreUp', result)
def test_predown_is_stripped_silently(self):
conf = "[Interface]\nPrivateKey = abc\nPreDown = /sbin/rmmod wireguard\n"
result = self.mgr._validate_wg_conf(conf)
self.assertNotIn('PreDown', result)
def test_interface_wg0_raises_value_error(self):
conf = "[Interface]\nName = wg0\nPrivateKey = abc\n"
with self.assertRaises(ValueError) as ctx:
self.mgr._validate_wg_conf(conf)
self.assertIn('wg0', str(ctx.exception))
def test_interface_wg0_via_interface_key_raises_value_error(self):
# 'Interface = wg0' (not just 'Name = wg0') should also be caught
conf = "[Interface]\nInterface = wg0\nPrivateKey = abc\n"
with self.assertRaises(ValueError):
self.mgr._validate_wg_conf(conf)
def test_interface_wg_ext0_passes(self):
conf = "[Interface]\nName = wg_ext0\nPrivateKey = abc\nAddress = 10.99.0.1/24\n"
result = self.mgr._validate_wg_conf(conf)
self.assertIn('wg_ext0', result)
def test_non_string_input_raises_value_error(self):
with self.assertRaises(ValueError):
self.mgr._validate_wg_conf(None)
def test_result_ends_with_newline(self):
conf = "[Interface]\nPrivateKey = abc\n"
result = self.mgr._validate_wg_conf(conf)
self.assertTrue(result.endswith('\n'))
def test_multiple_hooks_all_stripped(self):
conf = (
"[Interface]\n"
"PrivateKey = abc\n"
"PostUp = cmd1\n"
"PostDown = cmd2\n"
"PreUp = cmd3\n"
"PreDown = cmd4\n"
)
result = self.mgr._validate_wg_conf(conf)
for hook in ('PostUp', 'PostDown', 'PreUp', 'PreDown'):
self.assertNotIn(hook, result)
self.assertIn('PrivateKey', result)
# ---------------------------------------------------------------------------
# _validate_ovpn
# ---------------------------------------------------------------------------
class TestValidateOvpn(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.mgr = _make_manager(tmp_dir=self.tmp)
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def _base_conf(self, extra=''):
return f"client\ndev tun\nproto udp\nremote vpn.example.com 1194\n{extra}"
def test_valid_ovpn_passes(self):
conf = self._base_conf()
result = self.mgr._validate_ovpn(conf)
self.assertIn('proto udp', result)
self.assertIn('remote vpn.example.com 1194', result)
def test_up_script_is_stripped(self):
conf = self._base_conf('up /sbin/connect.sh\n')
result = self.mgr._validate_ovpn(conf)
self.assertNotIn('up /sbin/connect.sh', result)
def test_down_script_is_stripped(self):
conf = self._base_conf('down /sbin/disconnect.sh\n')
result = self.mgr._validate_ovpn(conf)
self.assertNotIn('down /sbin/disconnect.sh', result)
def test_script_security_is_stripped(self):
conf = self._base_conf('script-security 2\n')
result = self.mgr._validate_ovpn(conf)
self.assertNotIn('script-security', result)
def test_plugin_is_stripped(self):
conf = self._base_conf('plugin /path/to/plugin.so\n')
result = self.mgr._validate_ovpn(conf)
self.assertNotIn('plugin', result)
def test_route_up_is_stripped(self):
conf = self._base_conf('route-up /sbin/route_cmd\n')
result = self.mgr._validate_ovpn(conf)
self.assertNotIn('route-up', result)
def test_route_pre_down_is_stripped(self):
conf = self._base_conf('route-pre-down /sbin/cleanup\n')
result = self.mgr._validate_ovpn(conf)
self.assertNotIn('route-pre-down', result)
def test_proto_udp_is_preserved(self):
conf = self._base_conf()
result = self.mgr._validate_ovpn(conf)
self.assertIn('proto udp', result)
def test_remote_directive_is_preserved(self):
conf = self._base_conf()
result = self.mgr._validate_ovpn(conf)
self.assertIn('remote vpn.example.com 1194', result)
def test_comments_are_preserved(self):
conf = self._base_conf('# this is a comment\n')
result = self.mgr._validate_ovpn(conf)
self.assertIn('# this is a comment', result)
def test_non_string_input_raises_value_error(self):
with self.assertRaises(ValueError):
self.mgr._validate_ovpn(42)
def test_result_ends_with_newline(self):
conf = self._base_conf()
result = self.mgr._validate_ovpn(conf)
self.assertTrue(result.endswith('\n'))
def test_all_forbidden_directives_stripped_together(self):
conf = self._base_conf(
'up /s\ndown /s\nscript-security 2\nplugin /p\nroute-up /r\nroute-pre-down /r\n'
)
result = self.mgr._validate_ovpn(conf)
for directive in ('up ', 'down ', 'script-security', 'plugin', 'route-up', 'route-pre-down'):
self.assertNotIn(directive, result)
# Safe directives survive
self.assertIn('proto udp', result)
# ---------------------------------------------------------------------------
# upload_wireguard_ext
# ---------------------------------------------------------------------------
class TestUploadWireguardExt(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.mgr = _make_manager(tmp_dir=self.tmp)
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def _valid_conf(self):
return "[Interface]\nPrivateKey = abc\nAddress = 10.99.0.1/24\n\n[Peer]\nPublicKey = xyz\n"
def test_valid_conf_returns_ok_true(self):
result = self.mgr.upload_wireguard_ext(self._valid_conf())
self.assertTrue(result['ok'])
def test_valid_conf_writes_file_to_correct_path(self):
self.mgr.upload_wireguard_ext(self._valid_conf())
expected = os.path.join(self.tmp, 'connectivity', 'wireguard_ext', 'wg_ext0.conf')
self.assertTrue(os.path.isfile(expected), f'Expected file at {expected}')
def test_valid_conf_file_has_mode_0600(self):
self.mgr.upload_wireguard_ext(self._valid_conf())
path = os.path.join(self.tmp, 'connectivity', 'wireguard_ext', 'wg_ext0.conf')
mode = stat.S_IMODE(os.stat(path).st_mode)
self.assertEqual(mode, 0o600, f'Expected 0600, got {oct(mode)}')
def test_wg0_interface_returns_ok_false_with_error(self):
bad_conf = "[Interface]\nName = wg0\nPrivateKey = abc\n"
result = self.mgr.upload_wireguard_ext(bad_conf)
self.assertFalse(result['ok'])
self.assertIn('error', result)
self.assertIn('wg0', result['error'])
def test_file_content_has_hooks_stripped(self):
conf = "[Interface]\nPrivateKey = abc\nPostUp = evil\n"
self.mgr.upload_wireguard_ext(conf)
path = os.path.join(self.tmp, 'connectivity', 'wireguard_ext', 'wg_ext0.conf')
with open(path) as f:
content = f.read()
self.assertNotIn('PostUp', content)
self.assertIn('PrivateKey', content)
# ---------------------------------------------------------------------------
# upload_openvpn
# ---------------------------------------------------------------------------
class TestUploadOpenvpn(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.mgr = _make_manager(tmp_dir=self.tmp)
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def _valid_ovpn(self):
return "client\ndev tun\nproto udp\nremote vpn.example.com 1194\n"
def test_valid_name_and_conf_returns_ok_true(self):
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='my-vpn')
self.assertTrue(result['ok'])
def test_valid_conf_writes_file_at_correct_path(self):
self.mgr.upload_openvpn(self._valid_ovpn(), name='my-vpn')
expected = os.path.join(self.tmp, 'connectivity', 'openvpn', 'my-vpn.ovpn')
self.assertTrue(os.path.isfile(expected), f'Expected file at {expected}')
def test_valid_conf_file_has_mode_0600(self):
self.mgr.upload_openvpn(self._valid_ovpn(), name='my-vpn')
path = os.path.join(self.tmp, 'connectivity', 'openvpn', 'my-vpn.ovpn')
mode = stat.S_IMODE(os.stat(path).st_mode)
self.assertEqual(mode, 0o600, f'Expected 0600, got {oct(mode)}')
def test_name_with_spaces_returns_ok_false(self):
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='my vpn')
self.assertFalse(result['ok'])
self.assertIn('error', result)
def test_name_with_slash_returns_ok_false(self):
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='../evil')
self.assertFalse(result['ok'])
def test_name_with_uppercase_returns_ok_false(self):
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='MyVPN')
self.assertFalse(result['ok'])
def test_name_too_long_returns_ok_false(self):
long_name = 'a' * 33
result = self.mgr.upload_openvpn(self._valid_ovpn(), name=long_name)
self.assertFalse(result['ok'])
def test_valid_hyphenated_name_passes(self):
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='my-vpn')
self.assertTrue(result['ok'])
def test_valid_underscore_name_passes(self):
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='my_vpn')
self.assertTrue(result['ok'])
def test_default_name_default_passes(self):
result = self.mgr.upload_openvpn(self._valid_ovpn())
self.assertTrue(result['ok'])
expected = os.path.join(self.tmp, 'connectivity', 'openvpn', 'default.ovpn')
self.assertTrue(os.path.isfile(expected))
def test_hooks_stripped_from_stored_file(self):
conf = "client\ndev tun\nup /sbin/bad.sh\nproto udp\n"
self.mgr.upload_openvpn(conf, name='clean')
path = os.path.join(self.tmp, 'connectivity', 'openvpn', 'clean.ovpn')
with open(path) as f:
content = f.read()
self.assertNotIn('up /sbin/bad.sh', content)
self.assertIn('proto udp', content)
# ---------------------------------------------------------------------------
# get_status
# ---------------------------------------------------------------------------
class TestGetStatus(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def _mgr_with_subprocess_ok(self):
mgr = _make_manager(tmp_dir=self.tmp)
return mgr
def test_returns_dict(self):
mgr = self._mgr_with_subprocess_ok()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
status = mgr.get_status()
self.assertIsInstance(status, dict)
def test_service_key_equals_connectivity(self):
mgr = self._mgr_with_subprocess_ok()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
status = mgr.get_status()
self.assertEqual(status['service'], 'connectivity')
def test_running_key_present_and_true(self):
mgr = self._mgr_with_subprocess_ok()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
status = mgr.get_status()
self.assertIn('running', status)
self.assertTrue(status['running'])
def test_exits_key_present(self):
mgr = self._mgr_with_subprocess_ok()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
status = mgr.get_status()
self.assertIn('exits', status)
# ---------------------------------------------------------------------------
# list_exits
# ---------------------------------------------------------------------------
class TestListExits(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def test_returns_list(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
exits = mgr.list_exits()
self.assertIsInstance(exits, list)
def test_each_item_has_type_field(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
exits = mgr.list_exits()
for item in exits:
self.assertIn('type', item, f'Missing "type" in {item}')
def test_each_item_has_status_fields(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
exits = mgr.list_exits()
for item in exits:
# _exit_status returns configured + iface_up (or subset)
self.assertIn('configured', item, f'Missing "configured" in {item}')
def test_default_not_in_exit_list(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
exits = mgr.list_exits()
types = [e['type'] for e in exits]
self.assertNotIn('default', types)
def test_list_contains_wireguard_ext_openvpn_tor(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
exits = mgr.list_exits()
types = {e['type'] for e in exits}
self.assertIn('wireguard_ext', types)
self.assertIn('openvpn', types)
self.assertIn('tor', types)
# ---------------------------------------------------------------------------
# set_peer_exit
# ---------------------------------------------------------------------------
class TestSetPeerExit(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def _mgr(self, peer_registry=None):
if peer_registry is None:
peer_registry = MagicMock()
peer_registry.set_peer_exit_via.return_value = True
peer_registry.list_peers.return_value = []
return _make_manager(tmp_dir=self.tmp, peer_registry=peer_registry)
def test_valid_exit_type_returns_ok_true(self):
mgr = self._mgr()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
result = mgr.set_peer_exit('alice', 'wireguard_ext')
self.assertTrue(result['ok'])
def test_valid_exit_type_default_returns_ok_true(self):
mgr = self._mgr()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
result = mgr.set_peer_exit('alice', 'default')
self.assertTrue(result['ok'])
def test_invalid_exit_type_returns_ok_false(self):
mgr = self._mgr()
result = mgr.set_peer_exit('alice', 'shadowsocks')
self.assertFalse(result['ok'])
self.assertIn('error', result)
def test_invalid_exit_type_error_mentions_type(self):
mgr = self._mgr()
result = mgr.set_peer_exit('alice', 'bad_type')
self.assertIn('bad_type', result['error'])
def test_calls_peer_registry_set_peer_exit_via_with_correct_args(self):
pr = MagicMock()
pr.set_peer_exit_via.return_value = True
pr.list_peers.return_value = []
mgr = self._mgr(peer_registry=pr)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
mgr.set_peer_exit('bob', 'openvpn')
pr.set_peer_exit_via.assert_called_once_with('bob', 'openvpn')
def test_peer_not_found_in_registry_returns_ok_false(self):
pr = MagicMock()
pr.set_peer_exit_via.return_value = False # peer not found
pr.list_peers.return_value = []
mgr = self._mgr(peer_registry=pr)
result = mgr.set_peer_exit('unknown-peer', 'tor')
self.assertFalse(result['ok'])
self.assertIn('error', result)
def test_invalid_peer_name_returns_ok_false(self):
mgr = self._mgr()
result = mgr.set_peer_exit('peer with spaces!', 'default')
self.assertFalse(result['ok'])
def test_no_peer_registry_returns_ok_false(self):
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=None)
result = mgr.set_peer_exit('alice', 'wireguard_ext')
self.assertFalse(result['ok'])
self.assertIn('error', result)
# ---------------------------------------------------------------------------
# get_peer_exits
# ---------------------------------------------------------------------------
class TestGetPeerExits(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def test_returns_dict(self):
mgr = _make_manager(tmp_dir=self.tmp)
result = mgr.get_peer_exits()
self.assertIsInstance(result, dict)
def test_maps_peer_names_to_exit_types(self):
pr = MagicMock()
pr.list_peers.return_value = [
{'peer': 'alice', 'exit_via': 'wireguard_ext'},
{'peer': 'bob', 'exit_via': 'default'},
]
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
result = mgr.get_peer_exits()
self.assertEqual(result['alice'], 'wireguard_ext')
self.assertEqual(result['bob'], 'default')
def test_peer_without_exit_via_defaults_to_default(self):
pr = MagicMock()
pr.list_peers.return_value = [{'peer': 'charlie'}]
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
result = mgr.get_peer_exits()
self.assertEqual(result['charlie'], 'default')
def test_calls_peer_registry_list_peers(self):
pr = MagicMock()
pr.list_peers.return_value = []
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
mgr.get_peer_exits()
pr.list_peers.assert_called()
def test_no_peer_registry_returns_empty_dict(self):
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=None)
result = mgr.get_peer_exits()
self.assertEqual(result, {})
def test_empty_peer_list_returns_empty_dict(self):
pr = MagicMock()
pr.list_peers.return_value = []
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
result = mgr.get_peer_exits()
self.assertEqual(result, {})
# ---------------------------------------------------------------------------
# apply_routes
# ---------------------------------------------------------------------------
class TestApplyRoutes(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def test_returns_dict_with_ok_key(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
result = mgr.apply_routes()
self.assertIsInstance(result, dict)
self.assertIn('ok', result)
def test_returns_ok_true_on_success(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
result = mgr.apply_routes()
self.assertTrue(result['ok'])
def test_calls_ensure_chains(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(mgr, '_ensure_chains') as mock_ensure, \
patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
mgr.apply_routes()
mock_ensure.assert_called_once()
def test_calls_subprocess_for_iptables(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
mgr.apply_routes()
self.assertTrue(mock_sp.run.called)
# At least one call should involve 'iptables'
calls_str = str(mock_sp.run.call_args_list)
self.assertIn('iptables', calls_str)
def test_subprocess_failure_is_non_fatal_returns_ok_true(self):
"""apply_routes must not raise even when every subprocess call fails."""
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='error')
result = mgr.apply_routes()
# Must not raise; should still return a dict (ok may be True because
# routing errors are logged as warnings, not propagated)
self.assertIsInstance(result, dict)
self.assertIn('ok', result)
def test_ensure_chains_exception_is_non_fatal(self):
"""If _ensure_chains raises, apply_routes still returns a dict."""
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(mgr, '_ensure_chains', side_effect=RuntimeError('chain error')), \
patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
result = mgr.apply_routes()
self.assertIsInstance(result, dict)
def test_peer_with_wireguard_ext_exit_generates_mark_rule(self):
"""Peers with a non-default exit should trigger _add_mark_rule calls."""
pr = MagicMock()
pr.list_peers.return_value = [
{'peer': 'alice', 'exit_via': 'wireguard_ext'},
]
pr.get_peer.return_value = {'peer': 'alice', 'ip': '172.20.0.50/32'}
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
with patch.object(mgr, '_add_mark_rule') as mock_mark, \
patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
mgr.apply_routes()
mock_mark.assert_called()
call_args = mock_mark.call_args[0]
self.assertEqual(call_args[0], '172.20.0.50') # IP without CIDR
def test_peer_with_default_exit_skips_mark_rule(self):
"""Peers on default exit must not generate mark rules."""
pr = MagicMock()
pr.list_peers.return_value = [
{'peer': 'bob', 'exit_via': 'default'},
]
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
with patch.object(mgr, '_add_mark_rule') as mock_mark, \
patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
mgr.apply_routes()
mock_mark.assert_not_called()
def test_rules_applied_count_in_result(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
result = mgr.apply_routes()
self.assertIn('rules_applied', result)
self.assertIsInstance(result['rules_applied'], int)
if __name__ == '__main__':
unittest.main()