89aed4efe0
Unit Tests / test (push) Successful in 12m6s
apply_routes now iterates over connection instances rather than types:
each instance gets its own fwmark, routing table, interface, and
redirect_port via _routing_connections / _resolve_peer_connection /
_apply_connection_for_src; kill-switch is enforced per iface-instance.
Old per-type MARKS/TABLES constants are kept only as migration scaffolding.
peer_registry: exit_via is now stored as a connection id (or 'default');
_migrate_exit_via_to_connection_id runs on _load_peers to upgrade legacy
type-string values; set_peer_exit_via validates against known connection
ids; VALID_EXIT_VIA removed; config_manager wired in from managers.py.
egress_manager: egress_overrides keyed by service_id → connection_id;
local MARKS/TABLES/EXIT_TYPES/_REDIRECT_PORTS/_add_tor_redirect removed;
(mark, table, redirect_port) resolved at apply-time via
connectivity_manager.get_connection; manifest egress.allowed still
enforced by connection type.
api/app.py + api.js: PUT peer/service exit endpoints accept {connection_id};
back-compat shim resolves a legacy type string to its single active instance.
Tests extended: two same-type instances produce distinct marks/tables/ports;
peer exit_via and egress override id migrations round-trip correctly;
single-instance behaviour is equivalent to the old type-keyed path.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1051 lines
42 KiB
Python
1051 lines
42 KiB
Python
"""
|
|
Tests for ConnectivityManager — config validation, file upload, status,
|
|
exit listing, peer exit assignment, and route application.
|
|
|
|
All subprocess calls (docker exec iptables/ip) and filesystem paths are
|
|
isolated so these tests run without any live infrastructure.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import stat
|
|
import tempfile
|
|
import shutil
|
|
import unittest
|
|
from unittest.mock import MagicMock, patch, call
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
|
|
|
|
import connectivity_manager as cm_module
|
|
from connectivity_manager import ConnectivityManager
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Factory helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_SENTINEL = object()
|
|
|
|
|
|
def _make_manager(tmp_dir=None, peer_registry=_SENTINEL, config_manager=None):
|
|
"""Build a ConnectivityManager with mocked dependencies.
|
|
|
|
Pass peer_registry=None explicitly to test the no-registry path.
|
|
Omit peer_registry (or pass _SENTINEL) to get a default MagicMock.
|
|
"""
|
|
if tmp_dir is None:
|
|
tmp_dir = tempfile.mkdtemp()
|
|
|
|
if config_manager is None:
|
|
config_manager = MagicMock()
|
|
config_manager.get_identity.return_value = {
|
|
'cell_name': 'test',
|
|
'ip_range': '172.20.0.0/16',
|
|
}
|
|
config_manager.list_connections.return_value = []
|
|
|
|
if peer_registry is _SENTINEL:
|
|
peer_registry = MagicMock()
|
|
peer_registry.list_peers.return_value = []
|
|
|
|
with patch.object(ConnectivityManager, '_subscribe_to_events', lambda self: None):
|
|
mgr = ConnectivityManager(
|
|
config_manager=config_manager,
|
|
peer_registry=peer_registry,
|
|
data_dir=tmp_dir,
|
|
config_dir=tmp_dir,
|
|
)
|
|
return mgr
|
|
|
|
|
|
def _mock_subprocess_ok():
|
|
"""Return a MagicMock mimicking a successful subprocess.run result."""
|
|
return MagicMock(returncode=0, stdout='', stderr='')
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _validate_wg_conf
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestValidateWgConf(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
self.mgr = _make_manager(tmp_dir=self.tmp)
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def test_valid_config_passes_and_returns_cleaned_text(self):
|
|
conf = "[Interface]\nPrivateKey = abc123\nAddress = 10.99.0.1/24\n\n[Peer]\nPublicKey = xyz\n"
|
|
result = self.mgr._validate_wg_conf(conf)
|
|
self.assertIn('[Interface]', result)
|
|
self.assertIn('PrivateKey', result)
|
|
self.assertIn('[Peer]', result)
|
|
|
|
def test_postupdate_is_stripped_silently(self):
|
|
conf = "[Interface]\nPrivateKey = abc\nPostUp = iptables -A FORWARD -j ACCEPT\n"
|
|
result = self.mgr._validate_wg_conf(conf)
|
|
self.assertNotIn('PostUp', result)
|
|
self.assertIn('PrivateKey', result)
|
|
|
|
def test_postdown_is_stripped_silently(self):
|
|
conf = "[Interface]\nPrivateKey = abc\nPostDown = iptables -D FORWARD -j ACCEPT\n"
|
|
result = self.mgr._validate_wg_conf(conf)
|
|
self.assertNotIn('PostDown', result)
|
|
|
|
def test_preup_is_stripped_silently(self):
|
|
conf = "[Interface]\nPrivateKey = abc\nPreUp = /sbin/modprobe wireguard\n"
|
|
result = self.mgr._validate_wg_conf(conf)
|
|
self.assertNotIn('PreUp', result)
|
|
|
|
def test_predown_is_stripped_silently(self):
|
|
conf = "[Interface]\nPrivateKey = abc\nPreDown = /sbin/rmmod wireguard\n"
|
|
result = self.mgr._validate_wg_conf(conf)
|
|
self.assertNotIn('PreDown', result)
|
|
|
|
def test_interface_wg0_raises_value_error(self):
|
|
conf = "[Interface]\nName = wg0\nPrivateKey = abc\n"
|
|
with self.assertRaises(ValueError) as ctx:
|
|
self.mgr._validate_wg_conf(conf)
|
|
self.assertIn('wg0', str(ctx.exception))
|
|
|
|
def test_interface_wg0_via_interface_key_raises_value_error(self):
|
|
# 'Interface = wg0' (not just 'Name = wg0') should also be caught
|
|
conf = "[Interface]\nInterface = wg0\nPrivateKey = abc\n"
|
|
with self.assertRaises(ValueError):
|
|
self.mgr._validate_wg_conf(conf)
|
|
|
|
def test_interface_wg_ext0_passes(self):
|
|
conf = "[Interface]\nName = wg_ext0\nPrivateKey = abc\nAddress = 10.99.0.1/24\n"
|
|
result = self.mgr._validate_wg_conf(conf)
|
|
self.assertIn('wg_ext0', result)
|
|
|
|
def test_non_string_input_raises_value_error(self):
|
|
with self.assertRaises(ValueError):
|
|
self.mgr._validate_wg_conf(None)
|
|
|
|
def test_result_ends_with_newline(self):
|
|
conf = "[Interface]\nPrivateKey = abc\n"
|
|
result = self.mgr._validate_wg_conf(conf)
|
|
self.assertTrue(result.endswith('\n'))
|
|
|
|
def test_multiple_hooks_all_stripped(self):
|
|
conf = (
|
|
"[Interface]\n"
|
|
"PrivateKey = abc\n"
|
|
"PostUp = cmd1\n"
|
|
"PostDown = cmd2\n"
|
|
"PreUp = cmd3\n"
|
|
"PreDown = cmd4\n"
|
|
)
|
|
result = self.mgr._validate_wg_conf(conf)
|
|
for hook in ('PostUp', 'PostDown', 'PreUp', 'PreDown'):
|
|
self.assertNotIn(hook, result)
|
|
self.assertIn('PrivateKey', result)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _validate_ovpn
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestValidateOvpn(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
self.mgr = _make_manager(tmp_dir=self.tmp)
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def _base_conf(self, extra=''):
|
|
return f"client\ndev tun\nproto udp\nremote vpn.example.com 1194\n{extra}"
|
|
|
|
def test_valid_ovpn_passes(self):
|
|
conf = self._base_conf()
|
|
result = self.mgr._validate_ovpn(conf)
|
|
self.assertIn('proto udp', result)
|
|
self.assertIn('remote vpn.example.com 1194', result)
|
|
|
|
def test_up_script_is_stripped(self):
|
|
conf = self._base_conf('up /sbin/connect.sh\n')
|
|
result = self.mgr._validate_ovpn(conf)
|
|
self.assertNotIn('up /sbin/connect.sh', result)
|
|
|
|
def test_down_script_is_stripped(self):
|
|
conf = self._base_conf('down /sbin/disconnect.sh\n')
|
|
result = self.mgr._validate_ovpn(conf)
|
|
self.assertNotIn('down /sbin/disconnect.sh', result)
|
|
|
|
def test_script_security_is_stripped(self):
|
|
conf = self._base_conf('script-security 2\n')
|
|
result = self.mgr._validate_ovpn(conf)
|
|
self.assertNotIn('script-security', result)
|
|
|
|
def test_plugin_is_stripped(self):
|
|
conf = self._base_conf('plugin /path/to/plugin.so\n')
|
|
result = self.mgr._validate_ovpn(conf)
|
|
self.assertNotIn('plugin', result)
|
|
|
|
def test_route_up_is_stripped(self):
|
|
conf = self._base_conf('route-up /sbin/route_cmd\n')
|
|
result = self.mgr._validate_ovpn(conf)
|
|
self.assertNotIn('route-up', result)
|
|
|
|
def test_route_pre_down_is_stripped(self):
|
|
conf = self._base_conf('route-pre-down /sbin/cleanup\n')
|
|
result = self.mgr._validate_ovpn(conf)
|
|
self.assertNotIn('route-pre-down', result)
|
|
|
|
def test_proto_udp_is_preserved(self):
|
|
conf = self._base_conf()
|
|
result = self.mgr._validate_ovpn(conf)
|
|
self.assertIn('proto udp', result)
|
|
|
|
def test_remote_directive_is_preserved(self):
|
|
conf = self._base_conf()
|
|
result = self.mgr._validate_ovpn(conf)
|
|
self.assertIn('remote vpn.example.com 1194', result)
|
|
|
|
def test_comments_are_preserved(self):
|
|
conf = self._base_conf('# this is a comment\n')
|
|
result = self.mgr._validate_ovpn(conf)
|
|
self.assertIn('# this is a comment', result)
|
|
|
|
def test_non_string_input_raises_value_error(self):
|
|
with self.assertRaises(ValueError):
|
|
self.mgr._validate_ovpn(42)
|
|
|
|
def test_result_ends_with_newline(self):
|
|
conf = self._base_conf()
|
|
result = self.mgr._validate_ovpn(conf)
|
|
self.assertTrue(result.endswith('\n'))
|
|
|
|
def test_all_forbidden_directives_stripped_together(self):
|
|
conf = self._base_conf(
|
|
'up /s\ndown /s\nscript-security 2\nplugin /p\nroute-up /r\nroute-pre-down /r\n'
|
|
)
|
|
result = self.mgr._validate_ovpn(conf)
|
|
for directive in ('up ', 'down ', 'script-security', 'plugin', 'route-up', 'route-pre-down'):
|
|
self.assertNotIn(directive, result)
|
|
# Safe directives survive
|
|
self.assertIn('proto udp', result)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# upload_wireguard_ext
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUploadWireguardExt(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
self.mgr = _make_manager(tmp_dir=self.tmp)
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def _valid_conf(self):
|
|
return "[Interface]\nPrivateKey = abc\nAddress = 10.99.0.1/24\n\n[Peer]\nPublicKey = xyz\n"
|
|
|
|
def test_valid_conf_returns_ok_true(self):
|
|
result = self.mgr.upload_wireguard_ext(self._valid_conf())
|
|
self.assertTrue(result['ok'])
|
|
|
|
def test_valid_conf_writes_file_to_correct_path(self):
|
|
self.mgr.upload_wireguard_ext(self._valid_conf())
|
|
expected = os.path.join(self.tmp, 'services', 'wireguard-ext', 'config', 'wg_ext0.conf')
|
|
self.assertTrue(os.path.isfile(expected), f'Expected file at {expected}')
|
|
|
|
def test_valid_conf_file_has_mode_0600(self):
|
|
self.mgr.upload_wireguard_ext(self._valid_conf())
|
|
path = os.path.join(self.tmp, 'services', 'wireguard-ext', 'config', 'wg_ext0.conf')
|
|
mode = stat.S_IMODE(os.stat(path).st_mode)
|
|
self.assertEqual(mode, 0o600, f'Expected 0600, got {oct(mode)}')
|
|
|
|
def test_wg0_interface_returns_ok_false_with_error(self):
|
|
bad_conf = "[Interface]\nName = wg0\nPrivateKey = abc\n"
|
|
result = self.mgr.upload_wireguard_ext(bad_conf)
|
|
self.assertFalse(result['ok'])
|
|
self.assertIn('error', result)
|
|
self.assertIn('wg0', result['error'])
|
|
|
|
def test_file_content_has_hooks_stripped(self):
|
|
conf = "[Interface]\nPrivateKey = abc\nPostUp = evil\n"
|
|
self.mgr.upload_wireguard_ext(conf)
|
|
path = os.path.join(self.tmp, 'services', 'wireguard-ext', 'config', 'wg_ext0.conf')
|
|
with open(path) as f:
|
|
content = f.read()
|
|
self.assertNotIn('PostUp', content)
|
|
self.assertIn('PrivateKey', content)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# upload_openvpn
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUploadOpenvpn(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
self.mgr = _make_manager(tmp_dir=self.tmp)
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def _valid_ovpn(self):
|
|
return "client\ndev tun\nproto udp\nremote vpn.example.com 1194\n"
|
|
|
|
def test_valid_name_and_conf_returns_ok_true(self):
|
|
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='my-vpn')
|
|
self.assertTrue(result['ok'])
|
|
|
|
def test_valid_conf_writes_file_at_correct_path(self):
|
|
self.mgr.upload_openvpn(self._valid_ovpn(), name='my-vpn')
|
|
expected = os.path.join(self.tmp, 'services', 'openvpn-client', 'config', 'my-vpn.ovpn')
|
|
self.assertTrue(os.path.isfile(expected), f'Expected file at {expected}')
|
|
|
|
def test_valid_conf_file_has_mode_0600(self):
|
|
self.mgr.upload_openvpn(self._valid_ovpn(), name='my-vpn')
|
|
path = os.path.join(self.tmp, 'services', 'openvpn-client', 'config', 'my-vpn.ovpn')
|
|
mode = stat.S_IMODE(os.stat(path).st_mode)
|
|
self.assertEqual(mode, 0o600, f'Expected 0600, got {oct(mode)}')
|
|
|
|
def test_name_with_spaces_returns_ok_false(self):
|
|
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='my vpn')
|
|
self.assertFalse(result['ok'])
|
|
self.assertIn('error', result)
|
|
|
|
def test_name_with_slash_returns_ok_false(self):
|
|
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='../evil')
|
|
self.assertFalse(result['ok'])
|
|
|
|
def test_name_with_uppercase_returns_ok_false(self):
|
|
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='MyVPN')
|
|
self.assertFalse(result['ok'])
|
|
|
|
def test_name_too_long_returns_ok_false(self):
|
|
long_name = 'a' * 33
|
|
result = self.mgr.upload_openvpn(self._valid_ovpn(), name=long_name)
|
|
self.assertFalse(result['ok'])
|
|
|
|
def test_valid_hyphenated_name_passes(self):
|
|
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='my-vpn')
|
|
self.assertTrue(result['ok'])
|
|
|
|
def test_valid_underscore_name_passes(self):
|
|
result = self.mgr.upload_openvpn(self._valid_ovpn(), name='my_vpn')
|
|
self.assertTrue(result['ok'])
|
|
|
|
def test_default_name_default_passes(self):
|
|
result = self.mgr.upload_openvpn(self._valid_ovpn())
|
|
self.assertTrue(result['ok'])
|
|
expected = os.path.join(self.tmp, 'services', 'openvpn-client', 'config', 'default.ovpn')
|
|
self.assertTrue(os.path.isfile(expected))
|
|
|
|
def test_hooks_stripped_from_stored_file(self):
|
|
conf = "client\ndev tun\nup /sbin/bad.sh\nproto udp\n"
|
|
self.mgr.upload_openvpn(conf, name='clean')
|
|
path = os.path.join(self.tmp, 'services', 'openvpn-client', 'config', 'clean.ovpn')
|
|
with open(path) as f:
|
|
content = f.read()
|
|
self.assertNotIn('up /sbin/bad.sh', content)
|
|
self.assertIn('proto udp', content)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _migrate_legacy_configs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMigrateLegacyConfigs(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def test_no_op_when_legacy_dir_absent(self):
|
|
"""No errors when legacy connectivity/ dir does not exist."""
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
# Should not raise; legacy dir simply doesn't exist
|
|
mgr._migrate_legacy_configs(os.path.join(self.tmp, 'nonexistent'))
|
|
|
|
def test_wg_conf_copied_to_new_location(self):
|
|
legacy_wg = os.path.join(self.tmp, 'connectivity', 'wireguard_ext')
|
|
os.makedirs(legacy_wg)
|
|
src = os.path.join(legacy_wg, 'wg_ext0.conf')
|
|
with open(src, 'w') as f:
|
|
f.write('[Interface]\nPrivateKey = abc\n')
|
|
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
dst = os.path.join(self.tmp, 'services', 'wireguard-ext', 'config', 'wg_ext0.conf')
|
|
self.assertTrue(os.path.isfile(dst), f'Expected migrated file at {dst}')
|
|
|
|
def test_ovpn_copied_to_new_location(self):
|
|
legacy_ovpn = os.path.join(self.tmp, 'connectivity', 'openvpn')
|
|
os.makedirs(legacy_ovpn)
|
|
src = os.path.join(legacy_ovpn, 'default.ovpn')
|
|
with open(src, 'w') as f:
|
|
f.write('client\ndev tun\n')
|
|
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
dst = os.path.join(self.tmp, 'services', 'openvpn-client', 'config', 'default.ovpn')
|
|
self.assertTrue(os.path.isfile(dst), f'Expected migrated file at {dst}')
|
|
|
|
def test_existing_dst_not_overwritten(self):
|
|
legacy_wg = os.path.join(self.tmp, 'connectivity', 'wireguard_ext')
|
|
os.makedirs(legacy_wg)
|
|
with open(os.path.join(legacy_wg, 'wg_ext0.conf'), 'w') as f:
|
|
f.write('legacy\n')
|
|
|
|
# Pre-create the destination with different content
|
|
dst_dir = os.path.join(self.tmp, 'services', 'wireguard-ext', 'config')
|
|
os.makedirs(dst_dir, exist_ok=True)
|
|
dst = os.path.join(dst_dir, 'wg_ext0.conf')
|
|
with open(dst, 'w') as f:
|
|
f.write('existing\n')
|
|
|
|
_make_manager(tmp_dir=self.tmp)
|
|
with open(dst) as f:
|
|
self.assertEqual(f.read(), 'existing\n')
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetStatus(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def _mgr_with_subprocess_ok(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
return mgr
|
|
|
|
def test_returns_dict(self):
|
|
mgr = self._mgr_with_subprocess_ok()
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
status = mgr.get_status()
|
|
self.assertIsInstance(status, dict)
|
|
|
|
def test_service_key_equals_connectivity(self):
|
|
mgr = self._mgr_with_subprocess_ok()
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
status = mgr.get_status()
|
|
self.assertEqual(status['service'], 'connectivity')
|
|
|
|
def test_running_key_present_and_true(self):
|
|
mgr = self._mgr_with_subprocess_ok()
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
status = mgr.get_status()
|
|
self.assertIn('running', status)
|
|
self.assertTrue(status['running'])
|
|
|
|
def test_exits_key_present(self):
|
|
mgr = self._mgr_with_subprocess_ok()
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
status = mgr.get_status()
|
|
self.assertIn('exits', status)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# list_exits
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestListExits(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def test_returns_list(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
exits = mgr.list_exits()
|
|
self.assertIsInstance(exits, list)
|
|
|
|
def test_each_item_has_type_field(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
exits = mgr.list_exits()
|
|
for item in exits:
|
|
self.assertIn('type', item, f'Missing "type" in {item}')
|
|
|
|
def test_each_item_has_status_fields(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
exits = mgr.list_exits()
|
|
for item in exits:
|
|
# _exit_status returns configured + iface_up (or subset)
|
|
self.assertIn('configured', item, f'Missing "configured" in {item}')
|
|
|
|
def test_default_not_in_exit_list(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
exits = mgr.list_exits()
|
|
types = [e['type'] for e in exits]
|
|
self.assertNotIn('default', types)
|
|
|
|
def test_list_contains_wireguard_ext_openvpn_tor(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
exits = mgr.list_exits()
|
|
types = {e['type'] for e in exits}
|
|
self.assertIn('wireguard_ext', types)
|
|
self.assertIn('openvpn', types)
|
|
self.assertIn('tor', types)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# set_peer_exit
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSetPeerExit(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def _mgr(self, peer_registry=None):
|
|
if peer_registry is None:
|
|
peer_registry = MagicMock()
|
|
peer_registry.set_peer_exit_via.return_value = True
|
|
peer_registry.list_peers.return_value = []
|
|
return _make_manager(tmp_dir=self.tmp, peer_registry=peer_registry)
|
|
|
|
def test_valid_connection_id_returns_ok_true(self):
|
|
mgr = self._mgr()
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
result = mgr.set_peer_exit('alice', 'conn_abcd')
|
|
self.assertTrue(result['ok'])
|
|
|
|
def test_default_returns_ok_true(self):
|
|
mgr = self._mgr()
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
result = mgr.set_peer_exit('alice', 'default')
|
|
self.assertTrue(result['ok'])
|
|
|
|
def test_empty_connection_id_returns_ok_false(self):
|
|
mgr = self._mgr()
|
|
result = mgr.set_peer_exit('alice', '')
|
|
self.assertFalse(result['ok'])
|
|
self.assertIn('error', result)
|
|
|
|
def test_unknown_connection_for_existing_peer_returns_ok_false(self):
|
|
"""When the peer exists but the connection id is rejected by the
|
|
registry, set_peer_exit reports an unknown-connection error."""
|
|
pr = MagicMock()
|
|
pr.set_peer_exit_via.return_value = False
|
|
pr.get_peer.return_value = {'peer': 'alice', 'ip': '10.0.0.5'}
|
|
pr.list_peers.return_value = []
|
|
mgr = self._mgr(peer_registry=pr)
|
|
result = mgr.set_peer_exit('alice', 'conn_ghost')
|
|
self.assertFalse(result['ok'])
|
|
self.assertIn('unknown connection', result['error'])
|
|
|
|
def test_calls_peer_registry_set_peer_exit_via_with_correct_args(self):
|
|
pr = MagicMock()
|
|
pr.set_peer_exit_via.return_value = True
|
|
pr.list_peers.return_value = []
|
|
pr.get_peer.return_value = {'peer': 'bob', 'exit_via': 'conn_xyz'}
|
|
mgr = self._mgr(peer_registry=pr)
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
mgr.set_peer_exit('bob', 'conn_xyz')
|
|
pr.set_peer_exit_via.assert_called_once_with('bob', 'conn_xyz')
|
|
|
|
def test_peer_not_found_in_registry_returns_ok_false(self):
|
|
pr = MagicMock()
|
|
pr.set_peer_exit_via.return_value = False # peer not found
|
|
pr.get_peer.return_value = None # peer truly absent
|
|
pr.list_peers.return_value = []
|
|
mgr = self._mgr(peer_registry=pr)
|
|
result = mgr.set_peer_exit('unknown-peer', 'conn_tor')
|
|
self.assertFalse(result['ok'])
|
|
self.assertIn('not found', result['error'])
|
|
|
|
def test_invalid_peer_name_returns_ok_false(self):
|
|
mgr = self._mgr()
|
|
result = mgr.set_peer_exit('peer with spaces!', 'default')
|
|
self.assertFalse(result['ok'])
|
|
|
|
def test_no_peer_registry_returns_ok_false(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=None)
|
|
result = mgr.set_peer_exit('alice', 'wireguard_ext')
|
|
self.assertFalse(result['ok'])
|
|
self.assertIn('error', result)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_peer_exits
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetPeerExits(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def test_returns_dict(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
result = mgr.get_peer_exits()
|
|
self.assertIsInstance(result, dict)
|
|
|
|
def test_maps_peer_names_to_exit_types(self):
|
|
pr = MagicMock()
|
|
pr.list_peers.return_value = [
|
|
{'peer': 'alice', 'exit_via': 'wireguard_ext'},
|
|
{'peer': 'bob', 'exit_via': 'default'},
|
|
]
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
|
|
result = mgr.get_peer_exits()
|
|
self.assertEqual(result['alice'], 'wireguard_ext')
|
|
self.assertEqual(result['bob'], 'default')
|
|
|
|
def test_peer_without_exit_via_defaults_to_default(self):
|
|
pr = MagicMock()
|
|
pr.list_peers.return_value = [{'peer': 'charlie'}]
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
|
|
result = mgr.get_peer_exits()
|
|
self.assertEqual(result['charlie'], 'default')
|
|
|
|
def test_calls_peer_registry_list_peers(self):
|
|
pr = MagicMock()
|
|
pr.list_peers.return_value = []
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
|
|
mgr.get_peer_exits()
|
|
pr.list_peers.assert_called()
|
|
|
|
def test_no_peer_registry_returns_empty_dict(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=None)
|
|
result = mgr.get_peer_exits()
|
|
self.assertEqual(result, {})
|
|
|
|
def test_empty_peer_list_returns_empty_dict(self):
|
|
pr = MagicMock()
|
|
pr.list_peers.return_value = []
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr)
|
|
result = mgr.get_peer_exits()
|
|
self.assertEqual(result, {})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# apply_routes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestApplyRoutes(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def test_returns_dict_with_ok_key(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
result = mgr.apply_routes()
|
|
self.assertIsInstance(result, dict)
|
|
self.assertIn('ok', result)
|
|
|
|
def test_returns_ok_true_on_success(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
result = mgr.apply_routes()
|
|
self.assertTrue(result['ok'])
|
|
|
|
def test_calls_ensure_chains(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
with patch.object(mgr, '_ensure_chains') as mock_ensure, \
|
|
patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
mgr.apply_routes()
|
|
mock_ensure.assert_called_once()
|
|
|
|
def test_calls_subprocess_for_iptables(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
mgr.apply_routes()
|
|
self.assertTrue(mock_sp.run.called)
|
|
# At least one call should involve 'iptables'
|
|
calls_str = str(mock_sp.run.call_args_list)
|
|
self.assertIn('iptables', calls_str)
|
|
|
|
def test_subprocess_failure_is_non_fatal_returns_ok_true(self):
|
|
"""apply_routes must not raise even when every subprocess call fails."""
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='error')
|
|
result = mgr.apply_routes()
|
|
# Must not raise; should still return a dict (ok may be True because
|
|
# routing errors are logged as warnings, not propagated)
|
|
self.assertIsInstance(result, dict)
|
|
self.assertIn('ok', result)
|
|
|
|
def test_ensure_chains_exception_is_non_fatal(self):
|
|
"""If _ensure_chains raises, apply_routes still returns a dict."""
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
with patch.object(mgr, '_ensure_chains', side_effect=RuntimeError('chain error')), \
|
|
patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
result = mgr.apply_routes()
|
|
self.assertIsInstance(result, dict)
|
|
|
|
def _cm_with_connections(self, connections):
|
|
cm = MagicMock()
|
|
cm.get_identity.return_value = {'cell_name': 't', 'ip_range': '172.20.0.0/16'}
|
|
cm.list_connections.return_value = connections
|
|
return cm
|
|
|
|
def test_peer_with_connection_exit_generates_mark_rule(self):
|
|
"""A peer whose exit_via is a connection id gets that connection's mark."""
|
|
conns = [{'id': 'conn_wg', 'type': 'wireguard_ext', 'enabled': True,
|
|
'mark': 0x1000, 'table': 1000, 'iface': 'wgext_x',
|
|
'redirect_port': None}]
|
|
pr = MagicMock()
|
|
pr.list_peers.return_value = [{'peer': 'alice', 'exit_via': 'conn_wg'}]
|
|
pr.get_peer.return_value = {'peer': 'alice', 'ip': '172.20.0.50/32'}
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr,
|
|
config_manager=self._cm_with_connections(conns))
|
|
with patch.object(mgr, '_add_mark_rule') as mock_mark, \
|
|
patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
mgr.apply_routes()
|
|
mock_mark.assert_called()
|
|
call_args = mock_mark.call_args[0]
|
|
self.assertEqual(call_args[0], '172.20.0.50') # IP without CIDR
|
|
self.assertEqual(call_args[1], 0x1000) # the connection's mark
|
|
|
|
def test_peer_with_default_exit_skips_mark_rule(self):
|
|
"""Peers on default exit must not generate mark rules."""
|
|
pr = MagicMock()
|
|
pr.list_peers.return_value = [{'peer': 'bob', 'exit_via': 'default'}]
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr,
|
|
config_manager=self._cm_with_connections([]))
|
|
with patch.object(mgr, '_add_mark_rule') as mock_mark, \
|
|
patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
mgr.apply_routes()
|
|
mock_mark.assert_not_called()
|
|
|
|
def test_rules_applied_count_in_result(self):
|
|
mgr = _make_manager(tmp_dir=self.tmp)
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
result = mgr.apply_routes()
|
|
self.assertIn('rules_applied', result)
|
|
self.assertIsInstance(result['rules_applied'], int)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# apply_routes — instance-aware routing (connectivity v2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestApplyRoutesInstances(unittest.TestCase):
|
|
"""apply_routes must drive routing from connection instances, so two
|
|
instances of the same type route through distinct tables/marks without
|
|
collision, and each peer gets its own connection's mark."""
|
|
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def _cm(self, connections):
|
|
cm = MagicMock()
|
|
cm.get_identity.return_value = {'cell_name': 't', 'ip_range': '172.20.0.0/16'}
|
|
cm.list_connections.return_value = connections
|
|
return cm
|
|
|
|
@staticmethod
|
|
def _docker_args(call):
|
|
"""Strip the `docker exec <container> <ip|iptables>` prefix from a call."""
|
|
args = call.args[0]
|
|
# args == ['docker', 'exec', CONTAINER, 'ip'|'iptables', <sub-args...>]
|
|
return args[4:]
|
|
|
|
def test_two_wireguard_ext_instances_distinct_tables_no_collision(self):
|
|
conns = [
|
|
{'id': 'conn_a', 'type': 'wireguard_ext', 'enabled': True,
|
|
'mark': 0x1000, 'table': 1000, 'iface': 'wgext_a', 'redirect_port': None},
|
|
{'id': 'conn_b', 'type': 'wireguard_ext', 'enabled': True,
|
|
'mark': 0x1010, 'table': 1001, 'iface': 'wgext_b', 'redirect_port': None},
|
|
]
|
|
pr = MagicMock()
|
|
pr.list_peers.return_value = []
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr,
|
|
config_manager=self._cm(conns))
|
|
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
mgr.apply_routes()
|
|
|
|
rule_adds = []
|
|
for c in mock_sp.run.call_args_list:
|
|
args = self._docker_args(c)
|
|
if args[:3] == ['rule', 'add', 'fwmark']:
|
|
rule_adds.append(args)
|
|
|
|
# One ip rule per instance, each pointing its own mark at its own table.
|
|
pairs = {(a[3], a[5]) for a in rule_adds} # (fwmark_hex, table)
|
|
self.assertIn(('0x1000', '1000'), pairs)
|
|
self.assertIn(('0x1010', '1001'), pairs)
|
|
# Marks and tables must be distinct — no collision.
|
|
marks = [a[3] for a in rule_adds]
|
|
tables = [a[5] for a in rule_adds]
|
|
self.assertEqual(len(set(marks)), len(marks))
|
|
self.assertEqual(len(set(tables)), len(tables))
|
|
|
|
def test_two_peers_on_two_instances_get_different_marks(self):
|
|
conns = [
|
|
{'id': 'conn_a', 'type': 'wireguard_ext', 'enabled': True,
|
|
'mark': 0x1000, 'table': 1000, 'iface': 'wgext_a', 'redirect_port': None},
|
|
{'id': 'conn_b', 'type': 'wireguard_ext', 'enabled': True,
|
|
'mark': 0x1010, 'table': 1001, 'iface': 'wgext_b', 'redirect_port': None},
|
|
]
|
|
pr = MagicMock()
|
|
pr.list_peers.return_value = [
|
|
{'peer': 'alice', 'exit_via': 'conn_a'},
|
|
{'peer': 'bob', 'exit_via': 'conn_b'},
|
|
]
|
|
pr.get_peer.side_effect = lambda n: {
|
|
'alice': {'peer': 'alice', 'ip': '172.20.0.50/32'},
|
|
'bob': {'peer': 'bob', 'ip': '172.20.0.51/32'},
|
|
}[n]
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr,
|
|
config_manager=self._cm(conns))
|
|
|
|
marks_by_ip = {}
|
|
|
|
def capture(src_ip, mark):
|
|
marks_by_ip[src_ip] = mark
|
|
|
|
with patch.object(mgr, '_add_mark_rule', side_effect=capture), \
|
|
patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
mgr.apply_routes()
|
|
|
|
self.assertEqual(marks_by_ip['172.20.0.50'], 0x1000)
|
|
self.assertEqual(marks_by_ip['172.20.0.51'], 0x1010)
|
|
self.assertNotEqual(marks_by_ip['172.20.0.50'], marks_by_ip['172.20.0.51'])
|
|
|
|
def test_two_redirect_instances_distinct_ports(self):
|
|
"""Two redirect-style instances REDIRECT their peers to distinct ports."""
|
|
conns = [
|
|
{'id': 'conn_p1', 'type': 'proxy', 'enabled': True,
|
|
'mark': 0x1000, 'table': 1000, 'iface': None, 'redirect_port': 9100},
|
|
{'id': 'conn_p2', 'type': 'proxy', 'enabled': True,
|
|
'mark': 0x1010, 'table': 1001, 'iface': None, 'redirect_port': 9101},
|
|
]
|
|
pr = MagicMock()
|
|
pr.list_peers.return_value = [
|
|
{'peer': 'alice', 'exit_via': 'conn_p1'},
|
|
{'peer': 'bob', 'exit_via': 'conn_p2'},
|
|
]
|
|
pr.get_peer.side_effect = lambda n: {
|
|
'alice': {'peer': 'alice', 'ip': '172.20.0.50/32'},
|
|
'bob': {'peer': 'bob', 'ip': '172.20.0.51/32'},
|
|
}[n]
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr,
|
|
config_manager=self._cm(conns))
|
|
|
|
ports_by_ip = {}
|
|
|
|
def capture(src_ip, port):
|
|
ports_by_ip[src_ip] = port
|
|
|
|
with patch.object(mgr, '_add_redirect', side_effect=capture), \
|
|
patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
mgr.apply_routes()
|
|
|
|
self.assertEqual(ports_by_ip['172.20.0.50'], 9100)
|
|
self.assertEqual(ports_by_ip['172.20.0.51'], 9101)
|
|
|
|
def test_single_instance_equivalence(self):
|
|
"""An already-migrated cell with one instance per type yields the same
|
|
effective rules as before: one ip rule (mark→table), one mark per
|
|
peer, killswitch on the iface-based instance."""
|
|
conns = [
|
|
{'id': 'conn_wg', 'type': 'wireguard_ext', 'enabled': True,
|
|
'mark': 0x1000, 'table': 1000, 'iface': 'wgext_x', 'redirect_port': None},
|
|
]
|
|
pr = MagicMock()
|
|
pr.list_peers.return_value = [{'peer': 'alice', 'exit_via': 'conn_wg'}]
|
|
pr.get_peer.return_value = {'peer': 'alice', 'ip': '172.20.0.50/32'}
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr,
|
|
config_manager=self._cm(conns))
|
|
|
|
with patch.object(mgr, '_add_killswitch') as mock_ks, \
|
|
patch.object(mgr, '_add_mark_rule') as mock_mark, \
|
|
patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
mgr.apply_routes()
|
|
|
|
mock_mark.assert_called_once_with('172.20.0.50', 0x1000)
|
|
mock_ks.assert_called_once_with(0x1000, 'wgext_x')
|
|
|
|
def test_disabled_instance_is_skipped(self):
|
|
conns = [
|
|
{'id': 'conn_off', 'type': 'wireguard_ext', 'enabled': False,
|
|
'mark': 0x1000, 'table': 1000, 'iface': 'wgext_x', 'redirect_port': None},
|
|
]
|
|
pr = MagicMock()
|
|
pr.list_peers.return_value = []
|
|
mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr,
|
|
config_manager=self._cm(conns))
|
|
with patch.object(mgr, '_add_killswitch') as mock_ks, \
|
|
patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
mgr.apply_routes()
|
|
mock_ks.assert_not_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _exit_status — status string + store-service bridge
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestExitStatus(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def _mgr(self, installed=None):
|
|
config_manager = MagicMock()
|
|
config_manager.get_installed_services.return_value = installed or {}
|
|
return _make_manager(tmp_dir=self.tmp, config_manager=config_manager)
|
|
|
|
def test_status_not_configured_when_nothing_present(self):
|
|
mgr = self._mgr()
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='')
|
|
info = mgr._exit_status('wireguard_ext')
|
|
self.assertEqual(info['status'], 'not_configured')
|
|
self.assertFalse(info['configured'])
|
|
self.assertFalse(info['iface_up'])
|
|
|
|
def test_status_configured_when_legacy_file_present(self):
|
|
mgr = self._mgr()
|
|
path = os.path.join(mgr.wireguard_ext_dir, 'wg_ext0.conf')
|
|
with open(path, 'w') as f:
|
|
f.write('[Interface]\nPrivateKey = abc\n')
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='')
|
|
info = mgr._exit_status('wireguard_ext')
|
|
self.assertTrue(info['configured'])
|
|
self.assertEqual(info['status'], 'configured')
|
|
|
|
def test_status_active_when_iface_up(self):
|
|
mgr = self._mgr()
|
|
path = os.path.join(mgr.wireguard_ext_dir, 'wg_ext0.conf')
|
|
with open(path, 'w') as f:
|
|
f.write('[Interface]\nPrivateKey = abc\n')
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = MagicMock(
|
|
returncode=0, stdout='4: wg_ext0: <UP,LOWER_UP>', stderr=''
|
|
)
|
|
info = mgr._exit_status('wireguard_ext')
|
|
self.assertTrue(info['iface_up'])
|
|
self.assertEqual(info['status'], 'active')
|
|
|
|
def test_store_installed_wireguard_ext_reports_configured(self):
|
|
mgr = self._mgr(installed={'wireguard-ext': {'manifest': {'id': 'wireguard-ext'}}})
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='')
|
|
info = mgr._exit_status('wireguard_ext')
|
|
self.assertTrue(info['configured'])
|
|
self.assertEqual(info['status'], 'configured')
|
|
|
|
def test_store_installed_openvpn_client_reports_configured(self):
|
|
mgr = self._mgr(installed={'openvpn-client': {'manifest': {'id': 'openvpn-client'}}})
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='')
|
|
info = mgr._exit_status('openvpn')
|
|
self.assertTrue(info['configured'])
|
|
self.assertEqual(info['status'], 'configured')
|
|
|
|
def test_unrelated_store_service_does_not_configure_exit(self):
|
|
mgr = self._mgr(installed={'email': {'manifest': {'id': 'email'}}})
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='')
|
|
info = mgr._exit_status('wireguard_ext')
|
|
self.assertFalse(info['configured'])
|
|
self.assertEqual(info['status'], 'not_configured')
|
|
|
|
def test_running_container_reports_configured(self):
|
|
mgr = self._mgr()
|
|
|
|
def fake_run(cmd, **kwargs):
|
|
if 'inspect' in cmd:
|
|
return MagicMock(returncode=0, stdout='true\n', stderr='')
|
|
return MagicMock(returncode=1, stdout='', stderr='')
|
|
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.side_effect = fake_run
|
|
info = mgr._exit_status('wireguard_ext')
|
|
self.assertTrue(info['configured'])
|
|
self.assertEqual(info['status'], 'configured')
|
|
|
|
def test_stopped_container_does_not_configure_exit(self):
|
|
mgr = self._mgr()
|
|
|
|
def fake_run(cmd, **kwargs):
|
|
if 'inspect' in cmd:
|
|
return MagicMock(returncode=0, stdout='false\n', stderr='')
|
|
return MagicMock(returncode=1, stdout='', stderr='')
|
|
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.side_effect = fake_run
|
|
info = mgr._exit_status('wireguard_ext')
|
|
self.assertFalse(info['configured'])
|
|
|
|
def test_list_exits_entries_have_status_string(self):
|
|
mgr = self._mgr()
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = _mock_subprocess_ok()
|
|
exits = mgr.list_exits()
|
|
for item in exits:
|
|
self.assertIn('status', item)
|
|
self.assertIn(item['status'], ('active', 'configured', 'not_configured'))
|
|
|
|
def test_tor_defaults_to_configured(self):
|
|
mgr = self._mgr()
|
|
with patch.object(cm_module, 'subprocess') as mock_sp:
|
|
mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='')
|
|
info = mgr._exit_status('tor')
|
|
self.assertTrue(info['configured'])
|
|
self.assertEqual(info['status'], 'configured')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|