feat: connectivity redesign phase 1 — multi-instance connection data model
Unit Tests / test (push) Successful in 12m51s
Unit Tests / test (push) Successful in 12m51s
Migrate from the single-exit-per-type model (one wireguard_exit, one tor_exit, etc.) to N named connection instances, each carrying its own resource allocations and vault-backed secret refs. config_manager.py: - Connectivity v2 schema: top-level `connections` list, each entry has id, name, type, enabled, status, config, secret_ref, and allocated resources (mark, table, iface, redirect_port). - Helpers: get_connectivity / list_connections / get_connection / add_connection / update_connection / delete_connection / set_connection_status. - v1→v2 migration: promotes legacy wireguard_exit / tor fields into the new list on first load; idempotent on v2 configs. connectivity_manager.py: - Resource allocator: per-instance fwmark range 0x1000–0x1FFF, routing table range 1000+, interface names, and redirect ports 9100–9199; all tracked in config to survive restarts. - Connection CRUD: create / update / delete / list / get with vault secret refs for WireGuard private keys and Tor credentials. - Single-Tor enforcement: rejects a second tor/tor_bridge instance at creation time. - Per-instance config validation for each connection type. - apply_routes, peer wiring, and egress hookups are intentionally left unchanged in this phase; they land in later phases alongside UI. tests/test_connectivity_connections.py (new, 473 lines): - Allocator uniqueness, v1→v2 migration round-trip, CRUD lifecycle, single-Tor enforcement, and status transitions. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,473 @@
|
||||
"""
|
||||
Tests for the connectivity v2 data model — named connection instances.
|
||||
|
||||
Covers the resource allocator (no mark/table/port collisions, correct iface
|
||||
naming, iface vs redirect-port split), the v1→v2 migration (legacy exits →
|
||||
one connection each, secret repointing, idempotency, empty case), connection
|
||||
CRUD (validation, single-tor, duplicate name, secret refs not values, delete
|
||||
frees resources + vault secret, delete blocked when referenced), and computed
|
||||
status.state (added vs configured).
|
||||
|
||||
ConfigManager runs against a tmp dir; the vault is an in-memory fake. No real
|
||||
Docker/iptables/subprocess is invoked (apply_routes is never called here).
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
|
||||
|
||||
from config_manager import ConfigManager
|
||||
from connectivity_manager import ConnectivityManager
|
||||
|
||||
VALID_KEY = (
|
||||
'-----BEGIN OPENSSH PRIVATE KEY-----\n'
|
||||
'b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAAB\n'
|
||||
'-----END OPENSSH PRIVATE KEY-----\n'
|
||||
)
|
||||
VALID_KNOWN_HOSTS = (
|
||||
'ssh.example.com,203.0.113.5 ssh-ed25519 '
|
||||
'AAAAC3NzaC1lZDI1NTE5AAAAIB5d0o0Yw1xP1Yw1xP1Yw1xP1Yw1xP1Yw1xP1Yw1xP1Y'
|
||||
)
|
||||
|
||||
|
||||
class FakeVault:
|
||||
"""In-memory stand-in for VaultManager's secret store."""
|
||||
|
||||
def __init__(self):
|
||||
self.store = {}
|
||||
|
||||
def store_secret(self, name, value):
|
||||
self.store[name] = value
|
||||
return True
|
||||
|
||||
def get_secret(self, name):
|
||||
return self.store.get(name)
|
||||
|
||||
def delete_secret(self, name):
|
||||
if name in self.store:
|
||||
del self.store[name]
|
||||
return True
|
||||
return False
|
||||
|
||||
def list_secrets(self):
|
||||
return list(self.store.keys())
|
||||
|
||||
|
||||
def _sshuttle_cfg(**overrides):
|
||||
cfg = {
|
||||
'host': 'ssh.example.com',
|
||||
'port': 22,
|
||||
'user': 'tunnel',
|
||||
'auth': 'key',
|
||||
'known_hosts': VALID_KNOWN_HOSTS,
|
||||
'exclude_subnets': ['10.0.0.0/8'],
|
||||
}
|
||||
cfg.update(overrides)
|
||||
return cfg
|
||||
|
||||
|
||||
def _proxy_cfg(**overrides):
|
||||
cfg = {'scheme': 'http', 'host': 'proxy.example.com', 'port': 3128}
|
||||
cfg.update(overrides)
|
||||
return cfg
|
||||
|
||||
|
||||
class _Base(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.mkdtemp()
|
||||
self.cfg_file = os.path.join(self.tmp, 'cell_config.json')
|
||||
self.data_dir = os.path.join(self.tmp, 'data')
|
||||
os.makedirs(self.data_dir, exist_ok=True)
|
||||
self.cm = ConfigManager(self.cfg_file, self.data_dir)
|
||||
self.vault = FakeVault()
|
||||
self.peer_registry = MagicMock()
|
||||
self.peer_registry.list_peers.return_value = []
|
||||
with patch.object(ConnectivityManager, '_subscribe_to_events',
|
||||
lambda self: None):
|
||||
self.mgr = ConnectivityManager(
|
||||
config_manager=self.cm,
|
||||
peer_registry=self.peer_registry,
|
||||
vault_manager=self.vault,
|
||||
data_dir=self.data_dir,
|
||||
config_dir=self.tmp,
|
||||
)
|
||||
# Start CRUD tests from a clean migrated v2 (no legacy exits). Migration
|
||||
# is exercised separately in TestMigration; here we want an empty slate
|
||||
# so the always-configured Tor default doesn't get auto-migrated in.
|
||||
self.cm.configs['connectivity'] = {'version': 2, 'connections': []}
|
||||
self.cm._save_all_configs()
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmp, ignore_errors=True)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Resource allocator
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestAllocator(_Base):
|
||||
|
||||
def test_iface_types_get_iface_not_port(self):
|
||||
for ctype in ('wireguard_ext', 'openvpn'):
|
||||
cid = 'conn_deadbeef'
|
||||
mark, table, iface, port = self.mgr._allocate_resources(ctype, cid)
|
||||
self.assertIsNotNone(iface)
|
||||
self.assertIsNone(port)
|
||||
self.assertTrue(iface.startswith(
|
||||
self.mgr.IFACE_PREFIXES[ctype]))
|
||||
self.assertLessEqual(len(iface), 15)
|
||||
|
||||
def test_redirect_types_get_port_not_iface(self):
|
||||
for ctype in ('tor', 'sshuttle', 'proxy'):
|
||||
cid = 'conn_abcd1234'
|
||||
mark, table, iface, port = self.mgr._allocate_resources(ctype, cid)
|
||||
self.assertIsNone(iface)
|
||||
self.assertIsNotNone(port)
|
||||
self.assertTrue(
|
||||
self.mgr.REDIRECT_PORT_BASE <= port <= self.mgr.REDIRECT_PORT_MAX)
|
||||
|
||||
def test_iface_uses_8hex_id(self):
|
||||
_m, _t, iface, _p = self.mgr._allocate_resources(
|
||||
'wireguard_ext', 'conn_0123456789')
|
||||
self.assertEqual(iface, 'wgext_01234567')
|
||||
|
||||
def test_no_collisions_across_many_connections(self):
|
||||
created = []
|
||||
# Mix of iface + redirect types; tor is single-instance so exclude it.
|
||||
plan = (['wireguard_ext', 'openvpn', 'sshuttle', 'proxy'] * 4)
|
||||
for i, ctype in enumerate(plan):
|
||||
if ctype == 'sshuttle':
|
||||
res = self.mgr.create_connection(
|
||||
ctype, f'ssh-{i}', _sshuttle_cfg(),
|
||||
secrets={'private_key': VALID_KEY})
|
||||
elif ctype == 'proxy':
|
||||
res = self.mgr.create_connection(ctype, f'px-{i}', _proxy_cfg())
|
||||
elif ctype == 'wireguard_ext':
|
||||
res = self.mgr.create_connection(
|
||||
ctype, f'wg-{i}', {},
|
||||
secrets={'conf': '[Interface]\nPrivateKey = x\n'})
|
||||
else:
|
||||
res = self.mgr.create_connection(
|
||||
ctype, f'ov-{i}', {},
|
||||
secrets={'conf': 'client\nremote vpn.example.com 1194\n'})
|
||||
self.assertTrue(res['ok'], res)
|
||||
created.append(res['connection'])
|
||||
|
||||
marks = [c['mark'] for c in created]
|
||||
tables = [c['table'] for c in created]
|
||||
ifaces = [c['iface'] for c in created if c['iface']]
|
||||
ports = [c['redirect_port'] for c in created if c['redirect_port']]
|
||||
self.assertEqual(len(marks), len(set(marks)))
|
||||
self.assertEqual(len(tables), len(set(tables)))
|
||||
self.assertEqual(len(ifaces), len(set(ifaces)))
|
||||
self.assertEqual(len(ports), len(set(ports)))
|
||||
for m in marks:
|
||||
self.assertEqual(m % self.mgr.MARK_STRIDE, 0)
|
||||
self.assertTrue(self.mgr.MARK_BASE <= m <= self.mgr.MARK_MAX)
|
||||
for t in tables:
|
||||
self.assertGreaterEqual(t, self.mgr.TABLE_BASE)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Connection CRUD
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCreateConnection(_Base):
|
||||
|
||||
def test_create_proxy_allocates_and_persists(self):
|
||||
res = self.mgr.create_connection('proxy', 'Work proxy', _proxy_cfg())
|
||||
self.assertTrue(res['ok'], res)
|
||||
rec = res['connection']
|
||||
self.assertTrue(rec['id'].startswith('conn_'))
|
||||
self.assertEqual(rec['type'], 'proxy')
|
||||
self.assertIsNotNone(rec['redirect_port'])
|
||||
self.assertIsNone(rec['iface'])
|
||||
# Persisted to config_manager.
|
||||
self.assertEqual(len(self.cm.list_connections()), 1)
|
||||
|
||||
def test_secret_stored_as_ref_not_value(self):
|
||||
res = self.mgr.create_connection(
|
||||
'sshuttle', 'tun', _sshuttle_cfg(),
|
||||
secrets={'private_key': VALID_KEY})
|
||||
self.assertTrue(res['ok'], res)
|
||||
rec = res['connection']
|
||||
cid = rec['id']
|
||||
# Refs recorded, values absent from the record + config.
|
||||
self.assertIn(f'{cid}_private_key', rec['secret_refs'])
|
||||
self.assertIn(f'{cid}_known_hosts', rec['secret_refs'])
|
||||
self.assertNotIn('private_key', rec['config'])
|
||||
stored = self.cm.get_connection(cid)
|
||||
self.assertNotIn('private_key', stored['config'])
|
||||
# Secret value lives only in the vault.
|
||||
self.assertEqual(self.vault.get_secret(f'{cid}_private_key'), VALID_KEY)
|
||||
|
||||
def test_invalid_type_rejected(self):
|
||||
res = self.mgr.create_connection('bogus', 'x', {})
|
||||
self.assertFalse(res['ok'])
|
||||
|
||||
def test_invalid_name_rejected(self):
|
||||
res = self.mgr.create_connection('proxy', '', _proxy_cfg())
|
||||
self.assertFalse(res['ok'])
|
||||
|
||||
def test_invalid_config_rejected(self):
|
||||
res = self.mgr.create_connection('proxy', 'p', _proxy_cfg(scheme='ftp'))
|
||||
self.assertFalse(res['ok'])
|
||||
self.assertEqual(len(self.cm.list_connections()), 0)
|
||||
|
||||
def test_single_tor_enforced(self):
|
||||
first = self.mgr.create_connection('tor', 'Tor 1', {})
|
||||
self.assertTrue(first['ok'], first)
|
||||
second = self.mgr.create_connection('tor', 'Tor 2', {})
|
||||
self.assertFalse(second['ok'])
|
||||
self.assertIn('single', second['error'].lower())
|
||||
|
||||
def test_duplicate_name_rejected(self):
|
||||
a = self.mgr.create_connection('proxy', 'Dup', _proxy_cfg())
|
||||
self.assertTrue(a['ok'])
|
||||
b = self.mgr.create_connection('proxy', 'dup', _proxy_cfg(port=8080))
|
||||
self.assertFalse(b['ok'])
|
||||
self.assertIn('already exists', b['error'])
|
||||
|
||||
def test_state_configured_when_complete(self):
|
||||
res = self.mgr.create_connection('proxy', 'full', _proxy_cfg())
|
||||
self.assertEqual(res['connection']['status']['state'], 'configured')
|
||||
self.assertEqual(res['connection']['status']['health'], 'unknown')
|
||||
|
||||
def test_state_added_when_missing_secret(self):
|
||||
# wireguard_ext with no conf secret → required secret missing → added.
|
||||
# (validator requires conf, so create one that omits it must fail; to
|
||||
# exercise the 'added' path we build a record directly via migration of
|
||||
# an installed-but-unconfigured exit — see TestStatusState.)
|
||||
res = self.mgr.create_connection(
|
||||
'wireguard_ext', 'wgfull', {},
|
||||
secrets={'conf': '[Interface]\nPrivateKey = x\n'})
|
||||
self.assertTrue(res['ok'], res)
|
||||
self.assertEqual(res['connection']['status']['state'], 'configured')
|
||||
|
||||
|
||||
class TestDeleteConnection(_Base):
|
||||
|
||||
def test_delete_frees_resources_and_secret(self):
|
||||
res = self.mgr.create_connection(
|
||||
'sshuttle', 'tun', _sshuttle_cfg(),
|
||||
secrets={'private_key': VALID_KEY})
|
||||
cid = res['connection']['id']
|
||||
self.assertIn(f'{cid}_private_key', self.vault.list_secrets())
|
||||
|
||||
out = self.mgr.delete_connection(cid)
|
||||
self.assertTrue(out['ok'], out)
|
||||
self.assertEqual(len(self.cm.list_connections()), 0)
|
||||
self.assertNotIn(f'{cid}_private_key', self.vault.list_secrets())
|
||||
self.assertNotIn(f'{cid}_known_hosts', self.vault.list_secrets())
|
||||
|
||||
def test_freed_resources_are_reusable(self):
|
||||
a = self.mgr.create_connection('proxy', 'A', _proxy_cfg())
|
||||
mark_a = a['connection']['mark']
|
||||
self.mgr.delete_connection(a['connection']['id'])
|
||||
b = self.mgr.create_connection('proxy', 'B', _proxy_cfg())
|
||||
self.assertEqual(b['connection']['mark'], mark_a)
|
||||
|
||||
def test_delete_blocked_when_referenced(self):
|
||||
res = self.mgr.create_connection('proxy', 'ref', _proxy_cfg())
|
||||
cid = res['connection']['id']
|
||||
self.peer_registry.list_peers.return_value = [
|
||||
{'peer': 'alice', 'connection_id': cid}]
|
||||
out = self.mgr.delete_connection(cid)
|
||||
self.assertFalse(out['ok'])
|
||||
self.assertIn('in use', out['error'])
|
||||
# Still present, secret intact.
|
||||
self.assertEqual(len(self.cm.list_connections()), 1)
|
||||
|
||||
def test_delete_unknown_returns_error(self):
|
||||
out = self.mgr.delete_connection('conn_nope')
|
||||
self.assertFalse(out['ok'])
|
||||
|
||||
|
||||
class TestUpdateConnection(_Base):
|
||||
|
||||
def test_update_name(self):
|
||||
res = self.mgr.create_connection('proxy', 'old', _proxy_cfg())
|
||||
cid = res['connection']['id']
|
||||
out = self.mgr.update_connection(cid, name='new')
|
||||
self.assertTrue(out['ok'], out)
|
||||
self.assertEqual(self.cm.get_connection(cid)['name'], 'new')
|
||||
|
||||
def test_update_duplicate_name_rejected(self):
|
||||
a = self.mgr.create_connection('proxy', 'A', _proxy_cfg())
|
||||
b = self.mgr.create_connection('proxy', 'B', _proxy_cfg(port=8080))
|
||||
out = self.mgr.update_connection(b['connection']['id'], name='A')
|
||||
self.assertFalse(out['ok'])
|
||||
|
||||
def test_update_secret_repoints_vault(self):
|
||||
res = self.mgr.create_connection(
|
||||
'sshuttle', 'tun', _sshuttle_cfg(),
|
||||
secrets={'private_key': VALID_KEY})
|
||||
cid = res['connection']['id']
|
||||
new_key = VALID_KEY.replace('AAAAB', 'BBBBB')
|
||||
out = self.mgr.update_connection(
|
||||
cid, config=_sshuttle_cfg(), secrets={'private_key': new_key})
|
||||
self.assertTrue(out['ok'], out)
|
||||
self.assertEqual(self.vault.get_secret(f'{cid}_private_key'), new_key)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# list/get enrichment
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestListGet(_Base):
|
||||
|
||||
def test_list_and_get_strip_secret_values(self):
|
||||
res = self.mgr.create_connection(
|
||||
'sshuttle', 'tun', _sshuttle_cfg(),
|
||||
secrets={'private_key': VALID_KEY})
|
||||
cid = res['connection']['id']
|
||||
listed = self.mgr.list_connections()
|
||||
self.assertEqual(len(listed), 1)
|
||||
self.assertNotIn('private_key', listed[0]['config'])
|
||||
self.assertEqual(listed[0]['status']['health'], 'unknown')
|
||||
got = self.mgr.get_connection(cid)
|
||||
self.assertEqual(got['id'], cid)
|
||||
self.assertNotIn('private_key', got['config'])
|
||||
|
||||
def test_get_unknown_returns_none(self):
|
||||
self.assertIsNone(self.mgr.get_connection('conn_missing'))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Computed status.state
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestStatusState(_Base):
|
||||
|
||||
def test_compute_state_proxy(self):
|
||||
self.assertEqual(
|
||||
self.mgr._compute_state('proxy', _proxy_cfg(), []), 'configured')
|
||||
self.assertEqual(
|
||||
self.mgr._compute_state('proxy', {'scheme': 'http'}, []), 'added')
|
||||
|
||||
def test_compute_state_sshuttle_password_auth(self):
|
||||
cfg = _sshuttle_cfg(auth='password')
|
||||
cid = 'conn_x'
|
||||
self.assertEqual(
|
||||
self.mgr._compute_state(cfg['auth'] and 'sshuttle', cfg,
|
||||
[f'{cid}_known_hosts', f'{cid}_password']),
|
||||
'configured')
|
||||
self.assertEqual(
|
||||
self.mgr._compute_state('sshuttle', cfg, [f'{cid}_known_hosts']),
|
||||
'added')
|
||||
|
||||
def test_compute_state_tor_always_configured(self):
|
||||
self.assertEqual(self.mgr._compute_state('tor', {}, []), 'configured')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# v1 → v2 migration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMigration(_Base):
|
||||
|
||||
def test_empty_legacy_becomes_version2_empty(self):
|
||||
# No exits configured: _exit_status reports nothing configured.
|
||||
self.cm.configs['connectivity'] = {'exits': {}, 'peer_exit_map': {}}
|
||||
with patch.object(self.mgr, '_exit_status',
|
||||
return_value={'configured': False}):
|
||||
v2 = self.cm.get_connectivity()
|
||||
self.assertEqual(v2['version'], 2)
|
||||
self.assertEqual(v2['connections'], [])
|
||||
|
||||
def test_each_configured_exit_becomes_one_connection(self):
|
||||
configured = {'sshuttle', 'proxy', 'tor'}
|
||||
|
||||
def fake_status(exit_type):
|
||||
return {'configured': exit_type in configured}
|
||||
|
||||
# Seed legacy config + secrets.
|
||||
self.cm.configs['connectivity'] = {
|
||||
'exits': {
|
||||
'sshuttle': {'host': 'h', 'port': 22, 'user': 'u',
|
||||
'auth': 'key', 'exclude_subnets': []},
|
||||
'proxy': {'scheme': 'http', 'host': 'p', 'port': 3128, 'user': ''},
|
||||
},
|
||||
'peer_exit_map': {},
|
||||
}
|
||||
self.vault.store_secret('connectivity_sshuttle_key', VALID_KEY)
|
||||
|
||||
with patch.object(self.mgr, '_exit_status', side_effect=fake_status):
|
||||
v2 = self.cm.get_connectivity()
|
||||
|
||||
self.assertEqual(v2['version'], 2)
|
||||
types = sorted(c['type'] for c in v2['connections'])
|
||||
self.assertEqual(types, ['proxy', 'sshuttle', 'tor'])
|
||||
# Exactly one per type.
|
||||
self.assertEqual(len(v2['connections']), 3)
|
||||
|
||||
def test_secret_repointed_not_lost(self):
|
||||
self.cm.configs['connectivity'] = {
|
||||
'exits': {'sshuttle': {'host': 'h', 'port': 22, 'user': 'u',
|
||||
'auth': 'key', 'exclude_subnets': []}},
|
||||
'peer_exit_map': {},
|
||||
}
|
||||
self.vault.store_secret('connectivity_sshuttle_key', VALID_KEY)
|
||||
|
||||
def fake_status(exit_type):
|
||||
return {'configured': exit_type == 'sshuttle'}
|
||||
|
||||
with patch.object(self.mgr, '_exit_status', side_effect=fake_status):
|
||||
v2 = self.cm.get_connectivity()
|
||||
|
||||
conn = next(c for c in v2['connections'] if c['type'] == 'sshuttle')
|
||||
cid = conn['id']
|
||||
new_ref = f'{cid}_private_key'
|
||||
# Old name gone, new name holds the value — nothing lost.
|
||||
self.assertIsNone(self.vault.get_secret('connectivity_sshuttle_key'))
|
||||
self.assertEqual(self.vault.get_secret(new_ref), VALID_KEY)
|
||||
self.assertIn(new_ref, conn['secret_refs'])
|
||||
|
||||
def test_migration_idempotent(self):
|
||||
def fake_status(exit_type):
|
||||
return {'configured': exit_type == 'proxy'}
|
||||
|
||||
self.cm.configs['connectivity'] = {
|
||||
'exits': {'proxy': {'scheme': 'http', 'host': 'p', 'port': 3128}},
|
||||
'peer_exit_map': {},
|
||||
}
|
||||
with patch.object(self.mgr, '_exit_status', side_effect=fake_status):
|
||||
first = self.cm.get_connectivity()
|
||||
ids_first = [c['id'] for c in first['connections']]
|
||||
# Second access must not re-run migration / duplicate.
|
||||
second = self.cm.get_connectivity()
|
||||
ids_second = [c['id'] for c in second['connections']]
|
||||
self.assertEqual(ids_first, ids_second)
|
||||
self.assertEqual(len(second['connections']), 1)
|
||||
|
||||
def test_migration_allocates_distinct_resources(self):
|
||||
configured = {'wireguard_ext', 'openvpn', 'sshuttle', 'proxy', 'tor'}
|
||||
self.cm.configs['connectivity'] = {'exits': {}, 'peer_exit_map': {}}
|
||||
|
||||
def fake_status(exit_type):
|
||||
return {'configured': exit_type in configured}
|
||||
|
||||
with patch.object(self.mgr, '_exit_status', side_effect=fake_status):
|
||||
v2 = self.cm.get_connectivity()
|
||||
|
||||
conns = v2['connections']
|
||||
marks = [c['mark'] for c in conns]
|
||||
tables = [c['table'] for c in conns]
|
||||
self.assertEqual(len(marks), len(set(marks)))
|
||||
self.assertEqual(len(tables), len(set(tables)))
|
||||
for c in conns:
|
||||
if c['type'] in ('wireguard_ext', 'openvpn'):
|
||||
self.assertIsNotNone(c['iface'])
|
||||
self.assertIsNone(c['redirect_port'])
|
||||
else:
|
||||
self.assertIsNone(c['iface'])
|
||||
self.assertIsNotNone(c['redirect_port'])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user