Files
pic/tests/test_connectivity_manager.py
T
roof aa1e5c41ec
Unit Tests / test (push) Successful in 12m6s
test: raise coverage 68.7% -> ~80.4%; add ~250 tests for new egress/DDNS/network paths
Coverage was below acceptable levels and several newly-added code paths
(sshuttle egress, proxy egress, DDNS provider stubs, DNS overview route,
peer-registry provisioning) had zero test coverage.

~250 new unit tests are added across 16 new test files. Existing test files
are updated to match refactored interfaces (DHCP removed, constants
introduced, network_manager restructured). .coveragerc is added to pin the
source mapping and the 70% floor so regressions are caught at commit time.

tests/test_enhanced_api.py was previously living in api/ (wrong location)
and is moved to tests/ where it belongs.

Integration test files are updated to remove references to DHCP endpoints
and add coverage for the new DNS overview and DDNS sync endpoints.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 09:03:39 -04:00

868 lines
34 KiB
Python

"""
Tests for ConnectivityManager — config validation, file upload, status,
exit listing, peer exit assignment, and route application.
All subprocess calls (docker exec iptables/ip) and filesystem paths are
isolated so these tests run without any live infrastructure.
"""
import os
import sys
import stat
import tempfile
import shutil
import unittest
from unittest.mock import MagicMock, patch, call
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
import connectivity_manager as cm_module
from connectivity_manager import ConnectivityManager
# ---------------------------------------------------------------------------
# Factory helper
# ---------------------------------------------------------------------------
_SENTINEL = object()
def _make_manager(tmp_dir=None, peer_registry=_SENTINEL, config_manager=None):
"""Build a ConnectivityManager with mocked dependencies.
Pass peer_registry=None explicitly to test the no-registry path.
Omit peer_registry (or pass _SENTINEL) to get a default MagicMock.
"""
if tmp_dir is None:
tmp_dir = tempfile.mkdtemp()
if config_manager is None:
config_manager = MagicMock()
config_manager.get_identity.return_value = {
'cell_name': 'test',
'ip_range': '172.20.0.0/16',
}
if peer_registry is _SENTINEL:
peer_registry = MagicMock()
peer_registry.list_peers.return_value = []
with patch.object(ConnectivityManager, '_subscribe_to_events', lambda self: None):
mgr = ConnectivityManager(
config_manager=config_manager,
peer_registry=peer_registry,
data_dir=tmp_dir,
config_dir=tmp_dir,
)
return mgr
def _mock_subprocess_ok():
"""Return a MagicMock mimicking a successful subprocess.run result."""
return MagicMock(returncode=0, stdout='', stderr='')
# ---------------------------------------------------------------------------
# _validate_wg_conf
# ---------------------------------------------------------------------------
class TestValidateWgConf(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.mgr = _make_manager(tmp_dir=self.tmp)
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def test_valid_config_passes_and_returns_cleaned_text(self):
conf = "[Interface]\nPrivateKey = abc123\nAddress = 10.99.0.1/24\n\n[Peer]\nPublicKey = xyz\n"
result = self.mgr._validate_wg_conf(conf)
self.assertIn('[Interface]', result)
self.assertIn('PrivateKey', result)
self.assertIn('[Peer]', result)
def test_postupdate_is_stripped_silently(self):
conf = "[Interface]\nPrivateKey = abc\nPostUp = iptables -A FORWARD -j ACCEPT\n"
result = self.mgr._validate_wg_conf(conf)
self.assertNotIn('PostUp', result)
self.assertIn('PrivateKey', result)
def test_postdown_is_stripped_silently(self):
conf = "[Interface]\nPrivateKey = abc\nPostDown = iptables -D FORWARD -j ACCEPT\n"
result = self.mgr._validate_wg_conf(conf)
self.assertNotIn('PostDown', result)
def test_preup_is_stripped_silently(self):
conf = "[Interface]\nPrivateKey = abc\nPreUp = /sbin/modprobe wireguard\n"
result = self.mgr._validate_wg_conf(conf)
self.assertNotIn('PreUp', result)
def test_predown_is_stripped_silently(self):
conf = "[Interface]\nPrivateKey = abc\nPreDown = /sbin/rmmod wireguard\n"
result = self.mgr._validate_wg_conf(conf)
self.assertNotIn('PreDown', result)
def test_interface_wg0_raises_value_error(self):
conf = "[Interface]\nName = wg0\nPrivateKey = abc\n"
with self.assertRaises(ValueError) as ctx:
self.mgr._validate_wg_conf(conf)
self.assertIn('wg0', str(ctx.exception))
def test_interface_wg0_via_interface_key_raises_value_error(self):
# 'Interface = wg0' (not just 'Name = wg0') should also be caught
conf = "[Interface]\nInterface = wg0\nPrivateKey = abc\n"
with self.assertRaises(ValueError):
self.mgr._validate_wg_conf(conf)
def test_interface_wg_ext0_passes(self):
conf = "[Interface]\nName = wg_ext0\nPrivateKey = abc\nAddress = 10.99.0.1/24\n"
result = self.mgr._validate_wg_conf(conf)
self.assertIn('wg_ext0', result)
def test_non_string_input_raises_value_error(self):
with self.assertRaises(ValueError):
self.mgr._validate_wg_conf(None)
def test_result_ends_with_newline(self):
conf = "[Interface]\nPrivateKey = abc\n"
result = self.mgr._validate_wg_conf(conf)
self.assertTrue(result.endswith('\n'))
def test_multiple_hooks_all_stripped(self):
conf = (
"[Interface]\n"
"PrivateKey = abc\n"
"PostUp = cmd1\n"
"PostDown = cmd2\n"
"PreUp = cmd3\n"
"PreDown = cmd4\n"
)
result = self.mgr._validate_wg_conf(conf)
for hook in ('PostUp', 'PostDown', 'PreUp', 'PreDown'):
self.assertNotIn(hook, result)
self.assertIn('PrivateKey', result)
# ---------------------------------------------------------------------------
# _validate_ovpn
# ---------------------------------------------------------------------------
class TestValidateOvpn(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.mgr = _make_manager(tmp_dir=self.tmp)
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def _base_conf(self, extra=''):
return f"client\ndev tun\nproto udp\nremote vpn.example.com 1194\n{extra}"
def test_valid_ovpn_passes(self):
conf = self._base_conf()
result = self.mgr._validate_ovpn(conf)
self.assertIn('proto udp', result)
self.assertIn('remote vpn.example.com 1194', result)
def test_up_script_is_stripped(self):
conf = self._base_conf('up /sbin/connect.sh\n')
result = self.mgr._validate_ovpn(conf)
self.assertNotIn('up /sbin/connect.sh', result)
def test_down_script_is_stripped(self):
conf = self._base_conf('down /sbin/disconnect.sh\n')
result = self.mgr._validate_ovpn(conf)
self.assertNotIn('down /sbin/disconnect.sh', result)
def test_script_security_is_stripped(self):
conf = self._base_conf('script-security 2\n')
result = self.mgr._validate_ovpn(conf)
self.assertNotIn('script-security', result)
def test_plugin_is_stripped(self):
conf = self._base_conf('plugin /path/to/plugin.so\n')
result = self.mgr._validate_ovpn(conf)
self.assertNotIn('plugin', result)
def test_route_up_is_stripped(self):
conf = self._base_conf('route-up /sbin/route_cmd\n')
result = self.mgr._validate_ovpn(conf)
self.assertNotIn('route-up', result)
def test_route_pre_down_is_stripped(self):
conf = self._base_conf('route-pre-down /sbin/cleanup\n')
result = self.mgr._validate_ovpn(conf)
self.assertNotIn('route-pre-down', result)
def test_proto_udp_is_preserved(self):
conf = self._base_conf()
result = self.mgr._validate_ovpn(conf)
self.assertIn('proto udp', result)
def test_remote_directive_is_preserved(self):
conf = self._base_conf()
result = self.mgr._validate_ovpn(conf)
self.assertIn('remote vpn.example.com 1194', result)
def test_comments_are_preserved(self):
conf = self._base_conf('# this is a comment\n')
result = self.mgr._validate_ovpn(conf)
self.assertIn('# this is a comment', result)
def test_non_string_input_raises_value_error(self):
with self.assertRaises(ValueError):
self.mgr._validate_ovpn(42)
def test_result_ends_with_newline(self):
conf = self._base_conf()
result = self.mgr._validate_ovpn(conf)
self.assertTrue(result.endswith('\n'))
def test_all_forbidden_directives_stripped_together(self):
conf = self._base_conf(
'up /s\ndown /s\nscript-security 2\nplugin /p\nroute-up /r\nroute-pre-down /r\n'
)
result = self.mgr._validate_ovpn(conf)
for directive in ('up ', 'down ', 'script-security', 'plugin', 'route-up', 'route-pre-down'):
self.assertNotIn(directive, result)
# Safe directives survive
self.assertIn('proto udp', result)
# ---------------------------------------------------------------------------
# upload_wireguard_ext
# ---------------------------------------------------------------------------
class TestUploadWireguardExt(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.mgr = _make_manager(tmp_dir=self.tmp)
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def _valid_conf(self):
return "[Interface]\nPrivateKey = abc\nAddress = 10.99.0.1/24\n\n[Peer]\nPublicKey = xyz\n"
def test_valid_conf_returns_ok_true(self):
result = self.mgr.upload_wireguard_ext(self._valid_conf())
self.assertTrue(result['ok'])
def test_valid_conf_writes_file_to_correct_path(self):
self.mgr.upload_wireguard_ext(self._valid_conf())
expected = os.path.join(self.tmp, 'services', 'wireguard-ext', 'config', 'wg_ext0.conf')
self.assertTrue(os.path.isfile(expected), f'Expected file at {expected}')
def test_valid_conf_file_has_mode_0600(self):
self.mgr.upload_wireguard_ext(self._valid_conf())
path = os.path.join(self.tmp, 'services', 'wireguard-ext', 'config', 'wg_ext0.conf')
mode = stat.S_IMODE(os.stat(path).st_mode)
self.assertEqual(mode, 0o600, f'Expected 0600, got {oct(mode)}')
def test_wg0_interface_returns_ok_false_with_error(self):
bad_conf = "[Interface]\nName = wg0\nPrivateKey = abc\n"
result = self.mgr.upload_wireguard_ext(bad_conf)
self.assertFalse(result['ok'])
self.assertIn('error', result)
self.assertIn('wg0', result['error'])
def test_file_content_has_hooks_stripped(self):
conf = "[Interface]\nPrivateKey = abc\nPostUp = evil\n"
self.mgr.upload_wireguard_ext(conf)
path = os.path.join(self.tmp, 'services', 'wireguard-ext', 'config', 'wg_ext0.conf')
with open(path) as f:
content = f.read()
self.assertNotIn('PostUp', content)
self.assertIn('PrivateKey', content)
# ---------------------------------------------------------------------------
# upload_openvpn
# ---------------------------------------------------------------------------
class TestUploadOpenvpn(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.mgr = _make_manager(tmp_dir=self.tmp)
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def _valid_ovpn(self):
return "client\ndev tun\nproto udp\nremote vpn.example.com 1194\n"
def test_valid_name_and_conf_returns_ok_true(self):
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='my-vpn')
self.assertTrue(result['ok'])
def test_valid_conf_writes_file_at_correct_path(self):
self.mgr.upload_openvpn(self._valid_ovpn(), name='my-vpn')
expected = os.path.join(self.tmp, 'services', 'openvpn-client', 'config', 'my-vpn.ovpn')
self.assertTrue(os.path.isfile(expected), f'Expected file at {expected}')
def test_valid_conf_file_has_mode_0600(self):
self.mgr.upload_openvpn(self._valid_ovpn(), name='my-vpn')
path = os.path.join(self.tmp, 'services', 'openvpn-client', 'config', 'my-vpn.ovpn')
mode = stat.S_IMODE(os.stat(path).st_mode)
self.assertEqual(mode, 0o600, f'Expected 0600, got {oct(mode)}')
def test_name_with_spaces_returns_ok_false(self):
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='my vpn')
self.assertFalse(result['ok'])
self.assertIn('error', result)
def test_name_with_slash_returns_ok_false(self):
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='../evil')
self.assertFalse(result['ok'])
def test_name_with_uppercase_returns_ok_false(self):
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='MyVPN')
self.assertFalse(result['ok'])
def test_name_too_long_returns_ok_false(self):
long_name = 'a' * 33
result = self.mgr.upload_openvpn(self._valid_ovpn(), name=long_name)
self.assertFalse(result['ok'])
def test_valid_hyphenated_name_passes(self):
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='my-vpn')
self.assertTrue(result['ok'])
def test_valid_underscore_name_passes(self):
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='my_vpn')
self.assertTrue(result['ok'])
def test_default_name_default_passes(self):
result = self.mgr.upload_openvpn(self._valid_ovpn())
self.assertTrue(result['ok'])
expected = os.path.join(self.tmp, 'services', 'openvpn-client', 'config', 'default.ovpn')
self.assertTrue(os.path.isfile(expected))
def test_hooks_stripped_from_stored_file(self):
conf = "client\ndev tun\nup /sbin/bad.sh\nproto udp\n"
self.mgr.upload_openvpn(conf, name='clean')
path = os.path.join(self.tmp, 'services', 'openvpn-client', 'config', 'clean.ovpn')
with open(path) as f:
content = f.read()
self.assertNotIn('up /sbin/bad.sh', content)
self.assertIn('proto udp', content)
# ---------------------------------------------------------------------------
# _migrate_legacy_configs
# ---------------------------------------------------------------------------
class TestMigrateLegacyConfigs(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def test_no_op_when_legacy_dir_absent(self):
"""No errors when legacy connectivity/ dir does not exist."""
mgr = _make_manager(tmp_dir=self.tmp)
# Should not raise; legacy dir simply doesn't exist
mgr._migrate_legacy_configs(os.path.join(self.tmp, 'nonexistent'))
def test_wg_conf_copied_to_new_location(self):
legacy_wg = os.path.join(self.tmp, 'connectivity', 'wireguard_ext')
os.makedirs(legacy_wg)
src = os.path.join(legacy_wg, 'wg_ext0.conf')
with open(src, 'w') as f:
f.write('[Interface]\nPrivateKey = abc\n')
mgr = _make_manager(tmp_dir=self.tmp)
dst = os.path.join(self.tmp, 'services', 'wireguard-ext', 'config', 'wg_ext0.conf')
self.assertTrue(os.path.isfile(dst), f'Expected migrated file at {dst}')
def test_ovpn_copied_to_new_location(self):
legacy_ovpn = os.path.join(self.tmp, 'connectivity', 'openvpn')
os.makedirs(legacy_ovpn)
src = os.path.join(legacy_ovpn, 'default.ovpn')
with open(src, 'w') as f:
f.write('client\ndev tun\n')
mgr = _make_manager(tmp_dir=self.tmp)
dst = os.path.join(self.tmp, 'services', 'openvpn-client', 'config', 'default.ovpn')
self.assertTrue(os.path.isfile(dst), f'Expected migrated file at {dst}')
def test_existing_dst_not_overwritten(self):
legacy_wg = os.path.join(self.tmp, 'connectivity', 'wireguard_ext')
os.makedirs(legacy_wg)
with open(os.path.join(legacy_wg, 'wg_ext0.conf'), 'w') as f:
f.write('legacy\n')
# Pre-create the destination with different content
dst_dir = os.path.join(self.tmp, 'services', 'wireguard-ext', 'config')
os.makedirs(dst_dir, exist_ok=True)
dst = os.path.join(dst_dir, 'wg_ext0.conf')
with open(dst, 'w') as f:
f.write('existing\n')
_make_manager(tmp_dir=self.tmp)
with open(dst) as f:
self.assertEqual(f.read(), 'existing\n')
# ---------------------------------------------------------------------------
# get_status
# ---------------------------------------------------------------------------
class TestGetStatus(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def _mgr_with_subprocess_ok(self):
mgr = _make_manager(tmp_dir=self.tmp)
return mgr
def test_returns_dict(self):
mgr = self._mgr_with_subprocess_ok()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
status = mgr.get_status()
self.assertIsInstance(status, dict)
def test_service_key_equals_connectivity(self):
mgr = self._mgr_with_subprocess_ok()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
status = mgr.get_status()
self.assertEqual(status['service'], 'connectivity')
def test_running_key_present_and_true(self):
mgr = self._mgr_with_subprocess_ok()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
status = mgr.get_status()
self.assertIn('running', status)
self.assertTrue(status['running'])
def test_exits_key_present(self):
mgr = self._mgr_with_subprocess_ok()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
status = mgr.get_status()
self.assertIn('exits', status)
# ---------------------------------------------------------------------------
# list_exits
# ---------------------------------------------------------------------------
class TestListExits(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def test_returns_list(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
exits = mgr.list_exits()
self.assertIsInstance(exits, list)
def test_each_item_has_type_field(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
exits = mgr.list_exits()
for item in exits:
self.assertIn('type', item, f'Missing "type" in {item}')
def test_each_item_has_status_fields(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
exits = mgr.list_exits()
for item in exits:
# _exit_status returns configured + iface_up (or subset)
self.assertIn('configured', item, f'Missing "configured" in {item}')
def test_default_not_in_exit_list(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
exits = mgr.list_exits()
types = [e['type'] for e in exits]
self.assertNotIn('default', types)
def test_list_contains_wireguard_ext_openvpn_tor(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
exits = mgr.list_exits()
types = {e['type'] for e in exits}
self.assertIn('wireguard_ext', types)
self.assertIn('openvpn', types)
self.assertIn('tor', types)
# ---------------------------------------------------------------------------
# set_peer_exit
# ---------------------------------------------------------------------------
class TestSetPeerExit(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def _mgr(self, peer_registry=None):
if peer_registry is None:
peer_registry = MagicMock()
peer_registry.set_peer_exit_via.return_value = True
peer_registry.list_peers.return_value = []
return _make_manager(tmp_dir=self.tmp, peer_registry=peer_registry)
def test_valid_exit_type_returns_ok_true(self):
mgr = self._mgr()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
result = mgr.set_peer_exit('alice', 'wireguard_ext')
self.assertTrue(result['ok'])
def test_valid_exit_type_default_returns_ok_true(self):
mgr = self._mgr()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
result = mgr.set_peer_exit('alice', 'default')
self.assertTrue(result['ok'])
def test_invalid_exit_type_returns_ok_false(self):
mgr = self._mgr()
result = mgr.set_peer_exit('alice', 'shadowsocks')
self.assertFalse(result['ok'])
self.assertIn('error', result)
def test_invalid_exit_type_error_mentions_type(self):
mgr = self._mgr()
result = mgr.set_peer_exit('alice', 'bad_type')
self.assertIn('bad_type', result['error'])
def test_calls_peer_registry_set_peer_exit_via_with_correct_args(self):
pr = MagicMock()
pr.set_peer_exit_via.return_value = True
pr.list_peers.return_value = []
mgr = self._mgr(peer_registry=pr)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
mgr.set_peer_exit('bob', 'openvpn')
pr.set_peer_exit_via.assert_called_once_with('bob', 'openvpn')
def test_peer_not_found_in_registry_returns_ok_false(self):
pr = MagicMock()
pr.set_peer_exit_via.return_value = False # peer not found
pr.list_peers.return_value = []
mgr = self._mgr(peer_registry=pr)
result = mgr.set_peer_exit('unknown-peer', 'tor')
self.assertFalse(result['ok'])
self.assertIn('error', result)
def test_invalid_peer_name_returns_ok_false(self):
mgr = self._mgr()
result = mgr.set_peer_exit('peer with spaces!', 'default')
self.assertFalse(result['ok'])
def test_no_peer_registry_returns_ok_false(self):
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=None)
result = mgr.set_peer_exit('alice', 'wireguard_ext')
self.assertFalse(result['ok'])
self.assertIn('error', result)
# ---------------------------------------------------------------------------
# get_peer_exits
# ---------------------------------------------------------------------------
class TestGetPeerExits(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def test_returns_dict(self):
mgr = _make_manager(tmp_dir=self.tmp)
result = mgr.get_peer_exits()
self.assertIsInstance(result, dict)
def test_maps_peer_names_to_exit_types(self):
pr = MagicMock()
pr.list_peers.return_value = [
{'peer': 'alice', 'exit_via': 'wireguard_ext'},
{'peer': 'bob', 'exit_via': 'default'},
]
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
result = mgr.get_peer_exits()
self.assertEqual(result['alice'], 'wireguard_ext')
self.assertEqual(result['bob'], 'default')
def test_peer_without_exit_via_defaults_to_default(self):
pr = MagicMock()
pr.list_peers.return_value = [{'peer': 'charlie'}]
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
result = mgr.get_peer_exits()
self.assertEqual(result['charlie'], 'default')
def test_calls_peer_registry_list_peers(self):
pr = MagicMock()
pr.list_peers.return_value = []
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
mgr.get_peer_exits()
pr.list_peers.assert_called()
def test_no_peer_registry_returns_empty_dict(self):
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=None)
result = mgr.get_peer_exits()
self.assertEqual(result, {})
def test_empty_peer_list_returns_empty_dict(self):
pr = MagicMock()
pr.list_peers.return_value = []
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
result = mgr.get_peer_exits()
self.assertEqual(result, {})
# ---------------------------------------------------------------------------
# apply_routes
# ---------------------------------------------------------------------------
class TestApplyRoutes(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def test_returns_dict_with_ok_key(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
result = mgr.apply_routes()
self.assertIsInstance(result, dict)
self.assertIn('ok', result)
def test_returns_ok_true_on_success(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
result = mgr.apply_routes()
self.assertTrue(result['ok'])
def test_calls_ensure_chains(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(mgr, '_ensure_chains') as mock_ensure, \
patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
mgr.apply_routes()
mock_ensure.assert_called_once()
def test_calls_subprocess_for_iptables(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
mgr.apply_routes()
self.assertTrue(mock_sp.run.called)
# At least one call should involve 'iptables'
calls_str = str(mock_sp.run.call_args_list)
self.assertIn('iptables', calls_str)
def test_subprocess_failure_is_non_fatal_returns_ok_true(self):
"""apply_routes must not raise even when every subprocess call fails."""
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='error')
result = mgr.apply_routes()
# Must not raise; should still return a dict (ok may be True because
# routing errors are logged as warnings, not propagated)
self.assertIsInstance(result, dict)
self.assertIn('ok', result)
def test_ensure_chains_exception_is_non_fatal(self):
"""If _ensure_chains raises, apply_routes still returns a dict."""
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(mgr, '_ensure_chains', side_effect=RuntimeError('chain error')), \
patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
result = mgr.apply_routes()
self.assertIsInstance(result, dict)
def test_peer_with_wireguard_ext_exit_generates_mark_rule(self):
"""Peers with a non-default exit should trigger _add_mark_rule calls."""
pr = MagicMock()
pr.list_peers.return_value = [
{'peer': 'alice', 'exit_via': 'wireguard_ext'},
]
pr.get_peer.return_value = {'peer': 'alice', 'ip': '172.20.0.50/32'}
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
with patch.object(mgr, '_add_mark_rule') as mock_mark, \
patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
mgr.apply_routes()
mock_mark.assert_called()
call_args = mock_mark.call_args[0]
self.assertEqual(call_args[0], '172.20.0.50') # IP without CIDR
def test_peer_with_default_exit_skips_mark_rule(self):
"""Peers on default exit must not generate mark rules."""
pr = MagicMock()
pr.list_peers.return_value = [
{'peer': 'bob', 'exit_via': 'default'},
]
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
with patch.object(mgr, '_add_mark_rule') as mock_mark, \
patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
mgr.apply_routes()
mock_mark.assert_not_called()
def test_rules_applied_count_in_result(self):
mgr = _make_manager(tmp_dir=self.tmp)
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
result = mgr.apply_routes()
self.assertIn('rules_applied', result)
self.assertIsInstance(result['rules_applied'], int)
# ---------------------------------------------------------------------------
# _exit_status — status string + store-service bridge
# ---------------------------------------------------------------------------
class TestExitStatus(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def _mgr(self, installed=None):
config_manager = MagicMock()
config_manager.get_installed_services.return_value = installed or {}
return _make_manager(tmp_dir=self.tmp, config_manager=config_manager)
def test_status_not_configured_when_nothing_present(self):
mgr = self._mgr()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='')
info = mgr._exit_status('wireguard_ext')
self.assertEqual(info['status'], 'not_configured')
self.assertFalse(info['configured'])
self.assertFalse(info['iface_up'])
def test_status_configured_when_legacy_file_present(self):
mgr = self._mgr()
path = os.path.join(mgr.wireguard_ext_dir, 'wg_ext0.conf')
with open(path, 'w') as f:
f.write('[Interface]\nPrivateKey = abc\n')
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='')
info = mgr._exit_status('wireguard_ext')
self.assertTrue(info['configured'])
self.assertEqual(info['status'], 'configured')
def test_status_active_when_iface_up(self):
mgr = self._mgr()
path = os.path.join(mgr.wireguard_ext_dir, 'wg_ext0.conf')
with open(path, 'w') as f:
f.write('[Interface]\nPrivateKey = abc\n')
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = MagicMock(
returncode=0, stdout='4: wg_ext0: <UP,LOWER_UP>', stderr=''
)
info = mgr._exit_status('wireguard_ext')
self.assertTrue(info['iface_up'])
self.assertEqual(info['status'], 'active')
def test_store_installed_wireguard_ext_reports_configured(self):
mgr = self._mgr(installed={'wireguard-ext': {'manifest': {'id': 'wireguard-ext'}}})
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='')
info = mgr._exit_status('wireguard_ext')
self.assertTrue(info['configured'])
self.assertEqual(info['status'], 'configured')
def test_store_installed_openvpn_client_reports_configured(self):
mgr = self._mgr(installed={'openvpn-client': {'manifest': {'id': 'openvpn-client'}}})
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='')
info = mgr._exit_status('openvpn')
self.assertTrue(info['configured'])
self.assertEqual(info['status'], 'configured')
def test_unrelated_store_service_does_not_configure_exit(self):
mgr = self._mgr(installed={'email': {'manifest': {'id': 'email'}}})
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='')
info = mgr._exit_status('wireguard_ext')
self.assertFalse(info['configured'])
self.assertEqual(info['status'], 'not_configured')
def test_running_container_reports_configured(self):
mgr = self._mgr()
def fake_run(cmd, **kwargs):
if 'inspect' in cmd:
return MagicMock(returncode=0, stdout='true\n', stderr='')
return MagicMock(returncode=1, stdout='', stderr='')
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.side_effect = fake_run
info = mgr._exit_status('wireguard_ext')
self.assertTrue(info['configured'])
self.assertEqual(info['status'], 'configured')
def test_stopped_container_does_not_configure_exit(self):
mgr = self._mgr()
def fake_run(cmd, **kwargs):
if 'inspect' in cmd:
return MagicMock(returncode=0, stdout='false\n', stderr='')
return MagicMock(returncode=1, stdout='', stderr='')
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.side_effect = fake_run
info = mgr._exit_status('wireguard_ext')
self.assertFalse(info['configured'])
def test_list_exits_entries_have_status_string(self):
mgr = self._mgr()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = _mock_subprocess_ok()
exits = mgr.list_exits()
for item in exits:
self.assertIn('status', item)
self.assertIn(item['status'], ('active', 'configured', 'not_configured'))
def test_tor_defaults_to_configured(self):
mgr = self._mgr()
with patch.object(cm_module, 'subprocess') as mock_sp:
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='')
info = mgr._exit_status('tor')
self.assertTrue(info['configured'])
self.assertEqual(info['status'], 'configured')
if __name__ == '__main__':
unittest.main()