Fix cross-cell domain access: scope DNAT rules, add Docker→wg0 routing
- firewall_manager: add _get_wg_server_ip() helper; scope ensure_cell_api_dnat(), ensure_dns_dnat(), ensure_service_dnat() DNAT rules with -d server_ip; add ensure_wg_masquerade() (Docker→wg0 MASQUERADE+FORWARD) and ensure_cell_subnet_routes() (host routes via docker run busybox) - wireguard_manager: scope PostUp DNAT rules with -d server_ip in generate_config() and ensure_postup_dnat(); add Docker→wg0 MASQUERADE+FORWARD rules - app.py: call ensure_wg_masquerade() and ensure_cell_subnet_routes() in _apply_startup_enforcement() - tests/test_firewall_manager.py: mock _get_wg_server_ip, add test_dnat_is_scoped_to_server_ip and test_returns_false_when_wg_server_ip_not_found - tests/e2e/wg/test_cell_to_cell_routing.py: rewrite to use dynamic config (no hardcoded IPs/ports), add latency and domain access tests Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -793,7 +793,7 @@ class TestCellRules(unittest.TestCase):
|
||||
|
||||
|
||||
class TestEnsureCellApiDnat(unittest.TestCase):
|
||||
"""Tests for ensure_cell_api_dnat — DNAT wg0:3000 → cell-api:3000."""
|
||||
"""Tests for ensure_cell_api_dnat — DNAT wg0:3000 (scoped) → cell-api:3000."""
|
||||
|
||||
def _wg_exec_no_existing_rules(self, args):
|
||||
r = MagicMock()
|
||||
@@ -815,7 +815,8 @@ class TestEnsureCellApiDnat(unittest.TestCase):
|
||||
return r
|
||||
|
||||
def test_dnat_rules_added_when_not_present(self):
|
||||
with patch.object(firewall_manager, '_run', return_value=self._inspect_ok()), \
|
||||
with patch.object(firewall_manager, '_get_wg_server_ip', return_value='10.0.0.1'), \
|
||||
patch.object(firewall_manager, '_run', return_value=self._inspect_ok()), \
|
||||
patch.object(firewall_manager, '_wg_exec',
|
||||
side_effect=self._wg_exec_no_existing_rules) as wg_mock:
|
||||
result = firewall_manager.ensure_cell_api_dnat()
|
||||
@@ -825,8 +826,23 @@ class TestEnsureCellApiDnat(unittest.TestCase):
|
||||
dnat_adds = [a for a in calls_args if 'DNAT' in a and '-A' in a]
|
||||
self.assertTrue(len(dnat_adds) >= 1, 'DNAT -A rule must be added')
|
||||
|
||||
def test_dnat_is_scoped_to_server_ip(self):
|
||||
"""DNAT rule must include -d <server_ip> to avoid intercepting cross-cell traffic."""
|
||||
with patch.object(firewall_manager, '_get_wg_server_ip', return_value='10.0.0.1'), \
|
||||
patch.object(firewall_manager, '_run', return_value=self._inspect_ok()), \
|
||||
patch.object(firewall_manager, '_wg_exec',
|
||||
side_effect=self._wg_exec_no_existing_rules) as wg_mock:
|
||||
firewall_manager.ensure_cell_api_dnat()
|
||||
|
||||
all_args = [c.args[0] for c in wg_mock.call_args_list]
|
||||
dnat_adds = [a for a in all_args if 'DNAT' in a and '-A' in a]
|
||||
for rule in dnat_adds:
|
||||
self.assertIn('10.0.0.1', rule, 'DNAT rule must be scoped to server IP')
|
||||
self.assertIn('-d', rule, 'DNAT rule must use -d to scope to server IP')
|
||||
|
||||
def test_dnat_skipped_if_already_present(self):
|
||||
with patch.object(firewall_manager, '_run', return_value=self._inspect_ok()), \
|
||||
with patch.object(firewall_manager, '_get_wg_server_ip', return_value='10.0.0.1'), \
|
||||
patch.object(firewall_manager, '_run', return_value=self._inspect_ok()), \
|
||||
patch.object(firewall_manager, '_wg_exec',
|
||||
side_effect=self._wg_exec_all_rules_exist) as wg_mock:
|
||||
result = firewall_manager.ensure_cell_api_dnat()
|
||||
@@ -836,16 +852,23 @@ class TestEnsureCellApiDnat(unittest.TestCase):
|
||||
add_calls = [a for a in calls_args if '-A' in a or '-I' in a]
|
||||
self.assertEqual(len(add_calls), 0, 'No rules should be added when they already exist')
|
||||
|
||||
def test_returns_false_when_wg_server_ip_not_found(self):
|
||||
with patch.object(firewall_manager, '_get_wg_server_ip', return_value=None):
|
||||
result = firewall_manager.ensure_cell_api_dnat()
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_returns_false_when_cell_api_not_found(self):
|
||||
r = MagicMock()
|
||||
r.returncode = 0
|
||||
r.stdout = ''
|
||||
with patch.object(firewall_manager, '_run', return_value=r):
|
||||
with patch.object(firewall_manager, '_get_wg_server_ip', return_value='10.0.0.1'), \
|
||||
patch.object(firewall_manager, '_run', return_value=r):
|
||||
result = firewall_manager.ensure_cell_api_dnat()
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_returns_false_on_exception(self):
|
||||
with patch.object(firewall_manager, '_run', side_effect=RuntimeError('docker gone')):
|
||||
with patch.object(firewall_manager, '_get_wg_server_ip', return_value='10.0.0.1'), \
|
||||
patch.object(firewall_manager, '_run', side_effect=RuntimeError('docker gone')):
|
||||
result = firewall_manager.ensure_cell_api_dnat()
|
||||
self.assertFalse(result)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user