Files
pic/tests/test_cell_link_manager.py
T
roof 1bb8a5eb59
Unit Tests / test (push) Successful in 9m50s
fix: advertise WireGuard endpoint by domain, and reach linked cells over HTTPS
Three related cell-link/peer-config fixes (the peer and cell endpoints were
showing the raw external IP, which confused public-vs-internal addressing):

1. Peer WireGuard configs now embed the cell's effective domain (DDNS/ACME
   modes) instead of the detected external IP, via the new
   WireGuardManager.get_advertised_endpoint(). A name that resolves to the
   public IP survives IP changes and lets the datacenter forward each cell's
   WG port to the right host. LAN mode still falls back to the IP; an admin
   wireguard_endpoint override still wins.

2. Cell invites advertise <effective-domain>:<this cell's WG port> (was the
   external IP + a default/possibly-wrong port), so a remote cell pairs to the
   right host and port over the public path.

3. Cross-cell peer-sync no longer targets http://<ip>:3000 (the API binds
   127.0.0.1 and is unreachable across cells). It targets the remote's Caddy on
   HTTPS/443 — which the WireGuard server already DNATs over the tunnel — and the
   initial pre-tunnel invite push goes to https://<endpoint-host>/... ; legacy
   http://<ip>:3000 link URLs migrate to https on load.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-16 04:21:16 -04:00

1613 lines
72 KiB
Python

#!/usr/bin/env python3
"""Unit tests for CellLinkManager (cell-to-cell VPN connections)."""
import sys
from pathlib import Path
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
import unittest
import tempfile
import os
import json
import shutil
from unittest.mock import MagicMock, patch
from cell_link_manager import CellLinkManager
def _make_wg_mock():
wg = MagicMock()
wg.get_keys.return_value = {'public_key': 'serverpubkey=', 'private_key': 'serverprivkey='}
wg.get_server_config.return_value = {
'endpoint': '1.2.3.4:51820', 'port': 51820,
'dns_ip': '10.0.0.3', 'split_tunnel_ips': '10.0.0.0/24, 172.20.0.0/16',
}
wg._get_configured_network.return_value = '10.0.0.0/24'
wg._get_configured_address.return_value = '10.0.0.1/24'
wg.get_advertised_endpoint.return_value = '1.2.3.4:51820'
wg.add_cell_peer.return_value = True
wg.remove_peer.return_value = True
return wg
def _make_nm_mock():
nm = MagicMock()
nm.add_cell_dns_forward.return_value = {'restarted': ['cell-dns (reloaded)'], 'warnings': []}
nm.remove_cell_dns_forward.return_value = {'restarted': ['cell-dns (reloaded)'], 'warnings': []}
return nm
# Invite payload for a remote cell named 'office'; shared by the test classes
# in this module. Tests that need a variation copy it with {**SAMPLE_INVITE, ...}
# rather than mutating it.
SAMPLE_INVITE = {
    'cell_name': 'office',
    'public_key': 'officepubkey=',
    'endpoint': '5.6.7.8:51820',
    'vpn_subnet': '10.1.0.0/24',
    'dns_ip': '10.1.0.1',
    'domain': 'office.cell',
    'version': 1,
}
class TestCellLinkManagerInvite(unittest.TestCase):
    """Tests for the invite dict returned by CellLinkManager.generate_invite."""

    # Keys every generated invite is expected to carry.
    REQUIRED_FIELDS = (
        'cell_name', 'public_key', 'endpoint', 'vpn_subnet',
        'dns_ip', 'domain', 'version',
    )

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        # Registered right away so the directory is removed even if the rest
        # of setUp raises (tearDown would be skipped in that case).
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()
        self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)

    def test_generate_invite_has_required_fields(self):
        invite = self.mgr.generate_invite('mycell', 'home.cell')
        for field in self.REQUIRED_FIELDS:
            # subTest reports every missing field, not only the first one.
            with self.subTest(field=field):
                self.assertIn(field, invite, f"Missing field: {field}")

    def test_generate_invite_uses_wg_public_key(self):
        invite = self.mgr.generate_invite('mycell', 'home.cell')
        self.assertEqual(invite['public_key'], 'serverpubkey=')

    def test_generate_invite_uses_configured_network(self):
        invite = self.mgr.generate_invite('mycell', 'home.cell')
        self.assertEqual(invite['vpn_subnet'], '10.0.0.0/24')

    def test_generate_invite_dns_ip_is_server_vpn_ip(self):
        invite = self.mgr.generate_invite('mycell', 'home.cell')
        # The mock's configured address is '10.0.0.1/24'; the invite is
        # expected to carry it without the prefix length.
        self.assertEqual(invite['dns_ip'], '10.0.0.1')

    def test_generate_invite_uses_supplied_identity(self):
        invite = self.mgr.generate_invite('myhome', 'myhome.local')
        self.assertEqual(invite['cell_name'], 'myhome')
        self.assertEqual(invite['domain'], 'myhome.local')

    def test_generate_invite_endpoint_from_advertised_endpoint(self):
        """The invite endpoint comes from get_advertised_endpoint (domain-aware),
        not a raw external IP — so the remote cell reaches us by name + our port."""
        self.wg.get_advertised_endpoint.return_value = 'myhome.pic.ngo:51821'
        invite = self.mgr.generate_invite('myhome', 'myhome.pic.ngo')
        self.assertEqual(invite['endpoint'], 'myhome.pic.ngo:51821')
class TestCellLinkManagerConnections(unittest.TestCase):
    """Tests for add_connection / remove_connection / list_connections and
    the basic accept_invite paths (new link and update of an existing link)."""

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        # Registered right away so the directory is removed even if the rest
        # of setUp raises (tearDown would be skipped in that case).
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()
        self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)

    # --- add_connection / remove_connection / list_connections ---

    def test_add_connection_stores_link(self):
        self.mgr.add_connection(SAMPLE_INVITE)
        links = self.mgr.list_connections()
        self.assertEqual(len(links), 1)
        self.assertEqual(links[0]['cell_name'], 'office')

    def test_add_connection_calls_add_cell_peer(self):
        self.mgr.add_connection(SAMPLE_INVITE)
        self.wg.add_cell_peer.assert_called_once_with(
            name='office',
            public_key='officepubkey=',
            endpoint='5.6.7.8:51820',
            vpn_subnet='10.1.0.0/24',
        )

    def test_add_connection_calls_dns_forward(self):
        self.mgr.add_connection(SAMPLE_INVITE)
        self.nm.add_cell_dns_forward.assert_called_once_with(
            domain='office.cell', dns_ip='10.1.0.1'
        )

    def test_add_connection_duplicate_raises(self):
        self.mgr.add_connection(SAMPLE_INVITE)
        with self.assertRaises(ValueError):
            self.mgr.add_connection(SAMPLE_INVITE)

    def test_add_connection_persists_to_disk(self):
        self.mgr.add_connection(SAMPLE_INVITE)
        # Create a fresh manager reading same dir
        mgr2 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
        links = mgr2.list_connections()
        self.assertEqual(len(links), 1)
        self.assertEqual(links[0]['cell_name'], 'office')

    def test_remove_connection_calls_wg_remove_peer(self):
        self.mgr.add_connection(SAMPLE_INVITE)
        self.mgr.remove_connection('office')
        self.wg.remove_peer.assert_called_once_with('officepubkey=')

    def test_remove_connection_calls_dns_remove(self):
        self.mgr.add_connection(SAMPLE_INVITE)
        self.mgr.remove_connection('office')
        self.nm.remove_cell_dns_forward.assert_called_once_with('office.cell')

    def test_remove_connection_deletes_from_list(self):
        self.mgr.add_connection(SAMPLE_INVITE)
        self.mgr.remove_connection('office')
        self.assertEqual(len(self.mgr.list_connections()), 0)

    def test_remove_nonexistent_raises(self):
        with self.assertRaises(ValueError):
            self.mgr.remove_connection('nobody')

    def test_list_connections_empty_by_default(self):
        self.assertEqual(self.mgr.list_connections(), [])

    def test_multiple_connections(self):
        self.mgr.add_connection(SAMPLE_INVITE)
        second = {
            **SAMPLE_INVITE,
            'cell_name': 'cabin', 'public_key': 'cabinpubkey=',
            'vpn_subnet': '10.2.0.0/24', 'dns_ip': '10.2.0.1',
            'domain': 'cabin.cell',
        }
        self.mgr.add_connection(second)
        self.assertEqual(len(self.mgr.list_connections()), 2)

    # --- accept_invite ---
    # firewall_manager.apply_cell_rules is patched out around accept_invite
    # so these tests never run the real firewall rule application.

    def test_accept_invite_adds_new_connection(self):
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
        links = self.mgr.list_connections()
        self.assertEqual(len(links), 1)
        self.assertEqual(links[0]['cell_name'], 'office')

    def test_accept_invite_idempotent_no_change(self):
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
            # Forget the calls made while creating the link; only the second,
            # identical accept_invite is under test.
            self.wg.reset_mock()
            self.mgr.accept_invite(SAMPLE_INVITE)
        # No WG update for identical invite
        self.wg.update_peer_ip.assert_not_called()

    def test_accept_invite_updates_dns_ip_on_existing(self):
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
        updated_invite = {**SAMPLE_INVITE, 'dns_ip': '10.1.0.2'}
        with patch('firewall_manager.apply_cell_rules'):
            result = self.mgr.accept_invite(updated_invite)
        self.assertEqual(result['dns_ip'], '10.1.0.2')
        self.assertEqual(result['remote_api_url'], 'https://10.1.0.2')
        self.nm.remove_cell_dns_forward.assert_called()
        self.nm.add_cell_dns_forward.assert_called_with(
            domain='office.cell', dns_ip='10.1.0.2')

    def test_accept_invite_updates_vpn_subnet_on_existing(self):
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
        self.wg.update_peer_ip = MagicMock(return_value=True)
        updated_invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.5.0.0/24'}
        with patch('firewall_manager.apply_cell_rules'):
            result = self.mgr.accept_invite(updated_invite)
        self.assertEqual(result['vpn_subnet'], '10.5.0.0/24')
        self.wg.update_peer_ip.assert_called_once_with('officepubkey=', '10.5.0.0/24')

    def test_accept_invite_does_not_duplicate_link(self):
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
            self.mgr.accept_invite({**SAMPLE_INVITE, 'dns_ip': '10.1.0.99'})
        self.assertEqual(len(self.mgr.list_connections()), 1)
# NOTE(review): this guard sits in the middle of the module. When the file is
# executed directly, unittest.main() runs (and by default exits) at this point,
# before the TestCase classes defined further down exist, so they are not run.
# Test runners that import the module are unaffected. Consider moving the guard
# to the very end of the file.
if __name__ == '__main__':
    unittest.main()
# ---------------------------------------------------------------------------
# TestCheckInviteConflicts
# ---------------------------------------------------------------------------
class TestCheckInviteConflicts(unittest.TestCase):
    """Tests for CellLinkManager._check_invite_conflicts."""

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        # Registered right away so the directory is removed even if the rest
        # of setUp raises (tearDown would be skipped in that case).
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.wg = _make_wg_mock()
        # wg._get_configured_network returns '10.0.0.0/24' (own subnet)
        self.nm = _make_nm_mock()
        self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)

    def _links_path(self):
        """Path of the cell_links.json file these tests write directly."""
        return os.path.join(self.test_dir, 'cell_links.json')

    def _add_existing_cell(self, cell_name='cabin', vpn_subnet='10.2.0.0/24',
                           domain='cabin.cell'):
        """Add a cell link directly to disk without going through add_connection.

        Overwrites cell_links.json with a single-entry list.
        """
        links = [{
            'cell_name': cell_name,
            'public_key': 'cabinpubkey=',
            'endpoint': '9.9.9.9:51820',
            'vpn_subnet': vpn_subnet,
            'dns_ip': '10.2.0.1',
            'domain': domain,
            'permissions': {'inbound': {}, 'outbound': {}},
            'remote_api_url': 'http://10.2.0.1:3000',
            'pending_push': False,
            'last_push_status': 'ok',
            'last_push_at': None,
            'last_push_error': None,
            'last_remote_update_at': None,
        }]
        with open(self._links_path(), 'w') as f:
            json.dump(links, f)

    # --- subnet conflicts ---

    def test_subnet_overlaps_own_subnet_raises(self):
        """Invite whose vpn_subnet overlaps our own subnet raises ValueError."""
        invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.0.0.0/24'}  # same as own
        with self.assertRaises(ValueError) as ctx:
            self.mgr._check_invite_conflicts(invite)
        self.assertIn('subnet', str(ctx.exception).lower())

    def test_subnet_overlaps_own_subnet_partial_raises(self):
        """Invite whose vpn_subnet partially overlaps our own subnet raises ValueError."""
        # Own is 10.0.0.0/24; this /16 contains it
        invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.0.0.0/16'}
        with self.assertRaises(ValueError):
            self.mgr._check_invite_conflicts(invite)

    def test_subnet_overlaps_connected_cell_raises(self):
        """Invite whose vpn_subnet overlaps an already-connected cell raises ValueError."""
        self._add_existing_cell(vpn_subnet='10.2.0.0/24')
        invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.2.0.0/24'}
        with self.assertRaises(ValueError) as ctx:
            self.mgr._check_invite_conflicts(invite)
        self.assertIn('cabin', str(ctx.exception))

    def test_subnet_no_conflict_does_not_raise(self):
        """Invite with a non-overlapping subnet passes without error."""
        self._add_existing_cell(vpn_subnet='10.2.0.0/24')
        invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.3.0.0/24', 'domain': 'other.cell'}
        # Should not raise
        self.mgr._check_invite_conflicts(invite)

    # --- domain conflicts ---

    def test_domain_matches_own_domain_raises(self):
        """Invite with a domain equal to this cell's own domain raises ValueError."""
        invite = {**SAMPLE_INVITE,
                  'vpn_subnet': '10.3.0.0/24',
                  'domain': 'home.cell'}
        fake_cfg = MagicMock()
        fake_cfg.configs = {'_identity': {'domain': 'home.cell'}}
        fake_modules = {'app': MagicMock(config_manager=fake_cfg)}
        # 'home.cell' is offered as the own domain two ways: via os.environ.get
        # and via a stand-in 'app' module exposing config_manager.
        # NOTE(review): presumably _check_invite_conflicts reads one of these;
        # confirm against cell_link_manager and drop whichever is unused.
        with patch('cell_link_manager.os.environ.get', return_value='home.cell'), \
                patch.dict(sys.modules, fake_modules):
            with self.assertRaises(ValueError) as ctx:
                self.mgr._check_invite_conflicts(invite)
        self.assertIn('domain', str(ctx.exception).lower())

    def test_domain_matches_connected_cell_raises(self):
        """Invite with a domain already used by a connected cell raises ValueError."""
        self._add_existing_cell(domain='cabin.cell', vpn_subnet='10.2.0.0/24')
        invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.3.0.0/24', 'domain': 'cabin.cell'}
        with self.assertRaises(ValueError) as ctx:
            self.mgr._check_invite_conflicts(invite)
        self.assertIn('cabin', str(ctx.exception))

    # --- exclude_cell parameter ---

    def test_exclude_cell_skips_that_cell_subnet_check(self):
        """With exclude_cell set, the named cell is skipped in subnet conflict check."""
        self._add_existing_cell(cell_name='cabin', vpn_subnet='10.2.0.0/24',
                                domain='cabin.cell')
        # Same subnet as cabin — normally a conflict, but excluded
        invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.2.0.0/24', 'domain': 'cabin.cell'}
        # Should not raise because 'cabin' is excluded
        self.mgr._check_invite_conflicts(invite, exclude_cell='cabin')

    def test_exclude_cell_skips_that_cell_domain_check(self):
        """With exclude_cell set, the named cell is skipped in domain conflict check."""
        self._add_existing_cell(cell_name='cabin', vpn_subnet='10.2.0.0/24',
                                domain='cabin.cell')
        invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.9.0.0/24', 'domain': 'cabin.cell'}
        # Should not raise — cabin excluded
        self.mgr._check_invite_conflicts(invite, exclude_cell='cabin')

    def test_exclude_cell_still_checks_other_cells(self):
        """Excluding 'cabin' does not suppress conflict with a different cell."""
        self._add_existing_cell(cell_name='cabin', vpn_subnet='10.2.0.0/24',
                                domain='cabin.cell')
        # Add a second cell manually
        with open(self._links_path()) as f:
            links = json.load(f)
        links.append({
            'cell_name': 'office',
            'public_key': 'officepubkey=',
            'vpn_subnet': '10.3.0.0/24',
            'dns_ip': '10.3.0.1',
            'domain': 'office.cell',
            'permissions': {'inbound': {}, 'outbound': {}},
            'remote_api_url': 'http://10.3.0.1:3000',
            'pending_push': False,
            'last_push_status': 'ok',
            'last_push_at': None,
            'last_push_error': None,
            'last_remote_update_at': None,
        })
        with open(self._links_path(), 'w') as f:
            json.dump(links, f)
        # Conflicts with 'office', but we only exclude 'cabin'
        invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.3.0.0/24', 'domain': 'new.cell'}
        with self.assertRaises(ValueError):
            self.mgr._check_invite_conflicts(invite, exclude_cell='cabin')
# ---------------------------------------------------------------------------
# TestPushInviteToRemote
# ---------------------------------------------------------------------------
class TestPushInviteToRemote(unittest.TestCase):
    """Tests for CellLinkManager._push_invite_to_remote."""

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        # Registered right away so the directory is removed even if the rest
        # of setUp raises (tearDown would be skipped in that case).
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()
        self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)

    def _make_link(self, endpoint='192.168.1.50:51820'):
        """Return a link dict for the 'office' cell with the given endpoint."""
        return {
            'cell_name': 'office',
            'public_key': 'officepubkey=',
            'endpoint': endpoint,
            'vpn_subnet': '10.1.0.0/24',
            'dns_ip': '10.1.0.1',
            'domain': 'office.cell',
            'remote_api_url': 'http://10.1.0.1:3000',
        }

    def _fake_identity(self):
        """Identity dict returned by the patched _local_identity."""
        return {'cell_name': 'home', 'public_key': 'homepubkey='}

    @staticmethod
    def _curl_result(returncode, stdout='', stderr=''):
        """Return a stand-in for the object subprocess.run hands back."""
        result = MagicMock()
        result.returncode = returncode
        result.stdout = stdout
        result.stderr = stderr
        return result

    def _push(self, link, run):
        """Call _push_invite_to_remote(link) with its collaborators stubbed.

        _local_identity returns the fake identity, subprocess.run is replaced
        by *run*, and sys.modules['app'] is a stand-in whose config_manager
        reports the domain 'home.cell'. Returns the method's result.
        """
        fake_cfg = MagicMock()
        fake_cfg.configs = {'_identity': {'domain': 'home.cell'}}
        fake_modules = {'app': MagicMock(config_manager=fake_cfg)}
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value=self._fake_identity()), \
                patch('subprocess.run', run), \
                patch.dict(sys.modules, fake_modules):
            return self.mgr._push_invite_to_remote(link)

    def test_push_invite_success_2xx_returns_ok_true(self):
        """curl returning a 2xx status code → {'ok': True}."""
        run = MagicMock(return_value=self._curl_result(0, stdout='201'))
        with patch('cell_link_manager.os.environ.get', return_value='home.cell'):
            result = self._push(self._make_link(), run)
        self.assertTrue(result['ok'])

    def test_push_invite_4xx_returns_ok_false_with_http_error(self):
        """curl returning a 4xx status code → {'ok': False, 'error': 'HTTP 4xx'}."""
        run = MagicMock(return_value=self._curl_result(0, stdout='400'))
        result = self._push(self._make_link(), run)
        self.assertFalse(result['ok'])
        self.assertIn('400', result['error'])

    def test_push_invite_no_endpoint_returns_ok_false(self):
        """Link with no endpoint → {'ok': False, 'error': 'no endpoint'}."""
        link = self._make_link(endpoint='')
        result = self.mgr._push_invite_to_remote(link)
        self.assertFalse(result['ok'])
        self.assertIn('endpoint', result['error'].lower())

    def test_push_invite_none_endpoint_returns_ok_false(self):
        """Link with endpoint=None → {'ok': False, 'error': 'no endpoint'}."""
        link = self._make_link(endpoint=None)
        result = self.mgr._push_invite_to_remote(link)
        self.assertFalse(result['ok'])

    def test_push_invite_subprocess_error_returns_ok_false(self):
        """subprocess.run raising an exception → {'ok': False, 'error': ...}."""
        run = MagicMock(side_effect=OSError('command not found'))
        result = self._push(self._make_link(), run)
        self.assertFalse(result['ok'])
        self.assertIsNotNone(result['error'])

    def test_push_invite_curl_nonzero_returncode_returns_ok_false(self):
        """curl subprocess returning nonzero returncode → {'ok': False}."""
        run = MagicMock(
            return_value=self._curl_result(1, stderr='connection refused'))
        result = self._push(self._make_link(), run)
        self.assertFalse(result['ok'])

    def test_push_invite_sends_to_endpoint_host_over_https(self):
        """The curl targets the endpoint host on Caddy/HTTPS (443), not the WG
        dns_ip and not the internal :3000 API port."""
        link = self._make_link(endpoint='alice.pic.ngo:51821')
        captured = {}

        def fake_run(cmd, **kw):
            # Record the argv the manager built, then report an HTTP 201.
            captured['cmd'] = cmd
            return self._curl_result(0, stdout='201')

        self._push(link, fake_run)
        # The test expects the URL to be the last element of the command.
        url_in_cmd = captured['cmd'][-1]
        self.assertEqual(url_in_cmd,
                         'https://alice.pic.ngo/api/cells/peer-sync/accept-invite')
        self.assertNotIn(':3000', url_in_cmd)
        self.assertNotIn('10.1.0.1', url_in_cmd)  # not the WG dns_ip
        self.assertIn('-k', captured['cmd'])  # cert may not match a bare IP
# ---------------------------------------------------------------------------
# TestAcceptInviteNew
# ---------------------------------------------------------------------------
class TestAcceptInviteNew(unittest.TestCase):
    """Tests for CellLinkManager.accept_invite: the new-connection path,
    invite validation, and updates ("healing") of an already-connected cell."""

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        # Registered right away so the directory is removed even if the rest
        # of setUp raises (tearDown would be skipped in that case).
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()
        self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)

    @staticmethod
    def _fake_app_modules():
        """Return a sys.modules overlay with a stand-in 'app' module whose
        config_manager reports the own domain 'home.cell'."""
        fake_cfg = MagicMock()
        fake_cfg.configs = {'_identity': {'domain': 'home.cell'}}
        return {'app': MagicMock(config_manager=fake_cfg)}

    def _assert_missing_field_rejected(self, field):
        """Assert accept_invite raises a ValueError naming *field* when the
        invite lacks that key."""
        invite = {k: v for k, v in SAMPLE_INVITE.items() if k != field}
        with self.assertRaises(ValueError) as ctx:
            self.mgr.accept_invite(invite)
        self.assertIn(field, str(ctx.exception))

    # --- new connection ---

    def test_accept_invite_new_cell_adds_wg_peer(self):
        """accept_invite for a new cell calls add_cell_peer on WG manager."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
        self.wg.add_cell_peer.assert_called_once_with(
            name='office',
            public_key='officepubkey=',
            endpoint='5.6.7.8:51820',
            vpn_subnet='10.1.0.0/24',
        )

    def test_accept_invite_new_cell_adds_dns_forward(self):
        """accept_invite for a new cell calls add_cell_dns_forward on NM."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
        self.nm.add_cell_dns_forward.assert_called_once_with(
            domain='office.cell', dns_ip='10.1.0.1')

    def test_accept_invite_new_cell_saves_link(self):
        """accept_invite for a new cell saves the link and returns it."""
        with patch('firewall_manager.apply_cell_rules'):
            link = self.mgr.accept_invite(SAMPLE_INVITE)
        self.assertEqual(link['cell_name'], 'office')
        self.assertEqual(len(self.mgr.list_connections()), 1)

    def test_accept_invite_new_cell_sets_pending_push_true(self):
        """New link from accept_invite starts with pending_push=True (no push done)."""
        with patch('firewall_manager.apply_cell_rules'):
            link = self.mgr.accept_invite(SAMPLE_INVITE)
        self.assertTrue(link['pending_push'])

    # --- validation ---

    def test_accept_invite_missing_cell_name_raises(self):
        """Invite missing 'cell_name' field raises ValueError."""
        self._assert_missing_field_rejected('cell_name')

    def test_accept_invite_missing_public_key_raises(self):
        """Invite missing 'public_key' field raises ValueError."""
        self._assert_missing_field_rejected('public_key')

    def test_accept_invite_missing_vpn_subnet_raises(self):
        """Invite missing 'vpn_subnet' field raises ValueError."""
        self._assert_missing_field_rejected('vpn_subnet')

    def test_accept_invite_missing_dns_ip_raises(self):
        """Invite missing 'dns_ip' field raises ValueError."""
        self._assert_missing_field_rejected('dns_ip')

    def test_accept_invite_missing_domain_raises(self):
        """Invite missing 'domain' field raises ValueError."""
        self._assert_missing_field_rejected('domain')

    def test_accept_invite_subnet_conflict_raises(self):
        """accept_invite raises ValueError when subnet conflicts with own subnet."""
        conflicting = {**SAMPLE_INVITE, 'vpn_subnet': '10.0.0.0/24'}  # same as own
        with self.assertRaises(ValueError):
            self.mgr.accept_invite(conflicting)

    # --- already-connected cell ---

    def test_accept_invite_already_connected_no_change_returns_existing(self):
        """Calling accept_invite again with identical data returns existing link unchanged."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
            # Forget the calls made while creating the link.
            self.wg.reset_mock()
            result = self.mgr.accept_invite(SAMPLE_INVITE)
        self.assertEqual(result['cell_name'], 'office')
        # No second WG peer add
        self.wg.add_cell_peer.assert_not_called()

    def test_accept_invite_already_connected_dns_ip_change_updates(self):
        """accept_invite with changed dns_ip updates the link and DNS forward."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
        updated = {**SAMPLE_INVITE, 'dns_ip': '10.1.0.5'}
        with patch('firewall_manager.apply_cell_rules'):
            result = self.mgr.accept_invite(updated)
        self.assertEqual(result['dns_ip'], '10.1.0.5')
        self.assertEqual(result['remote_api_url'], 'https://10.1.0.5')
        self.nm.remove_cell_dns_forward.assert_called()
        self.nm.add_cell_dns_forward.assert_called_with(
            domain='office.cell', dns_ip='10.1.0.5')

    def test_accept_invite_already_connected_dns_ip_change_does_not_duplicate(self):
        """DNS ip update via accept_invite must not create a second link."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
            self.mgr.accept_invite({**SAMPLE_INVITE, 'dns_ip': '10.1.0.5'})
        self.assertEqual(len(self.mgr.list_connections()), 1)

    def test_accept_invite_already_connected_vpn_subnet_change_calls_update_peer_ip(self):
        """accept_invite with changed vpn_subnet calls update_peer_ip on WG manager."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
        self.wg.update_peer_ip = MagicMock(return_value=True)
        updated = {**SAMPLE_INVITE, 'vpn_subnet': '10.5.0.0/24'}
        with patch('firewall_manager.apply_cell_rules'):
            result = self.mgr.accept_invite(updated)
        self.assertEqual(result['vpn_subnet'], '10.5.0.0/24')
        self.wg.update_peer_ip.assert_called_once_with('officepubkey=', '10.5.0.0/24')

    def test_accept_invite_already_connected_vpn_subnet_change_reapplies_firewall(self):
        """accept_invite with changed vpn_subnet triggers apply_cell_rules."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
        self.wg.update_peer_ip = MagicMock(return_value=True)
        updated = {**SAMPLE_INVITE, 'vpn_subnet': '10.5.0.0/24'}
        with patch('firewall_manager.apply_cell_rules') as mock_rules:
            self.mgr.accept_invite(updated)
        mock_rules.assert_called()

    def test_accept_invite_does_not_duplicate_on_repeated_call(self):
        """Multiple calls with the same invite must leave exactly one link."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
            self.mgr.accept_invite(SAMPLE_INVITE)
        self.assertEqual(len(self.mgr.list_connections()), 1)

    def test_accept_invite_domain_change_updates_stored_domain(self):
        """accept_invite with a changed domain updates the stored domain."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
        updated = {**SAMPLE_INVITE, 'domain': 'office-new.cell'}
        with patch.dict(sys.modules, self._fake_app_modules()):
            with patch('firewall_manager.apply_cell_rules'):
                result = self.mgr.accept_invite(updated)
        self.assertEqual(result['domain'], 'office-new.cell')

    def test_accept_invite_domain_change_updates_dns_forward(self):
        """accept_invite with a changed domain removes old DNS forward and adds new."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.accept_invite(SAMPLE_INVITE)
        # Only the DNS calls made by the domain change are of interest.
        self.nm.reset_mock()
        updated = {**SAMPLE_INVITE, 'domain': 'office-new.cell'}
        with patch.dict(sys.modules, self._fake_app_modules()):
            with patch('firewall_manager.apply_cell_rules'):
                self.mgr.accept_invite(updated)
        self.nm.remove_cell_dns_forward.assert_called_with('office.cell')
        self.nm.add_cell_dns_forward.assert_called_with(
            domain='office-new.cell', dns_ip=SAMPLE_INVITE['dns_ip'])

    def test_accept_invite_healing_domain_conflict_raises(self):
        """Healing must reject a domain update that conflicts with another connected cell."""
        # Add two cells: 'office' and 'branch'
        branch_invite = {**SAMPLE_INVITE,
                         'cell_name': 'branch',
                         'public_key': 'branchpubkey1234567890ABCDEFGH=',
                         'vpn_subnet': '10.9.0.0/24', 'dns_ip': '10.9.0.1',
                         'domain': 'branch.cell'}
        with patch.dict(sys.modules, self._fake_app_modules()):
            with patch('firewall_manager.apply_cell_rules'):
                self.mgr.accept_invite(SAMPLE_INVITE)
                self.mgr.accept_invite(branch_invite)
        # Now 'office' tries to heal its domain to 'branch.cell' — must fail
        conflicting = {**SAMPLE_INVITE, 'domain': 'branch.cell'}
        with patch.dict(sys.modules, self._fake_app_modules()):
            with self.assertRaises(ValueError) as ctx:
                self.mgr.accept_invite(conflicting)
        self.assertIn('branch.cell', str(ctx.exception))
# ---------------------------------------------------------------------------
# TestAddConnectionMutualPairing
# ---------------------------------------------------------------------------
class TestAddConnectionMutualPairing(unittest.TestCase):
    """Tests for add_connection's mutual pairing via _push_invite_to_remote."""

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        # Registered right away so the directory is removed even if the rest
        # of setUp raises (tearDown would be skipped in that case).
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()
        self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)

    def _add_patched(self, push_mock, **add_kwargs):
        """Call add_connection(SAMPLE_INVITE, **add_kwargs) with the remote
        side stubbed out and return the resulting link.

        *push_mock* replaces _push_invite_to_remote; _local_identity,
        _push_permissions_to_remote and firewall_manager.apply_cell_rules are
        replaced by benign stand-ins.
        """
        perm_push = MagicMock(return_value={'ok': True, 'error': None})
        with patch('cell_link_manager.CellLinkManager._push_invite_to_remote', push_mock), \
                patch('cell_link_manager.CellLinkManager._local_identity',
                      return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
                patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', perm_push), \
                patch('firewall_manager.apply_cell_rules'):
            return self.mgr.add_connection(SAMPLE_INVITE, **add_kwargs)

    def _add_with_push(self, push_result):
        """Add SAMPLE_INVITE with _push_invite_to_remote returning
        *push_result*; return (link, push_mock)."""
        push_mock = MagicMock(return_value=push_result)
        link = self._add_patched(push_mock)
        return link, push_mock

    def test_add_connection_calls_push_invite_to_remote(self):
        """add_connection calls _push_invite_to_remote after adding the connection."""
        _, push_mock = self._add_with_push({'ok': True, 'error': None})
        push_mock.assert_called_once()

    def test_add_connection_push_invite_failure_is_nonfatal(self):
        """_push_invite_to_remote failure does not prevent connection creation."""
        self._add_with_push({'ok': False, 'error': 'connection refused'})
        conns = self.mgr.list_connections()
        self.assertEqual(len(conns), 1)
        self.assertEqual(conns[0]['cell_name'], 'office')

    def test_add_connection_push_invite_failure_link_still_stored(self):
        """Even when push fails, the link is persisted to disk."""
        self._add_with_push({'ok': False, 'error': 'timeout'})
        # A second manager on the same directory must see the stored link.
        mgr2 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
        self.assertEqual(len(mgr2.list_connections()), 1)

    def test_add_connection_with_inbound_services_sets_permissions(self):
        """inbound_services passed to add_connection sets permissions correctly."""
        push_mock = MagicMock(return_value={'ok': True, 'error': None})
        link = self._add_patched(push_mock, inbound_services=['calendar'])
        self.assertTrue(link['permissions']['inbound']['calendar'])
        self.assertFalse(link['permissions']['inbound']['files'])

    def test_add_connection_push_invite_exception_is_nonfatal(self):
        """Exception from _push_invite_to_remote is caught and does not raise."""
        push_mock = MagicMock(side_effect=RuntimeError('docker not available'))
        link = self._add_patched(push_mock)
        self.assertEqual(link['cell_name'], 'office')
# ---------------------------------------------------------------------------
# TestAddConnectionAtomicity
# ---------------------------------------------------------------------------
class TestAddConnectionAtomicity(unittest.TestCase):
    """Verify that add_connection rolls back correctly when WG or DNS steps fail."""

    def setUp(self):
        """Create a manager over a fresh temp directory with mocked WG/network managers."""
        self.test_dir = tempfile.mkdtemp()
        # Registered as a cleanup (not tearDown) so the directory is removed
        # even if a later setUp step raises; tearDown is skipped in that case.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()
        self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)

    def test_wg_fail_does_not_call_dns(self):
        """When add_cell_peer returns False, add_cell_dns_forward must NOT be called."""
        self.wg.add_cell_peer.return_value = False
        with self.assertRaises(RuntimeError):
            self.mgr.add_connection(SAMPLE_INVITE)
        self.nm.add_cell_dns_forward.assert_not_called()

    def test_wg_fail_does_not_persist_link(self):
        """When WG fails, list_connections() must still return [] (nothing persisted)."""
        self.wg.add_cell_peer.return_value = False
        with self.assertRaises(RuntimeError):
            self.mgr.add_connection(SAMPLE_INVITE)
        self.assertEqual(self.mgr.list_connections(), [])

    def test_wg_fail_raises_runtime_error(self):
        """add_connection raises RuntimeError (not some other exception) when WG fails."""
        self.wg.add_cell_peer.return_value = False
        with self.assertRaises(RuntimeError):
            self.mgr.add_connection(SAMPLE_INVITE)

    def test_dns_warning_still_persists_link(self):
        """When DNS returns warnings (not a hard failure), the link IS still saved."""
        self.nm.add_cell_dns_forward.return_value = {
            'restarted': [],
            'warnings': ['CoreDNS reload timed out'],
        }
        self.mgr.add_connection(SAMPLE_INVITE)
        links = self.mgr.list_connections()
        self.assertEqual(len(links), 1)
        self.assertEqual(links[0]['cell_name'], 'office')

    def test_dns_warning_does_not_raise(self):
        """When DNS returns warnings, add_connection completes without raising."""
        self.nm.add_cell_dns_forward.return_value = {
            'restarted': [],
            'warnings': ['CoreDNS reload timed out'],
        }
        # Any exception is reported as a test FAILURE with an explanatory
        # message rather than as an unexplained ERROR.
        try:
            self.mgr.add_connection(SAMPLE_INVITE)
        except Exception as e:
            self.fail(f"add_connection raised unexpectedly with DNS warnings: {e}")
# ---------------------------------------------------------------------------
# TestAddConnectionPermissions
# ---------------------------------------------------------------------------
class TestAddConnectionPermissions(unittest.TestCase):
    """Verify that inbound_services controls the permissions field on the saved link."""

    def setUp(self):
        """Create a manager over a fresh temp directory with mocked WG/network managers."""
        self.test_dir = tempfile.mkdtemp()
        # Registered as a cleanup (not tearDown) so the directory is removed
        # even if a later setUp step raises; tearDown is skipped in that case.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()
        self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)

    def _get_link(self):
        """Return the single stored link, asserting that exactly one exists."""
        links = self.mgr.list_connections()
        self.assertEqual(len(links), 1)
        return links[0]

    def test_add_with_no_inbound_defaults_all_deny(self):
        """No inbound_services arg → all inbound permissions False."""
        self.mgr.add_connection(SAMPLE_INVITE)
        link = self._get_link()
        inbound = link['permissions']['inbound']
        # NOTE(review): this loop passes vacuously if 'inbound' is empty —
        # consider also asserting the expected service keys are present.
        for service, allowed in inbound.items():
            self.assertFalse(allowed, f"Expected {service} to be False, got {allowed}")

    def test_add_with_inbound_services_sets_them(self):
        """inbound_services=['calendar','files'] → those two True, others False."""
        self.mgr.add_connection(SAMPLE_INVITE, inbound_services=['calendar', 'files'])
        link = self._get_link()
        inbound = link['permissions']['inbound']
        self.assertTrue(inbound['calendar'])
        self.assertTrue(inbound['files'])
        self.assertFalse(inbound['mail'])
        self.assertFalse(inbound['webdav'])

    def test_inbound_invalid_service_ignored(self):
        """Passing 'badservice' in inbound_services does not appear in permissions."""
        self.mgr.add_connection(SAMPLE_INVITE, inbound_services=['badservice', 'calendar'])
        link = self._get_link()
        inbound = link['permissions']['inbound']
        self.assertNotIn('badservice', inbound)
        # valid one was still applied
        self.assertTrue(inbound['calendar'])
# ---------------------------------------------------------------------------
# TestUpdatePermissions
# ---------------------------------------------------------------------------
class TestUpdatePermissions(unittest.TestCase):
    """Tests for the update_permissions / get_permissions methods."""

    def setUp(self):
        """Create a manager over a fresh temp directory and add one 'office' link."""
        self.test_dir = tempfile.mkdtemp()
        # Registered as a cleanup (not tearDown) so the directory is removed
        # even if add_connection below raises; tearDown is skipped when setUp fails.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()
        self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
        # Add a connection so there is something to update
        self.mgr.add_connection(SAMPLE_INVITE)

    # The tests below patch 'firewall_manager.apply_cell_rules', the same target
    # the other test classes in this file use. Patching
    # 'cell_link_manager.firewall_manager' with create=True would silently
    # create the attribute if the module does not expose it, in which case the
    # patch has no effect on the code under test.

    def test_update_sets_inbound_values(self):
        """update_permissions with inbound={'calendar': True} persists correctly."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.update_permissions('office', {'calendar': True}, {})
        # Re-read from disk to confirm persistence
        mgr2 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
        perms = mgr2.get_permissions('office')
        self.assertTrue(perms['inbound']['calendar'])
        self.assertFalse(perms['inbound']['files'])

    def test_update_rejects_unknown_service_by_cleaning_it_out(self):
        """update_permissions with inbound={'bad': True} — 'bad' must not appear in saved perms."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.update_permissions('office', {'bad': True, 'calendar': True}, {})
        perms = self.mgr.get_permissions('office')
        self.assertNotIn('bad', perms['inbound'])
        self.assertTrue(perms['inbound']['calendar'])

    def test_update_nonexistent_cell_raises(self):
        """update_permissions on an unknown cell_name raises ValueError."""
        with self.assertRaises(ValueError):
            self.mgr.update_permissions('nosuchcell', {}, {})

    def test_get_permissions_returns_correct(self):
        """get_permissions returns the dict that was saved by update_permissions."""
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.update_permissions(
                'office',
                inbound={'calendar': True, 'files': False},
                outbound={'mail': True},
            )
        perms = self.mgr.get_permissions('office')
        self.assertIn('inbound', perms)
        self.assertIn('outbound', perms)
        self.assertTrue(perms['inbound']['calendar'])
        self.assertFalse(perms['inbound']['files'])
        self.assertTrue(perms['outbound']['mail'])

    def test_get_permissions_nonexistent_cell_raises(self):
        """get_permissions on an unknown cell_name raises ValueError."""
        with self.assertRaises(ValueError):
            self.mgr.get_permissions('nosuchcell')
# ---------------------------------------------------------------------------
# TestLoadMigration
# ---------------------------------------------------------------------------
class TestLoadMigration(unittest.TestCase):
    """Verify _load() lazily injects permissions field when it is missing."""

    def setUp(self):
        """Prepare a fresh temp directory and mocked WG/network managers.

        No manager is created here: each test writes its own legacy
        cell_links.json first and then constructs the manager.
        """
        self.test_dir = tempfile.mkdtemp()
        # Registered as a cleanup (not tearDown) so the directory is removed
        # even if a later setUp step raises; tearDown is skipped in that case.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()

    def test_load_injects_permissions_if_missing(self):
        """Write cell_links.json without permissions; _load should add all-False defaults."""
        links_file = os.path.join(self.test_dir, 'cell_links.json')
        legacy_links = [
            {
                'cell_name': 'legacy-office',
                'public_key': 'officepubkey=',
                'vpn_subnet': '10.1.0.0/24',
                'dns_ip': '10.1.0.1',
                'domain': 'legacy-office.cell',
                # NO 'permissions' key — simulates pre-migration data
            }
        ]
        with open(links_file, 'w') as f:
            json.dump(legacy_links, f)

        mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
        links = mgr.list_connections()
        self.assertEqual(len(links), 1)
        link = links[0]
        self.assertIn('permissions', link)
        perms = link['permissions']
        self.assertIn('inbound', perms)
        self.assertIn('outbound', perms)
        for service in ('calendar', 'files', 'mail', 'webdav'):
            self.assertFalse(perms['inbound'][service])
            self.assertFalse(perms['outbound'][service])

    def test_load_migration_persists_to_disk(self):
        """After migration, re-loading the same file returns the injected permissions."""
        links_file = os.path.join(self.test_dir, 'cell_links.json')
        with open(links_file, 'w') as f:
            json.dump([{
                'cell_name': 'old-cell',
                'public_key': 'somepubkey=',
                'vpn_subnet': '10.2.0.0/24',
                'dns_ip': '10.2.0.1',
                'domain': 'old-cell.cell',
            }], f)

        mgr1 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
        mgr1.list_connections()  # triggers migration + save

        # Read the file directly and confirm permissions are now on disk
        with open(links_file) as f:
            raw = json.load(f)
        self.assertIn('permissions', raw[0])
class TestPermissionSync(unittest.TestCase):
    """Tests for Phase 1: permission sync between connected PIC cells."""

    INVITE = {
        'cell_name': 'office',
        'public_key': 'officepubkey=',
        'endpoint': '5.6.7.8:51820',
        'vpn_subnet': '10.1.0.0/24',
        'dns_ip': '10.1.0.1',
        'domain': 'office.cell',
        'version': 1,
    }

    def setUp(self):
        """Create a manager over a fresh temp directory with mocked WG/network managers."""
        self.test_dir = tempfile.mkdtemp()
        # Registered as a cleanup (not tearDown) so the directory is removed
        # even if a later setUp step raises; tearDown is skipped in that case.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()
        self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)

    @staticmethod
    def _fake_subprocess_run(http_status='200', on_push=None):
        """Build a stand-in for subprocess.run used by the push tests.

        Commands without a ``-d`` argument are treated as the local
        ``ip addr show wg0`` lookup and receive a canned wg0 address line.
        Commands with ``-d`` are treated as the curl push: *on_push* (if given)
        is called with the argv, and *http_status* is returned as stdout.
        """
        def fake_run(cmd, **kwargs):
            if '-d' not in cmd:
                return MagicMock(returncode=0,
                                 stdout=' inet 10.0.0.1/24 scope global wg0\n')
            if on_push is not None:
                on_push(cmd)
            return MagicMock(returncode=0, stdout=http_status, stderr='')
        return fake_run

    def _add_office(self, push_ok=True):
        """Add the 'office' link with identity, permission push and firewall patched.

        *push_ok* selects whether the patched permission push reports success.
        Returns whatever add_connection returns.
        """
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
                   return_value={'ok': push_ok, 'error': None if push_ok else 'conn refused'}), \
             patch('firewall_manager.apply_cell_rules'):
            return self.mgr.add_connection(self.INVITE)

    # ── add_connection ────────────────────────────────────────────────────────

    def test_add_connection_includes_sync_fields(self):
        link = self._add_office()
        self.assertIn('remote_api_url', link)
        self.assertIn('pending_push', link)
        self.assertIn('last_push_status', link)
        self.assertIn('last_push_at', link)
        self.assertIn('last_remote_update_at', link)

    def test_add_connection_sets_remote_api_url_from_dns_ip(self):
        link = self._add_office()
        # Cross-cell API is reached over the tunnel via Caddy/443, not :3000.
        self.assertEqual(link['remote_api_url'], 'https://10.1.0.1')

    def test_add_connection_triggers_push(self):
        push_mock = MagicMock(return_value={'ok': True, 'error': None})
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock), \
             patch('firewall_manager.apply_cell_rules'):
            self.mgr.add_connection(self.INVITE)
        push_mock.assert_called_once()
        call_args = push_mock.call_args[0]
        self.assertEqual(call_args[1], 'home')           # from_cell
        self.assertEqual(call_args[2], 'homepubkey=')    # from_public_key

    def test_add_connection_push_failure_does_not_abort_add(self):
        self._add_office(push_ok=False)
        conns = self.mgr.list_connections()
        self.assertEqual(len(conns), 1)
        self.assertEqual(conns[0]['cell_name'], 'office')
        self.assertTrue(conns[0]['pending_push'])

    def test_add_connection_push_success_clears_pending(self):
        self._add_office(push_ok=True)
        link = self.mgr.list_connections()[0]
        self.assertFalse(link['pending_push'])
        self.assertEqual(link['last_push_status'], 'ok')

    # ── update_permissions ────────────────────────────────────────────────────

    def test_update_permissions_push_succeeds_clears_pending(self):
        self._add_office()
        push_mock = MagicMock(return_value={'ok': True, 'error': None})
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock), \
             patch('firewall_manager.apply_cell_rules'):
            self.mgr.update_permissions('office',
                                        {'calendar': True}, {'files': False})
        link = self.mgr.list_connections()[0]
        self.assertFalse(link['pending_push'])
        self.assertEqual(link['last_push_status'], 'ok')
        self.assertIsNotNone(link['last_push_at'])

    def test_update_permissions_push_failure_keeps_local_save(self):
        self._add_office()
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
                   return_value={'ok': False, 'error': 'timeout'}), \
             patch('firewall_manager.apply_cell_rules'):
            result = self.mgr.update_permissions('office',
                                                 {'calendar': True}, {})
        # Local save must have happened — calendar is True
        self.assertTrue(result['permissions']['inbound']['calendar'])
        link = self.mgr.list_connections()[0]
        self.assertTrue(link['pending_push'])
        self.assertEqual(link['last_push_status'], 'failed')

    def test_update_permissions_does_not_raise_on_push_exception(self):
        self._add_office()
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   side_effect=RuntimeError('no app context')), \
             patch('firewall_manager.apply_cell_rules'):
            # Must not raise
            result = self.mgr.update_permissions('office', {}, {})
        self.assertIn('permissions', result)

    # ── _push_permissions_to_remote (unit) ────────────────────────────────────

    def test_push_mirrors_inbound_outbound(self):
        """Our inbound (what we share) must become their outbound in the body."""
        self._add_office()
        link = self.mgr.list_connections()[0]
        link['permissions'] = {
            'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
            'outbound': {'calendar': False, 'files': True, 'mail': False, 'webdav': False},
        }
        sent_body = {}

        def capture_body(cmd):
            # The JSON payload is the argument that follows curl's -d flag.
            sent_body.update(json.loads(cmd[cmd.index('-d') + 1]))

        with patch('subprocess.run', self._fake_subprocess_run(on_push=capture_body)):
            result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')

        self.assertTrue(result['ok'])
        pushed_perms = sent_body['permissions']
        # Our inbound=calendar:True → their outbound=calendar:True
        self.assertTrue(pushed_perms['outbound']['calendar'])
        # Our outbound=files:True → their inbound=files:True
        self.assertTrue(pushed_perms['inbound']['files'])

    def test_push_xff_header_carries_local_wg_ip(self):
        """Curl command must include X-Forwarded-For with local WG IP.

        MASQUERADE rewrites source to Docker bridge IP. Without XFF the remote
        can't match the sender's VPN subnet and returns 403.
        """
        self._add_office()
        link = self.mgr.list_connections()[0]
        pushes = []

        with patch('subprocess.run', self._fake_subprocess_run(on_push=pushes.append)):
            self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')

        # Inspect the most recent push command (empty list if none was issued).
        cmd = pushes[-1] if pushes else []
        # X-Forwarded-For header must be in the curl command
        self.assertIn('-H', cmd)
        xff_idx = [
            i for i, x in enumerate(cmd)
            if x == '-H' and i + 1 < len(cmd) and 'X-Forwarded-For' in cmd[i + 1]
        ]
        self.assertTrue(xff_idx, 'X-Forwarded-For header missing from curl command')
        xff_val = cmd[xff_idx[0] + 1]
        self.assertIn('10.0.0.1', xff_val)

    def test_push_http_error_returns_not_ok(self):
        self._add_office()
        link = self.mgr.list_connections()[0]

        with patch('subprocess.run', self._fake_subprocess_run(http_status='503')):
            result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')

        self.assertFalse(result['ok'])
        self.assertIn('503', result['error'])

    def test_push_no_remote_api_url_returns_not_ok(self):
        self._add_office()
        link = self.mgr.list_connections()[0]
        link['remote_api_url'] = None
        result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
        self.assertFalse(result['ok'])

    # ── apply_remote_permissions ──────────────────────────────────────────────

    def test_apply_remote_permissions_stores_by_pubkey(self):
        self._add_office()
        with patch('firewall_manager.apply_cell_rules'):
            updated = self.mgr.apply_remote_permissions(
                'officepubkey=',
                {'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
                 'outbound': {'calendar': False, 'files': True, 'mail': False, 'webdav': False}},
            )
        self.assertTrue(updated['permissions']['inbound']['calendar'])
        self.assertTrue(updated['permissions']['outbound']['files'])
        # Persisted to disk
        link = self.mgr.list_connections()[0]
        self.assertTrue(link['permissions']['inbound']['calendar'])
        self.assertIsNotNone(link['last_remote_update_at'])

    def test_apply_remote_permissions_unknown_pubkey_raises(self):
        self._add_office()
        with self.assertRaises(ValueError):
            self.mgr.apply_remote_permissions('nosuchkey=', {})

    def test_apply_remote_permissions_calls_apply_cell_rules(self):
        self._add_office()
        with patch('firewall_manager.apply_cell_rules') as mock_rules:
            self.mgr.apply_remote_permissions(
                'officepubkey=',
                {'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
                 'outbound': {}},
            )
        mock_rules.assert_called_once_with('office', '10.1.0.0/24', ['calendar'],
                                           exit_relay=False)

    # ── replay_pending_pushes ─────────────────────────────────────────────────

    def test_replay_retries_pending_links(self):
        self._add_office(push_ok=False)  # leaves pending_push=True + next_retry_at set
        links = self.mgr._load()
        links[0]['next_retry_at'] = None  # simulate backoff window elapsed
        self.mgr._save(links)
        self.assertTrue(self.mgr.list_connections()[0]['pending_push'])

        push_mock = MagicMock(return_value={'ok': True, 'error': None})
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock):
            summary = self.mgr.replay_pending_pushes()

        push_mock.assert_called_once()
        self.assertEqual(summary['attempted'], 1)
        self.assertEqual(summary['ok'], 1)
        self.assertFalse(self.mgr.list_connections()[0]['pending_push'])

    def test_replay_skips_non_pending_links(self):
        self._add_office(push_ok=True)  # pending_push=False after success
        push_mock = MagicMock(return_value={'ok': True, 'error': None})
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock):
            summary = self.mgr.replay_pending_pushes()
        push_mock.assert_not_called()
        self.assertEqual(summary['attempted'], 0)

    def test_replay_push_failure_leaves_pending(self):
        self._add_office(push_ok=False)
        links = self.mgr._load()
        links[0]['next_retry_at'] = None  # simulate backoff window elapsed
        self.mgr._save(links)
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
                   return_value={'ok': False, 'error': 'timeout'}):
            summary = self.mgr.replay_pending_pushes()
        self.assertEqual(summary['failed'], 1)
        self.assertTrue(self.mgr.list_connections()[0]['pending_push'])

    def test_replay_identity_failure_returns_empty_summary(self):
        # NOTE(review): unlike the sibling replay tests, next_retry_at is not
        # cleared here, so the link may be skipped by backoff regardless of the
        # identity failure — confirm this test really exercises the identity path.
        self._add_office(push_ok=False)
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   side_effect=RuntimeError('no app context')):
            summary = self.mgr.replay_pending_pushes()
        self.assertEqual(summary['attempted'], 0)

    # ── _load migration ───────────────────────────────────────────────────────

    def test_load_migration_injects_sync_fields_on_legacy_record(self):
        legacy = [{
            'cell_name': 'office',
            'public_key': 'officepubkey=',
            'vpn_subnet': '10.1.0.0/24',
            'dns_ip': '10.1.0.1',
            'domain': 'office.cell',
            'permissions': {'inbound': {}, 'outbound': {}},
        }]
        links_file = os.path.join(self.test_dir, 'cell_links.json')
        with open(links_file, 'w') as f:
            json.dump(legacy, f)

        links = self.mgr.list_connections()
        link = links[0]
        self.assertIn('remote_api_url', link)
        self.assertIn('pending_push', link)
        self.assertIn('last_push_status', link)
        self.assertIn('last_push_at', link)
        self.assertIn('last_remote_update_at', link)
        self.assertEqual(link['remote_api_url'], 'https://10.1.0.1')
        self.assertTrue(link['pending_push'])   # pre-existing → marked pending
        self.assertEqual(link['last_push_status'], 'never')

        # Fields persisted to disk after migration
        with open(links_file) as f:
            raw = json.load(f)
        self.assertIn('pending_push', raw[0])

    def test_load_migrates_legacy_http_3000_url_to_https(self):
        """An existing link with the old http://<ip>:3000 URL (unreachable across
        cells) is rewritten to the HTTPS/Caddy form on load."""
        legacy = [{
            'cell_name': 'office',
            'public_key': 'officepubkey=',
            'vpn_subnet': '10.1.0.0/24',
            'dns_ip': '10.1.0.9',
            'domain': 'office.cell',
            'permissions': {'inbound': {}, 'outbound': {}},
            'remote_api_url': 'http://10.1.0.9:3000',
        }]
        links_file = os.path.join(self.test_dir, 'cell_links.json')
        with open(links_file, 'w') as f:
            json.dump(legacy, f)
        link = self.mgr.list_connections()[0]
        self.assertEqual(link['remote_api_url'], 'https://10.1.0.9')
class TestExitOffer(unittest.TestCase):
    """Tests for Phase 2: exit-offer signaling."""

    INVITE = {
        'cell_name': 'office',
        'public_key': 'officepubkey=',
        'endpoint': '5.5.5.5:51820',
        'vpn_subnet': '10.1.0.0/24',
        'dns_ip': '10.1.0.1',
        'domain': 'office',
    }

    def setUp(self):
        """Create a manager with separate data/config dirs under one temp root."""
        self.test_dir = tempfile.mkdtemp()
        # Registered as a cleanup (not tearDown) so the directory is removed
        # even if a later setUp step raises; tearDown is skipped in that case.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)
        self.wg = _make_wg_mock()
        self.net = _make_nm_mock()
        self.mgr = CellLinkManager(self.data_dir, self.config_dir, self.wg, self.net)

    def _add_office(self, push_ok=True):
        """Add the 'office' link with identity, push, WG-IP lookup and firewall patched."""
        push_result = {'ok': push_ok, 'error': None if push_ok else 'timeout'}
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
                   return_value=push_result), \
             patch('cell_link_manager.CellLinkManager._local_wg_ip', return_value='10.0.0.1'), \
             patch('firewall_manager.apply_cell_rules'):
            self.mgr.add_connection(self.INVITE)

    def test_new_links_default_exit_offered_false(self):
        self._add_office()
        link = self.mgr.list_connections()[0]
        self.assertFalse(link.get('exit_offered'))

    def test_new_links_default_remote_exit_offered_false(self):
        self._add_office()
        link = self.mgr.list_connections()[0]
        self.assertFalse(link.get('remote_exit_offered'))

    def test_set_exit_offered_persists(self):
        self._add_office()
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
                   return_value={'ok': True, 'error': None}), \
             patch('cell_link_manager.CellLinkManager._local_wg_ip', return_value='10.0.0.1'):
            result = self.mgr.set_exit_offered('office', True)
        self.assertTrue(result['exit_offered'])
        link = self.mgr.list_connections()[0]
        self.assertTrue(link['exit_offered'])

    def test_set_exit_offered_triggers_push(self):
        self._add_office()
        push_mock = MagicMock(return_value={'ok': True, 'error': None})
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock), \
             patch('cell_link_manager.CellLinkManager._local_wg_ip', return_value='10.0.0.1'):
            self.mgr.set_exit_offered('office', True)
        push_mock.assert_called_once()

    def test_set_exit_offered_unknown_cell_raises(self):
        with self.assertRaises(ValueError):
            self.mgr.set_exit_offered('nobody', True)

    def test_push_includes_exit_offered(self):
        """The pushed JSON body carries the link's exit_offered flag."""
        self._add_office()
        link = self.mgr.list_connections()[0]
        link['exit_offered'] = True
        captured = {}

        def fake_run(cmd, **kwargs):
            if '-d' not in cmd:
                # Call without a -d payload: answer with a canned wg0 address line.
                return MagicMock(returncode=0,
                                 stdout=' inet 10.0.0.1/24 scope global wg0\n')
            # The JSON payload is the argument that follows the -d flag.
            captured['body'] = json.loads(cmd[cmd.index('-d') + 1])
            return MagicMock(returncode=0, stdout='200', stderr='')

        with patch('subprocess.run', fake_run):
            self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
        # Fail with a clear message (not a bare KeyError) if no push was issued.
        self.assertIn('body', captured, 'no push request was issued')
        self.assertTrue(captured['body']['exit_offered'])

    def test_apply_remote_permissions_stores_remote_exit_offered(self):
        self._add_office()
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.apply_remote_permissions(
                'officepubkey=',
                {'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
                 'outbound': {}},
                exit_offered=True,
            )
        link = self.mgr.list_connections()[0]
        self.assertTrue(link['remote_exit_offered'])

    def test_migration_adds_exit_fields_to_existing_links(self):
        """Existing cell_links.json without exit fields get them on load."""
        raw = [{'cell_name': 'office', 'public_key': 'officepubkey=',
                'vpn_subnet': '10.1.0.0/24', 'dns_ip': '10.1.0.1',
                'domain': 'office', 'endpoint': '5.5.5.5:51820',
                'permissions': {'inbound': {}, 'outbound': {}},
                'remote_api_url': 'http://10.1.0.1:3000',
                'last_push_status': 'ok', 'last_push_at': None,
                'last_push_error': None, 'pending_push': False,
                'last_remote_update_at': None}]
        with open(os.path.join(self.data_dir, 'cell_links.json'), 'w') as f:
            json.dump(raw, f)
        links = self.mgr.list_connections()
        self.assertIn('exit_offered', links[0])
        self.assertIn('remote_exit_offered', links[0])
        self.assertFalse(links[0]['exit_offered'])
        self.assertFalse(links[0]['remote_exit_offered'])
class TestExitRelay(unittest.TestCase):
    """Tests for Phase 3: per-peer internet routing via exit cell."""

    INVITE = {
        'cell_name': 'office',
        'public_key': 'officepubkey=',
        'endpoint': '5.5.5.5:51820',
        'vpn_subnet': '10.1.0.0/24',
        'dns_ip': '10.1.0.1',
        'domain': 'office',
    }

    def setUp(self):
        """Create a manager with separate data/config dirs under one temp root."""
        self.test_dir = tempfile.mkdtemp()
        # Registered as a cleanup (not tearDown) so the directory is removed
        # even if a later setUp step raises; tearDown is skipped in that case.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)
        self.wg = _make_wg_mock()
        self.net = _make_nm_mock()
        self.mgr = CellLinkManager(self.data_dir, self.config_dir, self.wg, self.net)

    def _add_office(self):
        """Add the 'office' link with identity, permission push and firewall patched."""
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
                   return_value={'ok': True, 'error': None}), \
             patch('firewall_manager.apply_cell_rules'):
            # apply_cell_rules is patched here as in the other test classes'
            # _add_office helpers, so adding the link does not reach the real
            # firewall_manager.
            self.mgr.add_connection(self.INVITE)

    def test_phase3_migration_adds_exit_relay_fields(self):
        """Existing links without Phase 3 fields get them on load."""
        raw = [{'cell_name': 'office', 'public_key': 'officepubkey=',
                'vpn_subnet': '10.1.0.0/24', 'dns_ip': '10.1.0.1',
                'domain': 'office', 'endpoint': '5.5.5.5:51820',
                'permissions': {'inbound': {}, 'outbound': {}},
                'remote_api_url': 'http://10.1.0.1:3000',
                'last_push_status': 'ok', 'last_push_at': None,
                'last_push_error': None, 'pending_push': False,
                'last_remote_update_at': None,
                'exit_offered': False, 'remote_exit_offered': False}]
        with open(os.path.join(self.data_dir, 'cell_links.json'), 'w') as f:
            json.dump(raw, f)
        links = self.mgr.list_connections()
        self.assertIn('exit_relay_active', links[0])
        self.assertIn('remote_exit_relay_active', links[0])
        self.assertFalse(links[0]['exit_relay_active'])
        self.assertFalse(links[0]['remote_exit_relay_active'])

    def test_set_exit_relay_active_persists(self):
        self._add_office()
        with patch('cell_link_manager.CellLinkManager._try_push'):
            link = self.mgr.set_exit_relay_active('office', True)
        self.assertTrue(link['exit_relay_active'])
        self.assertTrue(self.mgr.list_connections()[0]['exit_relay_active'])

    def test_set_exit_relay_active_false_persists(self):
        self._add_office()
        with patch('cell_link_manager.CellLinkManager._try_push'):
            self.mgr.set_exit_relay_active('office', True)
            link = self.mgr.set_exit_relay_active('office', False)
        self.assertFalse(link['exit_relay_active'])

    def test_set_exit_relay_active_triggers_push(self):
        self._add_office()
        push_mock = MagicMock()
        with patch('cell_link_manager.CellLinkManager._try_push', push_mock):
            self.mgr.set_exit_relay_active('office', True)
        push_mock.assert_called_once()

    def test_set_exit_relay_active_unknown_cell_raises(self):
        with self.assertRaises(ValueError):
            self.mgr.set_exit_relay_active('nobody', True)

    def test_push_includes_use_as_exit_relay_true(self):
        """With exit relay active, the pushed JSON body has use_as_exit_relay=True."""
        self._add_office()
        with patch('cell_link_manager.CellLinkManager._try_push'):
            self.mgr.set_exit_relay_active('office', True)

        captured = []

        def fake_run(cmd, **kw):
            captured.append(cmd)
            r = MagicMock()
            r.returncode = 0
            r.stdout = '200'
            return r

        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.subprocess.run', side_effect=fake_run), \
             patch('cell_link_manager.CellLinkManager._local_wg_ip', return_value=None):
            self.mgr._try_push('office', self.mgr.list_connections()[0])

        # The JSON body is the first captured argument that starts with '{'.
        payload_str = next(
            (arg for cmd in captured for arg in cmd
             if isinstance(arg, str) and arg.startswith('{')),
            None,
        )
        # Fail with a clear message instead of an opaque StopIteration.
        self.assertIsNotNone(payload_str, 'no JSON body found in any subprocess call')
        body = json.loads(payload_str)
        self.assertIn('use_as_exit_relay', body)
        self.assertTrue(body['use_as_exit_relay'])

    def test_apply_remote_permissions_stores_remote_exit_relay_active(self):
        self._add_office()
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.apply_remote_permissions('officepubkey=', {},
                                              use_as_exit_relay=True)
        link = self.mgr.list_connections()[0]
        self.assertTrue(link['remote_exit_relay_active'])

    def test_apply_remote_permissions_calls_apply_cell_rules_with_exit_relay_true(self):
        self._add_office()
        with patch('firewall_manager.apply_cell_rules') as mock_rules:
            self.mgr.apply_remote_permissions('officepubkey=', {},
                                              use_as_exit_relay=True)
        mock_rules.assert_called_once_with('office', '10.1.0.0/24', [],
                                           exit_relay=True)
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()