Fix Phase 1 permission sync: route push via cell-wireguard + DNAT receive

cell-api has no route to remote WG tunnel IPs — only cell-wireguard does.
Fix _push_permissions_to_remote() to use 'docker exec cell-wireguard curl'
so outbound sync HTTP traverses the WG tunnel from the right namespace.

On the receive side, add ensure_cell_api_dnat() which installs three
iptables rules inside cell-wireguard on startup:
  - PREROUTING DNAT: wg0:3000 → cell-api:3000 (Docker bridge IP)
  - POSTROUTING MASQUERADE: so cell-api's reply routes back via wg0
  - FORWARD ACCEPT: allow the wg0→eth0 forwarded traffic

Called from _apply_startup_enforcement() so rules survive container restarts.
Tests updated to mock subprocess.run instead of urllib.request.urlopen.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-01 13:48:49 -04:00
parent a3d0cd5a48
commit 4ba79fd614
5 changed files with 149 additions and 28 deletions
+15 -11
View File
@@ -522,16 +522,18 @@ class TestPermissionSync(unittest.TestCase):
sent_body = {}
def fake_urlopen(req, timeout=None):
def fake_run(cmd, **kwargs):
import json as _j
sent_body.update(_j.loads(req.data))
resp = MagicMock()
resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False)
resp.status = 200
return resp
# Extract the -d payload from curl args
d_idx = cmd.index('-d')
sent_body.update(_j.loads(cmd[d_idx + 1]))
r = MagicMock()
r.returncode = 0
r.stdout = '200'
r.stderr = ''
return r
with patch('urllib.request.urlopen', fake_urlopen):
with patch('subprocess.run', fake_run):
result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
self.assertTrue(result['ok'])
@@ -544,9 +546,11 @@ class TestPermissionSync(unittest.TestCase):
def test_push_http_error_returns_not_ok(self):
self._add_office()
link = self.mgr.list_connections()[0]
with patch('urllib.request.urlopen',
side_effect=__import__('urllib.error', fromlist=['HTTPError']).HTTPError(
url='', code=503, msg='Service Unavailable', hdrs=None, fp=None)):
mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = '503'
mock_result.stderr = ''
with patch('subprocess.run', return_value=mock_result):
result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
self.assertFalse(result['ok'])
self.assertIn('503', result['error'])
+58
View File
@@ -622,5 +622,63 @@ class TestCellRules(unittest.TestCase):
self.assertEqual(mock_apply.call_args.args[0], 'office')
class TestEnsureCellApiDnat(unittest.TestCase):
"""Tests for ensure_cell_api_dnat — DNAT wg0:3000 → cell-api:3000."""
def _wg_exec_no_existing_rules(self, args):
r = MagicMock()
r.returncode = 1 if '-C' in args else 0 # -C = check: fail = not present
r.stdout = ''
r.stderr = ''
return r
def _wg_exec_all_rules_exist(self, args):
r = MagicMock()
r.returncode = 0 # -C succeeds = rule already present
r.stdout = ''
return r
def _inspect_ok(self, api_ip='172.20.0.10'):
r = MagicMock()
r.returncode = 0
r.stdout = api_ip
return r
def test_dnat_rules_added_when_not_present(self):
with patch.object(firewall_manager, '_run', return_value=self._inspect_ok()), \
patch.object(firewall_manager, '_wg_exec',
side_effect=self._wg_exec_no_existing_rules) as wg_mock:
result = firewall_manager.ensure_cell_api_dnat()
self.assertTrue(result)
calls_args = [c.args[0] for c in wg_mock.call_args_list]
dnat_adds = [a for a in calls_args if 'DNAT' in a and '-A' in a]
self.assertTrue(len(dnat_adds) >= 1, 'DNAT -A rule must be added')
def test_dnat_skipped_if_already_present(self):
with patch.object(firewall_manager, '_run', return_value=self._inspect_ok()), \
patch.object(firewall_manager, '_wg_exec',
side_effect=self._wg_exec_all_rules_exist) as wg_mock:
result = firewall_manager.ensure_cell_api_dnat()
self.assertTrue(result)
calls_args = [c.args[0] for c in wg_mock.call_args_list]
add_calls = [a for a in calls_args if '-A' in a or '-I' in a]
self.assertEqual(len(add_calls), 0, 'No rules should be added when they already exist')
def test_returns_false_when_cell_api_not_found(self):
r = MagicMock()
r.returncode = 0
r.stdout = ''
with patch.object(firewall_manager, '_run', return_value=r):
result = firewall_manager.ensure_cell_api_dnat()
self.assertFalse(result)
def test_returns_false_on_exception(self):
with patch.object(firewall_manager, '_run', side_effect=RuntimeError('docker gone')):
result = firewall_manager.ensure_cell_api_dnat()
self.assertFalse(result)
if __name__ == '__main__':
unittest.main()