feat(cells): Phase 3 tests + Phase 4 UI for cell service-sharing

Phase 3 — tests (50 new, total now 1071):
- test_cell_link_manager: atomicity (WG fail → DNS not called, link not
  persisted), DNS warning non-fatal, inbound_services arg, unknown service
  filtered, update/get permissions, lazy migration of legacy entries
- test_wireguard_manager: subnet overlap rejection (exact, supernet, adjacent
  non-overlapping, different class-A, honours wg0.conf configured network)
- test_firewall_manager: _cell_tag sanitisation, apply_cell_rules emits correct
  ACCEPT/DROP per service + catch-all DROP, clear_cell_rules no-op and exact
  line removal, apply_all_cell_rules iterates with correct args
- test_cells_endpoints: RuntimeError→400, GET /services, GET/PUT permissions
  (200/400/404 paths, service name validation, arg forwarding)

Phase 4 — UI:
- CellNetwork.jsx: replace flat cell list with CellPanel expandable cards;
  add ServiceShareToggle (ARIA switch, saves immediately), InboundServiceBadge
  (read-only), DisconnectConfirmModal (replaces window.confirm); relative
  timestamps; paste validation on blur; WireGuard status merged by public_key
- api.js: add cellLinkAPI.getPermissions, updatePermissions, getServices

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-01 08:45:32 -04:00
parent 0b103ffafb
commit 562d866a65
6 changed files with 1023 additions and 47 deletions
+238
View File
@@ -160,3 +160,241 @@ class TestCellLinkManagerConnections(unittest.TestCase):
if __name__ == '__main__':
unittest.main()
# ---------------------------------------------------------------------------
# TestAddConnectionAtomicity
# ---------------------------------------------------------------------------
class TestAddConnectionAtomicity(unittest.TestCase):
"""Verify that add_connection rolls back correctly when WG or DNS steps fail."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_wg_fail_does_not_call_dns(self):
"""When add_cell_peer returns False, add_cell_dns_forward must NOT be called."""
self.wg.add_cell_peer.return_value = False
with self.assertRaises(RuntimeError):
self.mgr.add_connection(SAMPLE_INVITE)
self.nm.add_cell_dns_forward.assert_not_called()
def test_wg_fail_does_not_persist_link(self):
"""When WG fails, list_connections() must still return [] (nothing persisted)."""
self.wg.add_cell_peer.return_value = False
with self.assertRaises(RuntimeError):
self.mgr.add_connection(SAMPLE_INVITE)
self.assertEqual(self.mgr.list_connections(), [])
def test_wg_fail_raises_runtime_error(self):
"""add_connection raises RuntimeError (not some other exception) when WG fails."""
self.wg.add_cell_peer.return_value = False
with self.assertRaises(RuntimeError):
self.mgr.add_connection(SAMPLE_INVITE)
def test_dns_warning_still_persists_link(self):
"""When DNS returns warnings (not a hard failure), the link IS still saved."""
self.nm.add_cell_dns_forward.return_value = {
'restarted': [],
'warnings': ['CoreDNS reload timed out'],
}
self.mgr.add_connection(SAMPLE_INVITE)
links = self.mgr.list_connections()
self.assertEqual(len(links), 1)
self.assertEqual(links[0]['cell_name'], 'office')
def test_dns_warning_does_not_raise(self):
"""When DNS returns warnings, add_connection completes without raising."""
self.nm.add_cell_dns_forward.return_value = {
'restarted': [],
'warnings': ['CoreDNS reload timed out'],
}
try:
self.mgr.add_connection(SAMPLE_INVITE)
except Exception as e:
self.fail(f"add_connection raised unexpectedly with DNS warnings: {e}")
# ---------------------------------------------------------------------------
# TestAddConnectionPermissions
# ---------------------------------------------------------------------------
class TestAddConnectionPermissions(unittest.TestCase):
"""Verify that inbound_services controls the permissions field on the saved link."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
def tearDown(self):
shutil.rmtree(self.test_dir)
def _get_link(self):
links = self.mgr.list_connections()
self.assertEqual(len(links), 1)
return links[0]
def test_add_with_no_inbound_defaults_all_deny(self):
"""No inbound_services arg → all inbound permissions False."""
self.mgr.add_connection(SAMPLE_INVITE)
link = self._get_link()
inbound = link['permissions']['inbound']
for service, allowed in inbound.items():
self.assertFalse(allowed, f"Expected {service} to be False, got {allowed}")
def test_add_with_inbound_services_sets_them(self):
"""inbound_services=['calendar','files'] → those two True, others False."""
self.mgr.add_connection(SAMPLE_INVITE, inbound_services=['calendar', 'files'])
link = self._get_link()
inbound = link['permissions']['inbound']
self.assertTrue(inbound['calendar'])
self.assertTrue(inbound['files'])
self.assertFalse(inbound['mail'])
self.assertFalse(inbound['webdav'])
def test_inbound_invalid_service_ignored(self):
"""Passing 'badservice' in inbound_services does not appear in permissions."""
self.mgr.add_connection(SAMPLE_INVITE, inbound_services=['badservice', 'calendar'])
link = self._get_link()
inbound = link['permissions']['inbound']
self.assertNotIn('badservice', inbound)
# valid one was still applied
self.assertTrue(inbound['calendar'])
# ---------------------------------------------------------------------------
# TestUpdatePermissions
# ---------------------------------------------------------------------------
class TestUpdatePermissions(unittest.TestCase):
"""Tests for the new update_permissions / get_permissions methods."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
# Add a connection so there is something to update
self.mgr.add_connection(SAMPLE_INVITE)
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_update_sets_inbound_values(self):
"""update_permissions with inbound={'calendar': True} persists correctly."""
with patch('cell_link_manager.firewall_manager', create=True) as mock_fm:
mock_fm.apply_cell_rules = MagicMock()
self.mgr.update_permissions('office', {'calendar': True}, {})
# Re-read from disk to confirm persistence
mgr2 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
perms = mgr2.get_permissions('office')
self.assertTrue(perms['inbound']['calendar'])
self.assertFalse(perms['inbound']['files'])
def test_update_rejects_unknown_service_by_cleaning_it_out(self):
"""update_permissions with inbound={'bad': True} — 'bad' must not appear in saved perms."""
with patch('cell_link_manager.firewall_manager', create=True) as mock_fm:
mock_fm.apply_cell_rules = MagicMock()
self.mgr.update_permissions('office', {'bad': True, 'calendar': True}, {})
perms = self.mgr.get_permissions('office')
self.assertNotIn('bad', perms['inbound'])
self.assertTrue(perms['inbound']['calendar'])
def test_update_nonexistent_cell_raises(self):
"""update_permissions on an unknown cell_name raises ValueError."""
with self.assertRaises(ValueError):
self.mgr.update_permissions('nosuchcell', {}, {})
def test_get_permissions_returns_correct(self):
"""get_permissions returns the dict that was saved by update_permissions."""
with patch('cell_link_manager.firewall_manager', create=True) as mock_fm:
mock_fm.apply_cell_rules = MagicMock()
self.mgr.update_permissions(
'office',
inbound={'calendar': True, 'files': False},
outbound={'mail': True},
)
perms = self.mgr.get_permissions('office')
self.assertIn('inbound', perms)
self.assertIn('outbound', perms)
self.assertTrue(perms['inbound']['calendar'])
self.assertFalse(perms['inbound']['files'])
self.assertTrue(perms['outbound']['mail'])
def test_get_permissions_nonexistent_cell_raises(self):
"""get_permissions on an unknown cell_name raises ValueError."""
with self.assertRaises(ValueError):
self.mgr.get_permissions('nosuchcell')
# ---------------------------------------------------------------------------
# TestLoadMigration
# ---------------------------------------------------------------------------
class TestLoadMigration(unittest.TestCase):
"""Verify _load() lazily injects permissions field when it is missing."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_load_injects_permissions_if_missing(self):
"""Write cell_links.json without permissions; _load should add all-False defaults."""
links_file = os.path.join(self.test_dir, 'cell_links.json')
legacy_links = [
{
'cell_name': 'legacy-office',
'public_key': 'officepubkey=',
'vpn_subnet': '10.1.0.0/24',
'dns_ip': '10.1.0.1',
'domain': 'legacy-office.cell',
# NO 'permissions' key — simulates pre-migration data
}
]
with open(links_file, 'w') as f:
json.dump(legacy_links, f)
mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
links = mgr.list_connections()
self.assertEqual(len(links), 1)
link = links[0]
self.assertIn('permissions', link)
perms = link['permissions']
self.assertIn('inbound', perms)
self.assertIn('outbound', perms)
for service in ('calendar', 'files', 'mail', 'webdav'):
self.assertFalse(perms['inbound'][service])
self.assertFalse(perms['outbound'][service])
def test_load_migration_persists_to_disk(self):
"""After migration, re-loading the same file returns the injected permissions."""
links_file = os.path.join(self.test_dir, 'cell_links.json')
with open(links_file, 'w') as f:
json.dump([{
'cell_name': 'old-cell',
'public_key': 'somepubkey=',
'vpn_subnet': '10.2.0.0/24',
'dns_ip': '10.2.0.1',
'domain': 'old-cell.cell',
}], f)
mgr1 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
mgr1.list_connections() # triggers migration + save
# Read the file directly and confirm permissions are now on disk
with open(links_file) as f:
raw = json.load(f)
self.assertIn('permissions', raw[0])
+225
View File
@@ -291,5 +291,230 @@ class TestGetCellConnectionStatus(unittest.TestCase):
self.assertIn('error', json.loads(r.data))
class TestAddCellRuntimeError(unittest.TestCase):
"""POST /api/cells — RuntimeError from the manager must now return 400, not 500."""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.cell_link_manager')
def test_add_cell_runtime_error_returns_400(self, mock_clm):
"""When add_connection raises RuntimeError (WG failure), endpoint returns 400."""
mock_clm.add_connection.side_effect = RuntimeError('Failed to add WireGuard peer')
r = self.client.post(
'/api/cells',
data=json.dumps(_VALID_CELL_BODY),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
@patch('app.cell_link_manager')
def test_add_cell_runtime_error_body_contains_message(self, mock_clm):
"""The 400 response for a RuntimeError includes the error message."""
mock_clm.add_connection.side_effect = RuntimeError('WireGuard peer add failed')
r = self.client.post(
'/api/cells',
data=json.dumps(_VALID_CELL_BODY),
content_type='application/json',
)
data = json.loads(r.data)
self.assertIn('WireGuard', data['error'])
class TestListServices(unittest.TestCase):
"""GET /api/cells/services"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
def test_list_services_returns_200(self):
"""GET /api/cells/services returns HTTP 200."""
r = self.client.get('/api/cells/services')
self.assertEqual(r.status_code, 200)
def test_list_services_returns_services_key(self):
"""Response body has a 'services' key."""
r = self.client.get('/api/cells/services')
data = json.loads(r.data)
self.assertIn('services', data)
def test_list_services_returns_list(self):
"""'services' value is a non-empty list."""
r = self.client.get('/api/cells/services')
data = json.loads(r.data)
self.assertIsInstance(data['services'], list)
self.assertGreater(len(data['services']), 0)
def test_list_services_includes_known_services(self):
"""'services' includes the four known shareable services."""
r = self.client.get('/api/cells/services')
services = json.loads(r.data)['services']
for expected in ('calendar', 'files', 'mail', 'webdav'):
self.assertIn(expected, services)
class TestGetCellPermissions(unittest.TestCase):
"""GET /api/cells/<name>/permissions"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.cell_link_manager')
def test_get_permissions_returns_200(self, mock_clm):
"""GET /api/cells/office/permissions returns 200 when cell exists."""
mock_clm.get_permissions.return_value = {
'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
'outbound': {'calendar': False, 'files': False, 'mail': False, 'webdav': False},
}
r = self.client.get('/api/cells/office/permissions')
self.assertEqual(r.status_code, 200)
@patch('app.cell_link_manager')
def test_get_permissions_response_has_inbound_and_outbound(self, mock_clm):
"""Response body contains 'inbound' and 'outbound' keys."""
mock_clm.get_permissions.return_value = {
'inbound': {'calendar': False, 'files': False, 'mail': False, 'webdav': False},
'outbound': {'calendar': False, 'files': False, 'mail': False, 'webdav': False},
}
r = self.client.get('/api/cells/office/permissions')
data = json.loads(r.data)
self.assertIn('inbound', data)
self.assertIn('outbound', data)
@patch('app.cell_link_manager')
def test_get_permissions_unknown_cell_returns_404(self, mock_clm):
"""ValueError from get_permissions maps to 404."""
mock_clm.get_permissions.side_effect = ValueError('cell not found')
r = self.client.get('/api/cells/nosuchcell/permissions')
self.assertEqual(r.status_code, 404)
self.assertIn('error', json.loads(r.data))
@patch('app.cell_link_manager')
def test_get_permissions_passes_cell_name(self, mock_clm):
"""The cell_name URL segment is forwarded to get_permissions."""
mock_clm.get_permissions.return_value = {'inbound': {}, 'outbound': {}}
self.client.get('/api/cells/faraway/permissions')
mock_clm.get_permissions.assert_called_once_with('faraway')
class TestUpdateCellPermissions(unittest.TestCase):
"""PUT /api/cells/<name>/permissions"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
_VALID_PERM_BODY = {
'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
'outbound': {'calendar': False, 'files': False, 'mail': False, 'webdav': False},
}
@patch('app.cell_link_manager')
@patch('app.peer_registry')
@patch('app.firewall_manager')
@patch('app.config_manager')
def test_update_permissions_returns_200(self, mock_cfg, mock_fm, mock_pr, mock_clm):
"""PUT with valid inbound/outbound returns 200."""
mock_cfg.configs = {'_identity': {'domain': 'cell'}}
mock_pr.list_peers.return_value = []
mock_clm.list_connections.return_value = []
mock_clm.update_permissions.return_value = {
'cell_name': 'office',
'permissions': self._VALID_PERM_BODY,
}
mock_fm.apply_all_dns_rules.return_value = True
r = self.client.put(
'/api/cells/office/permissions',
data=json.dumps(self._VALID_PERM_BODY),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('message', data)
self.assertIn('link', data)
@patch('app.cell_link_manager')
def test_update_permissions_unknown_service_returns_400(self, mock_clm):
"""PUT body containing an unknown service name returns 400."""
body = {
'inbound': {'bad_service': True, 'calendar': True},
'outbound': {},
}
r = self.client.put(
'/api/cells/office/permissions',
data=json.dumps(body),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
# update_permissions should NOT have been called when validation fails
mock_clm.update_permissions.assert_not_called()
@patch('app.cell_link_manager')
def test_update_permissions_unknown_cell_returns_404(self, mock_clm):
"""ValueError from update_permissions (cell not found) maps to 404."""
mock_clm.update_permissions.side_effect = ValueError('cell not found')
r = self.client.put(
'/api/cells/nosuchcell/permissions',
data=json.dumps(self._VALID_PERM_BODY),
content_type='application/json',
)
self.assertEqual(r.status_code, 404)
self.assertIn('error', json.loads(r.data))
@patch('app.cell_link_manager')
def test_update_permissions_no_body_returns_400(self, mock_clm):
"""PUT with no JSON body returns 400."""
r = self.client.put('/api/cells/office/permissions')
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_clm.update_permissions.assert_not_called()
@patch('app.cell_link_manager')
def test_update_permissions_outbound_unknown_service_returns_400(self, mock_clm):
"""Unknown service in outbound (not just inbound) also returns 400."""
body = {
'inbound': {'calendar': True},
'outbound': {'hacked': True},
}
r = self.client.put(
'/api/cells/office/permissions',
data=json.dumps(body),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
@patch('app.cell_link_manager')
@patch('app.peer_registry')
@patch('app.firewall_manager')
@patch('app.config_manager')
def test_update_permissions_passes_inbound_outbound_to_manager(
self, mock_cfg, mock_fm, mock_pr, mock_clm):
"""update_permissions is called with inbound and outbound dicts from the body."""
mock_cfg.configs = {'_identity': {'domain': 'cell'}}
mock_pr.list_peers.return_value = []
mock_clm.list_connections.return_value = []
mock_clm.update_permissions.return_value = {
'cell_name': 'office', 'permissions': self._VALID_PERM_BODY
}
mock_fm.apply_all_dns_rules.return_value = True
self.client.put(
'/api/cells/office/permissions',
data=json.dumps(self._VALID_PERM_BODY),
content_type='application/json',
)
mock_clm.update_permissions.assert_called_once_with(
'office',
self._VALID_PERM_BODY['inbound'],
self._VALID_PERM_BODY['outbound'],
)
if __name__ == '__main__':
unittest.main()
+216
View File
@@ -406,5 +406,221 @@ class TestUpdateServiceIps(unittest.TestCase):
self.assertNotIn('172.20.0.21', dest_ips)
# ---------------------------------------------------------------------------
# TestCellRules
# ---------------------------------------------------------------------------
class TestCellRules(unittest.TestCase):
"""Tests for apply_cell_rules, clear_cell_rules, _cell_tag, and apply_all_cell_rules."""
# ── helpers ───────────────────────────────────────────────────────────────
def _capture_apply(self, cell_name, vpn_subnet, inbound_services):
"""Run apply_cell_rules with _wg_exec mocked; return list of captured iptables arg lists."""
calls_made = []
def fake_wg_exec(args):
calls_made.append(args)
m = MagicMock()
m.returncode = 0
m.stdout = ''
return m
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec):
firewall_manager.apply_cell_rules(cell_name, vpn_subnet, inbound_services)
return [c for c in calls_made if 'iptables' in c]
def _targets_for_dest(self, iptables_calls, dest_ip):
"""Return list of -j targets where -d matches dest_ip."""
targets = []
for c in iptables_calls:
if '-d' in c and dest_ip in c and '-j' in c:
targets.append(c[c.index('-j') + 1])
return targets
# ── _cell_tag ─────────────────────────────────────────────────────────────
def test_cell_tag_sanitises_spaces_and_punctuation(self):
"""_cell_tag replaces non-alphanumeric chars with dashes."""
tag = firewall_manager._cell_tag('my cell!')
self.assertTrue(tag.startswith('pic-cell-'))
self.assertNotIn(' ', tag)
self.assertNotIn('!', tag)
def test_cell_tag_lowercase(self):
"""_cell_tag lowercases the cell name."""
tag = firewall_manager._cell_tag('Office')
self.assertIn('office', tag)
def test_cell_tag_has_pic_cell_prefix(self):
"""_cell_tag always starts with 'pic-cell-'."""
self.assertTrue(firewall_manager._cell_tag('remote').startswith('pic-cell-'))
def test_cell_tag_distinct_from_peer_tag(self):
"""A cell tag must not equal the peer comment for the same string."""
cell_tag = firewall_manager._cell_tag('10.0.0.2')
peer_tag = firewall_manager._peer_comment('10.0.0.2')
self.assertNotEqual(cell_tag, peer_tag)
# ── apply_cell_rules — catch-all DROP ─────────────────────────────────────
def test_apply_cell_rules_sends_catch_all_drop(self):
"""apply_cell_rules always inserts a DROP for the entire vpn_subnet."""
calls = self._capture_apply('office', '10.0.1.0/24', ['calendar'])
subnet_drops = [
c for c in calls
if '-s' in c and '10.0.1.0/24' in c
and '-j' in c and c[c.index('-j') + 1] == 'DROP'
and '-d' not in c # catch-all has no destination
]
self.assertTrue(subnet_drops, "Expected a catch-all DROP rule for the subnet")
def test_apply_cell_rules_sends_accept_for_allowed_service(self):
"""apply_cell_rules inserts ACCEPT for the calendar VIP when calendar is in inbound."""
calls = self._capture_apply('office', '10.0.1.0/24', ['calendar'])
calendar_ip = firewall_manager.SERVICE_IPS['calendar']
calendar_targets = self._targets_for_dest(calls, calendar_ip)
self.assertIn('ACCEPT', calendar_targets)
def test_apply_cell_rules_sends_drop_for_disallowed_service(self):
"""apply_cell_rules inserts DROP for a service not in inbound_services."""
calls = self._capture_apply('office', '10.0.1.0/24', ['calendar'])
files_ip = firewall_manager.SERVICE_IPS['files']
files_targets = self._targets_for_dest(calls, files_ip)
self.assertIn('DROP', files_targets)
# ── apply_cell_rules — empty inbound (all-deny) ───────────────────────────
def test_apply_cell_rules_empty_inbound_all_drop(self):
"""With inbound_services=[], all per-service rules are DROP."""
calls = self._capture_apply('office', '10.0.1.0/24', [])
for service, svc_ip in firewall_manager.SERVICE_IPS.items():
svc_targets = self._targets_for_dest(calls, svc_ip)
self.assertTrue(svc_targets,
f"Expected at least one rule for {service} ({svc_ip})")
self.assertNotIn('ACCEPT', svc_targets,
f"{service} should be DROP when not in inbound_services")
# ── apply_cell_rules — all inbound (all-accept) ───────────────────────────
def test_apply_cell_rules_all_inbound_all_accept(self):
"""With all four services in inbound, all per-service rules are ACCEPT."""
all_services = list(firewall_manager.SERVICE_IPS.keys())
calls = self._capture_apply('office', '10.0.1.0/24', all_services)
for service, svc_ip in firewall_manager.SERVICE_IPS.items():
svc_targets = self._targets_for_dest(calls, svc_ip)
self.assertIn('ACCEPT', svc_targets,
f"{service} should be ACCEPT when in inbound_services")
# ── apply_cell_rules — all rules tagged ───────────────────────────────────
def test_apply_cell_rules_all_rules_tagged_with_cell_tag(self):
"""Every insertion rule must carry the cell's comment tag."""
calls = self._capture_apply('office', '10.0.1.0/24', ['calendar'])
tag = firewall_manager._cell_tag('office')
for c in calls:
if '-I' in c:
self.assertIn(tag, c, f"Rule missing cell tag: {c}")
# ── clear_cell_rules — noop when no matching rules ────────────────────────
def test_clear_cell_rules_noop_when_no_rules(self):
"""When iptables-save returns no pic-cell-office lines, iptables-restore is NOT called."""
save_output = '*filter\n:FORWARD ACCEPT [0:0]\nCOMMIT\n'
def fake_wg_exec(args):
m = MagicMock()
m.returncode = 0
m.stdout = save_output
return m
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
patch('subprocess.run') as mock_restore:
firewall_manager.clear_cell_rules('office')
mock_restore.assert_not_called()
def test_clear_cell_rules_removes_tagged_lines(self):
"""clear_cell_rules removes lines carrying the cell tag and keeps others."""
tag = firewall_manager._cell_tag('office')
save_output = (
'*filter\n'
':FORWARD ACCEPT [0:0]\n'
f'-A FORWARD -s 10.0.1.0/24 -m comment --comment "{tag}" -j DROP\n'
'-A FORWARD -s 10.0.0.2 -m comment --comment "pic-peer-10-0-0-2/32" -j ACCEPT\n'
'COMMIT\n'
)
restored = []
def fake_wg_exec(args):
m = MagicMock()
m.returncode = 0
if args == ['iptables-save']:
m.stdout = save_output
return m
def fake_restore(cmd, input, **kwargs):
restored.append(input)
m = MagicMock()
m.returncode = 0
return m
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
patch('subprocess.run', side_effect=fake_restore):
firewall_manager.clear_cell_rules('office')
self.assertEqual(len(restored), 1)
content = restored[0]
self.assertNotIn(tag, content)
# peer rule for a different entity must survive
self.assertIn('pic-peer-10-0-0-2/32', content)
# ── apply_all_cell_rules ──────────────────────────────────────────────────
def test_apply_all_cell_rules_calls_apply_for_each(self):
"""apply_all_cell_rules calls apply_cell_rules once per link with correct args."""
cell_links = [
{
'cell_name': 'office',
'vpn_subnet': '10.1.0.0/24',
'permissions': {'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
'outbound': {}},
},
{
'cell_name': 'cabin',
'vpn_subnet': '10.2.0.0/24',
'permissions': {'inbound': {'calendar': False, 'files': True, 'mail': False, 'webdav': False},
'outbound': {}},
},
]
with patch.object(firewall_manager, 'apply_cell_rules', return_value=True) as mock_apply:
firewall_manager.apply_all_cell_rules(cell_links)
self.assertEqual(mock_apply.call_count, 2)
call_kwargs = {c.args[0]: c.args for c in mock_apply.call_args_list}
self.assertIn('office', call_kwargs)
self.assertIn('cabin', call_kwargs)
office_args = call_kwargs['office']
self.assertEqual(office_args[1], '10.1.0.0/24')
self.assertIn('calendar', office_args[2])
self.assertNotIn('files', office_args[2])
def test_apply_all_cell_rules_skips_links_with_missing_fields(self):
"""Links without cell_name or vpn_subnet are silently skipped."""
cell_links = [
{'vpn_subnet': '10.1.0.0/24'}, # no cell_name
{'cell_name': 'broken'}, # no vpn_subnet
{'cell_name': 'office', 'vpn_subnet': '10.3.0.0/24',
'permissions': {'inbound': {}, 'outbound': {}}},
]
with patch.object(firewall_manager, 'apply_cell_rules', return_value=True) as mock_apply:
firewall_manager.apply_all_cell_rules(cell_links)
# Only the complete entry should be processed
self.assertEqual(mock_apply.call_count, 1)
self.assertEqual(mock_apply.call_args.args[0], 'office')
if __name__ == '__main__':
unittest.main()
+75
View File
@@ -629,5 +629,80 @@ class TestWireGuardSysctlAndPortCheck(unittest.TestCase):
self.assertEqual(statuses, {})
class TestAddCellPeerSubnetOverlap(unittest.TestCase):
"""Verify that add_cell_peer rejects a vpn_subnet that overlaps the local WG network."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.data_dir = os.path.join(self.test_dir, 'data')
self.config_dir = os.path.join(self.test_dir, 'config')
os.makedirs(self.data_dir, exist_ok=True)
os.makedirs(self.config_dir, exist_ok=True)
patcher = patch.object(WireGuardManager, '_syncconf', return_value=None)
self.mock_sync = patcher.start()
self.addCleanup(patcher.stop)
self.wg = WireGuardManager(self.data_dir, self.config_dir)
# Write a known wg0.conf so _get_configured_network() returns 10.0.0.0/24
self._write_wg_conf(address='10.0.0.1/24')
def tearDown(self):
shutil.rmtree(self.test_dir)
def _write_wg_conf(self, address='10.0.0.1/24', port=51820):
conf = (
f'[Interface]\n'
f'PrivateKey = dummykey\n'
f'Address = {address}\n'
f'ListenPort = {port}\n'
)
cf = self.wg._config_file()
os.makedirs(os.path.dirname(cf), exist_ok=True)
with open(cf, 'w') as f:
f.write(conf)
# Public key is 44 chars ending in '=' — required by validation in add_cell_peer
_CELL_PUBKEY = 'cmVtb3RlcHVia2V5X2Zvcl90ZXN0c193Z3Rlc3QxMiE='
def test_add_cell_peer_overlapping_subnet_returns_false(self):
"""vpn_subnet that exactly matches the local WG network must be rejected."""
# local is 10.0.0.0/24; remote is also 10.0.0.0/24 — clear overlap
ok = self.wg.add_cell_peer(
'remote', self._CELL_PUBKEY, '5.6.7.8:51821', '10.0.0.0/24'
)
self.assertFalse(ok)
def test_add_cell_peer_partially_overlapping_subnet_returns_false(self):
"""A remote subnet that contains the local network (e.g. /16 ⊃ /24) is rejected."""
# 10.0.0.0/16 contains 10.0.0.0/24 → overlaps
ok = self.wg.add_cell_peer(
'remote', self._CELL_PUBKEY, '5.6.7.8:51821', '10.0.0.0/16'
)
self.assertFalse(ok)
def test_add_cell_peer_non_overlapping_subnet_accepted(self):
"""A remote subnet distinct from the local WG network must be accepted."""
# local is 10.0.0.0/24; remote is 10.0.1.0/24 — no overlap
ok = self.wg.add_cell_peer(
'remote', self._CELL_PUBKEY, '5.6.7.8:51821', '10.0.1.0/24'
)
self.assertTrue(ok)
def test_add_cell_peer_no_overlap_different_class_a(self):
"""A completely different address space is accepted."""
# local is 10.0.0.0/24; remote is 192.168.5.0/24 — no overlap
ok = self.wg.add_cell_peer(
'remote', self._CELL_PUBKEY, '5.6.7.8:51821', '192.168.5.0/24'
)
self.assertTrue(ok)
def test_add_cell_peer_overlap_check_uses_configured_network(self):
"""When wg0.conf says 172.16.0.1/12, overlapping that range is rejected."""
self._write_wg_conf(address='172.16.0.1/12')
ok = self.wg.add_cell_peer(
'remote', self._CELL_PUBKEY, '5.6.7.8:51821', '172.16.0.0/12'
)
self.assertFalse(ok)
if __name__ == '__main__':
unittest.main()