Add domain conflict validation when changing domain or accepting heal invite
Two gaps allowed a cell to take a domain already in use by a connected cell: 1. PUT /api/config domain change: added check against cell_link_manager's connected cells list before saving — returns 409 if the new domain collides with any connected cell's domain. 2. accept_invite healing path: a remote cell changing its domain via a re-invite was not validated against other connected cells' domains. Now calls _check_invite_conflicts(invite, exclude_cell=name) before applying any change. Also: the healing path now detects domain changes (alongside dns_ip/ vpn_subnet/endpoint), updates the stored domain, and refreshes the DNS forward rule when the domain changes. Tests: 3 new domain-conflict tests in test_config_validation.py; 3 new accept_invite healing tests in test_cell_link_manager.py. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -645,6 +645,56 @@ class TestAcceptInviteNew(unittest.TestCase):
|
||||
self.mgr.accept_invite(SAMPLE_INVITE)
|
||||
self.assertEqual(len(self.mgr.list_connections()), 1)
|
||||
|
||||
def test_accept_invite_domain_change_updates_stored_domain(self):
|
||||
"""accept_invite with a changed domain updates the stored domain."""
|
||||
with patch('firewall_manager.apply_cell_rules'):
|
||||
self.mgr.accept_invite(SAMPLE_INVITE)
|
||||
updated = {**SAMPLE_INVITE, 'domain': 'office-new.cell'}
|
||||
import sys
|
||||
fake_cfg = MagicMock()
|
||||
fake_cfg.configs = {'_identity': {'domain': 'home.cell'}}
|
||||
with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}):
|
||||
with patch('firewall_manager.apply_cell_rules'):
|
||||
result = self.mgr.accept_invite(updated)
|
||||
self.assertEqual(result['domain'], 'office-new.cell')
|
||||
|
||||
def test_accept_invite_domain_change_updates_dns_forward(self):
|
||||
"""accept_invite with a changed domain removes old DNS forward and adds new."""
|
||||
with patch('firewall_manager.apply_cell_rules'):
|
||||
self.mgr.accept_invite(SAMPLE_INVITE)
|
||||
self.nm.reset_mock()
|
||||
updated = {**SAMPLE_INVITE, 'domain': 'office-new.cell'}
|
||||
import sys
|
||||
fake_cfg = MagicMock()
|
||||
fake_cfg.configs = {'_identity': {'domain': 'home.cell'}}
|
||||
with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}):
|
||||
with patch('firewall_manager.apply_cell_rules'):
|
||||
self.mgr.accept_invite(updated)
|
||||
self.nm.remove_cell_dns_forward.assert_called_with('office.cell')
|
||||
self.nm.add_cell_dns_forward.assert_called_with(
|
||||
domain='office-new.cell', dns_ip=SAMPLE_INVITE['dns_ip'])
|
||||
|
||||
def test_accept_invite_healing_domain_conflict_raises(self):
|
||||
"""Healing must reject a domain update that conflicts with another connected cell."""
|
||||
import sys
|
||||
fake_cfg = MagicMock()
|
||||
fake_cfg.configs = {'_identity': {'domain': 'home.cell'}}
|
||||
# Add two cells: 'office' and 'branch'
|
||||
branch_invite = {**SAMPLE_INVITE,
|
||||
'cell_name': 'branch', 'public_key': 'branchpubkey1234567890ABCDEFGH=',
|
||||
'vpn_subnet': '10.9.0.0/24', 'dns_ip': '10.9.0.1',
|
||||
'domain': 'branch.cell'}
|
||||
with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}):
|
||||
with patch('firewall_manager.apply_cell_rules'):
|
||||
self.mgr.accept_invite(SAMPLE_INVITE)
|
||||
self.mgr.accept_invite(branch_invite)
|
||||
# Now 'office' tries to heal its domain to 'branch.cell' — must fail
|
||||
conflicting = {**SAMPLE_INVITE, 'domain': 'branch.cell'}
|
||||
with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}):
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
self.mgr.accept_invite(conflicting)
|
||||
self.assertIn('branch.cell', str(ctx.exception))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestAddConnectionMutualPairing
|
||||
|
||||
@@ -170,5 +170,44 @@ class TestBodyValidation(unittest.TestCase):
|
||||
self.assertEqual(r.status_code, 200)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Domain conflict validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDomainConflictValidation(unittest.TestCase):
|
||||
"""Changing this cell's domain to one already used by a connected cell → 409."""
|
||||
|
||||
def setUp(self):
|
||||
self.client = _make_client()
|
||||
|
||||
def test_domain_matching_connected_cell_returns_409(self):
|
||||
"""PUT /api/config with domain='other.cell' conflicts with a connected cell."""
|
||||
connected = [{'cell_name': 'remote', 'domain': 'other.cell',
|
||||
'vpn_subnet': '10.5.0.0/24', 'dns_ip': '10.5.0.1'}]
|
||||
with patch('app.cell_link_manager') as mock_clm:
|
||||
mock_clm.list_connections.return_value = connected
|
||||
r = _put(self.client, {'domain': 'other.cell'})
|
||||
self.assertEqual(r.status_code, 409)
|
||||
import json
|
||||
data = json.loads(r.data)
|
||||
self.assertIn('remote', data['error'])
|
||||
|
||||
def test_domain_not_matching_any_cell_is_accepted(self):
|
||||
"""PUT /api/config with a domain not used by any connected cell → 200."""
|
||||
connected = [{'cell_name': 'remote', 'domain': 'other.cell',
|
||||
'vpn_subnet': '10.5.0.0/24', 'dns_ip': '10.5.0.1'}]
|
||||
with patch('app.cell_link_manager') as mock_clm:
|
||||
mock_clm.list_connections.return_value = connected
|
||||
r = _put(self.client, {'domain': 'unique.cell'})
|
||||
self.assertNotEqual(r.status_code, 409)
|
||||
|
||||
def test_domain_no_connected_cells_is_accepted(self):
|
||||
"""PUT /api/config with domain change when no cells are connected → 200."""
|
||||
with patch('app.cell_link_manager') as mock_clm:
|
||||
mock_clm.list_connections.return_value = []
|
||||
r = _put(self.client, {'domain': 'any.cell'})
|
||||
self.assertNotEqual(r.status_code, 409)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user