Add subnet conflict validation for wireguard.address and ip_range changes
When a cell is connected to others, changing the local WireGuard address or Docker ip_range to a subnet that overlaps a connected cell's vpn_subnet would break routing. Both now return 409 with the conflicting cell name. - wireguard.address: derive network from new address, check all connected cells' vpn_subnet for overlap (after existing format validation) - ip_range: check all connected cells' vpn_subnet for overlap (after existing RFC-1918 validation) Tests: 4 cases each (overlap → 409, no overlap → ok, no cells → ok, format error still fires first → 400). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+26
-1
@@ -184,6 +184,18 @@ def update_config():
|
|||||||
)}), 400
|
)}), 400
|
||||||
except ValueError as _e:
|
except ValueError as _e:
|
||||||
return jsonify({'error': f'Invalid ip_range: {_e}'}), 400
|
return jsonify({'error': f'Invalid ip_range: {_e}'}), 400
|
||||||
|
from app import cell_link_manager as _clm
|
||||||
|
for _link in _clm.list_connections():
|
||||||
|
try:
|
||||||
|
_cell_net = ipaddress.ip_network(_link['vpn_subnet'], strict=False)
|
||||||
|
if _net.overlaps(_cell_net):
|
||||||
|
return jsonify({'error': (
|
||||||
|
f"ip_range {str(_net)!r} overlaps connected cell "
|
||||||
|
f"'{_link['cell_name']}' ({_link['vpn_subnet']!r}) — "
|
||||||
|
f"use a non-overlapping range"
|
||||||
|
)}), 409
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
_port_fields = {
|
_port_fields = {
|
||||||
'network': ['dns_port'],
|
'network': ['dns_port'],
|
||||||
@@ -221,9 +233,22 @@ def update_config():
|
|||||||
if '/' not in str(_addr):
|
if '/' not in str(_addr):
|
||||||
return jsonify({'error': 'wireguard.address must include a prefix length (e.g. 10.0.0.1/24)'}), 400
|
return jsonify({'error': 'wireguard.address must include a prefix length (e.g. 10.0.0.1/24)'}), 400
|
||||||
try:
|
try:
|
||||||
ipaddress.ip_interface(_addr)
|
_iface = ipaddress.ip_interface(_addr)
|
||||||
except ValueError as _e:
|
except ValueError as _e:
|
||||||
return jsonify({'error': f'wireguard.address is not a valid IP/CIDR: {_e}'}), 400
|
return jsonify({'error': f'wireguard.address is not a valid IP/CIDR: {_e}'}), 400
|
||||||
|
_new_net = _iface.network
|
||||||
|
from app import cell_link_manager as _clm
|
||||||
|
for _link in _clm.list_connections():
|
||||||
|
try:
|
||||||
|
_cell_net = ipaddress.ip_network(_link['vpn_subnet'], strict=False)
|
||||||
|
if _new_net.overlaps(_cell_net):
|
||||||
|
return jsonify({'error': (
|
||||||
|
f"WireGuard subnet {str(_new_net)!r} overlaps connected cell "
|
||||||
|
f"'{_link['cell_name']}' ({_link['vpn_subnet']!r}) — "
|
||||||
|
f"use a non-overlapping address"
|
||||||
|
)}), 409
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
old_identity = dict(config_manager.configs.get('_identity', {}))
|
old_identity = dict(config_manager.configs.get('_identity', {}))
|
||||||
old_svc_configs = {
|
old_svc_configs = {
|
||||||
|
|||||||
@@ -209,5 +209,85 @@ class TestDomainConflictValidation(unittest.TestCase):
|
|||||||
self.assertNotEqual(r.status_code, 409)
|
self.assertNotEqual(r.status_code, 409)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# WireGuard address subnet conflict validation
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestWireGuardAddressConflictValidation(unittest.TestCase):
|
||||||
|
"""Changing wireguard.address to a subnet overlapping a connected cell → 409."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.client = _make_client()
|
||||||
|
self._connected = [{'cell_name': 'remote', 'domain': 'remote.cell',
|
||||||
|
'vpn_subnet': '10.5.0.0/24', 'dns_ip': '10.5.0.1'}]
|
||||||
|
|
||||||
|
def test_wg_address_overlapping_connected_cell_returns_409(self):
|
||||||
|
with patch('app.cell_link_manager') as mock_clm:
|
||||||
|
mock_clm.list_connections.return_value = self._connected
|
||||||
|
r = _put(self.client, {'wireguard': {'address': '10.5.0.1/24'}})
|
||||||
|
self.assertEqual(r.status_code, 409)
|
||||||
|
data = json.loads(r.data)
|
||||||
|
self.assertIn('remote', data['error'])
|
||||||
|
|
||||||
|
def test_wg_address_non_overlapping_connected_cell_accepted(self):
|
||||||
|
with patch('app.cell_link_manager') as mock_clm:
|
||||||
|
mock_clm.list_connections.return_value = self._connected
|
||||||
|
r = _put(self.client, {'wireguard': {'address': '10.6.0.1/24'}})
|
||||||
|
self.assertNotEqual(r.status_code, 409)
|
||||||
|
|
||||||
|
def test_wg_address_no_connected_cells_accepted(self):
|
||||||
|
with patch('app.cell_link_manager') as mock_clm:
|
||||||
|
mock_clm.list_connections.return_value = []
|
||||||
|
r = _put(self.client, {'wireguard': {'address': '10.5.0.1/24'}})
|
||||||
|
self.assertNotEqual(r.status_code, 409)
|
||||||
|
|
||||||
|
def test_wg_address_missing_prefix_still_returns_400(self):
|
||||||
|
"""Format check fires before the conflict check."""
|
||||||
|
with patch('app.cell_link_manager') as mock_clm:
|
||||||
|
mock_clm.list_connections.return_value = self._connected
|
||||||
|
r = _put(self.client, {'wireguard': {'address': '10.5.0.1'}})
|
||||||
|
self.assertEqual(r.status_code, 400)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# ip_range subnet conflict validation
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestIpRangeConflictValidation(unittest.TestCase):
|
||||||
|
"""Changing ip_range to one overlapping a connected cell's vpn_subnet → 409."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.client = _make_client()
|
||||||
|
self._connected = [{'cell_name': 'remote', 'domain': 'remote.cell',
|
||||||
|
'vpn_subnet': '10.5.0.0/24', 'dns_ip': '10.5.0.1'}]
|
||||||
|
|
||||||
|
def test_ip_range_overlapping_connected_cell_returns_409(self):
|
||||||
|
with patch('app.cell_link_manager') as mock_clm:
|
||||||
|
mock_clm.list_connections.return_value = self._connected
|
||||||
|
r = _put(self.client, {'ip_range': '10.5.0.0/16'})
|
||||||
|
self.assertEqual(r.status_code, 409)
|
||||||
|
data = json.loads(r.data)
|
||||||
|
self.assertIn('remote', data['error'])
|
||||||
|
|
||||||
|
def test_ip_range_non_overlapping_connected_cell_accepted(self):
|
||||||
|
with patch('app.cell_link_manager') as mock_clm:
|
||||||
|
mock_clm.list_connections.return_value = self._connected
|
||||||
|
r = _put(self.client, {'ip_range': '10.6.0.0/16'})
|
||||||
|
self.assertNotEqual(r.status_code, 409)
|
||||||
|
|
||||||
|
def test_ip_range_no_connected_cells_accepted(self):
|
||||||
|
with patch('app.cell_link_manager') as mock_clm:
|
||||||
|
mock_clm.list_connections.return_value = []
|
||||||
|
r = _put(self.client, {'ip_range': '10.5.0.0/16'})
|
||||||
|
self.assertNotEqual(r.status_code, 409)
|
||||||
|
|
||||||
|
def test_ip_range_non_rfc1918_still_returns_400(self):
|
||||||
|
"""RFC-1918 check fires before the conflict check."""
|
||||||
|
with patch('app.cell_link_manager') as mock_clm:
|
||||||
|
mock_clm.list_connections.return_value = self._connected
|
||||||
|
r = _put(self.client, {'ip_range': '8.8.8.0/24'})
|
||||||
|
self.assertEqual(r.status_code, 400)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
Reference in New Issue
Block a user