#!/usr/bin/env python3 """Unit tests for CellLinkManager (cell-to-cell VPN connections).""" import sys from pathlib import Path api_dir = Path(__file__).parent.parent / 'api' sys.path.insert(0, str(api_dir)) import unittest import tempfile import os import json import shutil from unittest.mock import MagicMock, patch from cell_link_manager import CellLinkManager def _make_wg_mock(): wg = MagicMock() wg.get_keys.return_value = {'public_key': 'serverpubkey=', 'private_key': 'serverprivkey='} wg.get_server_config.return_value = { 'endpoint': '1.2.3.4:51820', 'port': 51820, 'dns_ip': '10.0.0.3', 'split_tunnel_ips': '10.0.0.0/24, 172.20.0.0/16', } wg._get_configured_network.return_value = '10.0.0.0/24' wg._get_configured_address.return_value = '10.0.0.1/24' wg.add_cell_peer.return_value = True wg.remove_peer.return_value = True return wg def _make_nm_mock(): nm = MagicMock() nm.add_cell_dns_forward.return_value = {'restarted': ['cell-dns (reloaded)'], 'warnings': []} nm.remove_cell_dns_forward.return_value = {'restarted': ['cell-dns (reloaded)'], 'warnings': []} return nm SAMPLE_INVITE = { 'cell_name': 'office', 'public_key': 'officepubkey=', 'endpoint': '5.6.7.8:51820', 'vpn_subnet': '10.1.0.0/24', 'dns_ip': '10.1.0.1', 'domain': 'office.cell', 'version': 1, } class TestCellLinkManagerInvite(unittest.TestCase): def setUp(self): self.test_dir = tempfile.mkdtemp() self.wg = _make_wg_mock() self.nm = _make_nm_mock() self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm) def tearDown(self): shutil.rmtree(self.test_dir) def test_generate_invite_has_required_fields(self): invite = self.mgr.generate_invite('mycell', 'home.cell') for field in ('cell_name', 'public_key', 'endpoint', 'vpn_subnet', 'dns_ip', 'domain', 'version'): self.assertIn(field, invite, f"Missing field: {field}") def test_generate_invite_uses_wg_public_key(self): invite = self.mgr.generate_invite('mycell', 'home.cell') self.assertEqual(invite['public_key'], 'serverpubkey=') def 
test_generate_invite_uses_configured_network(self): invite = self.mgr.generate_invite('mycell', 'home.cell') self.assertEqual(invite['vpn_subnet'], '10.0.0.0/24') def test_generate_invite_dns_ip_is_server_vpn_ip(self): invite = self.mgr.generate_invite('mycell', 'home.cell') self.assertEqual(invite['dns_ip'], '10.0.0.1') def test_generate_invite_uses_supplied_identity(self): invite = self.mgr.generate_invite('myhome', 'myhome.local') self.assertEqual(invite['cell_name'], 'myhome') self.assertEqual(invite['domain'], 'myhome.local') class TestCellLinkManagerConnections(unittest.TestCase): def setUp(self): self.test_dir = tempfile.mkdtemp() self.wg = _make_wg_mock() self.nm = _make_nm_mock() self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm) def tearDown(self): shutil.rmtree(self.test_dir) def test_add_connection_stores_link(self): self.mgr.add_connection(SAMPLE_INVITE) links = self.mgr.list_connections() self.assertEqual(len(links), 1) self.assertEqual(links[0]['cell_name'], 'office') def test_add_connection_calls_add_cell_peer(self): self.mgr.add_connection(SAMPLE_INVITE) self.wg.add_cell_peer.assert_called_once_with( name='office', public_key='officepubkey=', endpoint='5.6.7.8:51820', vpn_subnet='10.1.0.0/24', ) def test_add_connection_calls_dns_forward(self): self.mgr.add_connection(SAMPLE_INVITE) self.nm.add_cell_dns_forward.assert_called_once_with( domain='office.cell', dns_ip='10.1.0.1' ) def test_add_connection_duplicate_raises(self): self.mgr.add_connection(SAMPLE_INVITE) with self.assertRaises(ValueError): self.mgr.add_connection(SAMPLE_INVITE) def test_add_connection_persists_to_disk(self): self.mgr.add_connection(SAMPLE_INVITE) # Create a fresh manager reading same dir mgr2 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm) links = mgr2.list_connections() self.assertEqual(len(links), 1) self.assertEqual(links[0]['cell_name'], 'office') def test_remove_connection_calls_wg_remove_peer(self): 
self.mgr.add_connection(SAMPLE_INVITE) self.mgr.remove_connection('office') self.wg.remove_peer.assert_called_once_with('officepubkey=') def test_remove_connection_calls_dns_remove(self): self.mgr.add_connection(SAMPLE_INVITE) self.mgr.remove_connection('office') self.nm.remove_cell_dns_forward.assert_called_once_with('office.cell') def test_remove_connection_deletes_from_list(self): self.mgr.add_connection(SAMPLE_INVITE) self.mgr.remove_connection('office') self.assertEqual(len(self.mgr.list_connections()), 0) def test_remove_nonexistent_raises(self): with self.assertRaises(ValueError): self.mgr.remove_connection('nobody') def test_list_connections_empty_by_default(self): self.assertEqual(self.mgr.list_connections(), []) def test_multiple_connections(self): self.mgr.add_connection(SAMPLE_INVITE) second = {**SAMPLE_INVITE, 'cell_name': 'cabin', 'public_key': 'cabinpubkey=', 'vpn_subnet': '10.2.0.0/24', 'dns_ip': '10.2.0.1', 'domain': 'cabin.cell'} self.mgr.add_connection(second) self.assertEqual(len(self.mgr.list_connections()), 2) # accept_invite — new connection def test_accept_invite_adds_new_connection(self): with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) links = self.mgr.list_connections() self.assertEqual(len(links), 1) self.assertEqual(links[0]['cell_name'], 'office') def test_accept_invite_idempotent_no_change(self): with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) self.wg.reset_mock() self.mgr.accept_invite(SAMPLE_INVITE) # No WG update for identical invite self.wg.update_peer_ip.assert_not_called() def test_accept_invite_updates_dns_ip_on_existing(self): with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) updated_invite = {**SAMPLE_INVITE, 'dns_ip': '10.1.0.2'} with patch('firewall_manager.apply_cell_rules'): result = self.mgr.accept_invite(updated_invite) self.assertEqual(result['dns_ip'], '10.1.0.2') self.assertEqual(result['remote_api_url'], 
'http://10.1.0.2:3000') self.nm.remove_cell_dns_forward.assert_called() self.nm.add_cell_dns_forward.assert_called_with( domain='office.cell', dns_ip='10.1.0.2') def test_accept_invite_updates_vpn_subnet_on_existing(self): with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) self.wg.update_peer_ip = MagicMock(return_value=True) updated_invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.5.0.0/24'} with patch('firewall_manager.apply_cell_rules'): result = self.mgr.accept_invite(updated_invite) self.assertEqual(result['vpn_subnet'], '10.5.0.0/24') self.wg.update_peer_ip.assert_called_once_with('officepubkey=', '10.5.0.0/24') def test_accept_invite_does_not_duplicate_link(self): with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) self.mgr.accept_invite({**SAMPLE_INVITE, 'dns_ip': '10.1.0.99'}) self.assertEqual(len(self.mgr.list_connections()), 1) if __name__ == '__main__': unittest.main() # --------------------------------------------------------------------------- # TestCheckInviteConflicts # --------------------------------------------------------------------------- class TestCheckInviteConflicts(unittest.TestCase): """Tests for CellLinkManager._check_invite_conflicts.""" def setUp(self): self.test_dir = tempfile.mkdtemp() self.wg = _make_wg_mock() # wg._get_configured_network returns '10.0.0.0/24' (own subnet) self.nm = _make_nm_mock() self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm) def tearDown(self): shutil.rmtree(self.test_dir) def _add_existing_cell(self, cell_name='cabin', vpn_subnet='10.2.0.0/24', domain='cabin.cell'): """Add a cell link directly to disk without going through add_connection.""" links = [{ 'cell_name': cell_name, 'public_key': 'cabinpubkey=', 'endpoint': '9.9.9.9:51820', 'vpn_subnet': vpn_subnet, 'dns_ip': '10.2.0.1', 'domain': domain, 'permissions': {'inbound': {}, 'outbound': {}}, 'remote_api_url': f'http://10.2.0.1:3000', 'pending_push': False, 
'last_push_status': 'ok', 'last_push_at': None, 'last_push_error': None, 'last_remote_update_at': None, }] with open(os.path.join(self.test_dir, 'cell_links.json'), 'w') as f: json.dump(links, f) # --- subnet conflicts --- def test_subnet_overlaps_own_subnet_raises(self): """Invite whose vpn_subnet overlaps our own subnet raises ValueError.""" invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.0.0.0/24'} # same as own with self.assertRaises(ValueError) as ctx: self.mgr._check_invite_conflicts(invite) self.assertIn('subnet', str(ctx.exception).lower()) def test_subnet_overlaps_own_subnet_partial_raises(self): """Invite whose vpn_subnet partially overlaps our own subnet raises ValueError.""" # Own is 10.0.0.0/24; this /16 contains it invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.0.0.0/16'} with self.assertRaises(ValueError): self.mgr._check_invite_conflicts(invite) def test_subnet_overlaps_connected_cell_raises(self): """Invite whose vpn_subnet overlaps an already-connected cell raises ValueError.""" self._add_existing_cell(vpn_subnet='10.2.0.0/24') invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.2.0.0/24'} with self.assertRaises(ValueError) as ctx: self.mgr._check_invite_conflicts(invite) self.assertIn('cabin', str(ctx.exception)) def test_subnet_no_conflict_does_not_raise(self): """Invite with a non-overlapping subnet passes without error.""" self._add_existing_cell(vpn_subnet='10.2.0.0/24') invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.3.0.0/24', 'domain': 'other.cell'} # Should not raise self.mgr._check_invite_conflicts(invite) # --- domain conflicts --- def test_domain_matches_own_domain_raises(self): """Invite with a domain equal to this cell's own domain raises ValueError.""" with patch('cell_link_manager.CellLinkManager._check_invite_conflicts', wraps=self.mgr._check_invite_conflicts): # Patch config_manager inside the function with patch('cell_link_manager.os.environ.get', return_value='home.cell'): # Use a fresh invite whose domain matches env-derived own domain 
invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.3.0.0/24', 'domain': 'home.cell'} # Manually test via app import patch import sys fake_cfg = MagicMock() fake_cfg.configs = {'_identity': {'domain': 'home.cell'}} with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}): with self.assertRaises(ValueError) as ctx: self.mgr._check_invite_conflicts(invite) self.assertIn('domain', str(ctx.exception).lower()) def test_domain_matches_connected_cell_raises(self): """Invite with a domain already used by a connected cell raises ValueError.""" self._add_existing_cell(domain='cabin.cell', vpn_subnet='10.2.0.0/24') invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.3.0.0/24', 'domain': 'cabin.cell'} with self.assertRaises(ValueError) as ctx: self.mgr._check_invite_conflicts(invite) self.assertIn('cabin', str(ctx.exception)) # --- exclude_cell parameter --- def test_exclude_cell_skips_that_cell_subnet_check(self): """With exclude_cell set, the named cell is skipped in subnet conflict check.""" self._add_existing_cell(cell_name='cabin', vpn_subnet='10.2.0.0/24', domain='cabin.cell') # Same subnet as cabin — normally a conflict, but excluded invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.2.0.0/24', 'domain': 'cabin.cell'} # Should not raise because 'cabin' is excluded self.mgr._check_invite_conflicts(invite, exclude_cell='cabin') def test_exclude_cell_skips_that_cell_domain_check(self): """With exclude_cell set, the named cell is skipped in domain conflict check.""" self._add_existing_cell(cell_name='cabin', vpn_subnet='10.2.0.0/24', domain='cabin.cell') invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.9.0.0/24', 'domain': 'cabin.cell'} # Should not raise — cabin excluded self.mgr._check_invite_conflicts(invite, exclude_cell='cabin') def test_exclude_cell_still_checks_other_cells(self): """Excluding 'cabin' does not suppress conflict with a different cell.""" self._add_existing_cell(cell_name='cabin', vpn_subnet='10.2.0.0/24', domain='cabin.cell') # Add a second cell manually with 
open(os.path.join(self.test_dir, 'cell_links.json')) as f: links = json.load(f) links.append({ 'cell_name': 'office', 'public_key': 'officepubkey=', 'vpn_subnet': '10.3.0.0/24', 'dns_ip': '10.3.0.1', 'domain': 'office.cell', 'permissions': {'inbound': {}, 'outbound': {}}, 'remote_api_url': 'http://10.3.0.1:3000', 'pending_push': False, 'last_push_status': 'ok', 'last_push_at': None, 'last_push_error': None, 'last_remote_update_at': None, }) with open(os.path.join(self.test_dir, 'cell_links.json'), 'w') as f: json.dump(links, f) # Conflicts with 'office', but we only exclude 'cabin' invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.3.0.0/24', 'domain': 'new.cell'} with self.assertRaises(ValueError): self.mgr._check_invite_conflicts(invite, exclude_cell='cabin') # --------------------------------------------------------------------------- # TestPushInviteToRemote # --------------------------------------------------------------------------- class TestPushInviteToRemote(unittest.TestCase): """Tests for CellLinkManager._push_invite_to_remote.""" def setUp(self): self.test_dir = tempfile.mkdtemp() self.wg = _make_wg_mock() self.nm = _make_nm_mock() self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm) def tearDown(self): shutil.rmtree(self.test_dir) def _make_link(self, endpoint='192.168.1.50:51820'): return { 'cell_name': 'office', 'public_key': 'officepubkey=', 'endpoint': endpoint, 'vpn_subnet': '10.1.0.0/24', 'dns_ip': '10.1.0.1', 'domain': 'office.cell', 'remote_api_url': 'http://10.1.0.1:3000', } def _fake_identity(self): return {'cell_name': 'home', 'public_key': 'homepubkey='} def test_push_invite_success_2xx_returns_ok_true(self): """curl returning a 2xx status code → {'ok': True}.""" link = self._make_link() mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = '201' mock_result.stderr = '' with patch('cell_link_manager.CellLinkManager._local_identity', return_value=self._fake_identity()), \ 
patch('cell_link_manager.os.environ.get', return_value='home.cell'), \ patch('subprocess.run', return_value=mock_result): import sys fake_cfg = MagicMock() fake_cfg.configs = {'_identity': {'domain': 'home.cell'}} with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}): result = self.mgr._push_invite_to_remote(link) self.assertTrue(result['ok']) def test_push_invite_4xx_returns_ok_false_with_http_error(self): """curl returning a 4xx status code → {'ok': False, 'error': 'HTTP 4xx'}.""" link = self._make_link() mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = '400' mock_result.stderr = '' with patch('cell_link_manager.CellLinkManager._local_identity', return_value=self._fake_identity()), \ patch('subprocess.run', return_value=mock_result): import sys fake_cfg = MagicMock() fake_cfg.configs = {'_identity': {'domain': 'home.cell'}} with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}): result = self.mgr._push_invite_to_remote(link) self.assertFalse(result['ok']) self.assertIn('400', result['error']) def test_push_invite_no_endpoint_returns_ok_false(self): """Link with no endpoint → {'ok': False, 'error': 'no endpoint'}.""" link = self._make_link(endpoint='') result = self.mgr._push_invite_to_remote(link) self.assertFalse(result['ok']) self.assertIn('endpoint', result['error'].lower()) def test_push_invite_none_endpoint_returns_ok_false(self): """Link with endpoint=None → {'ok': False, 'error': 'no endpoint'}.""" link = self._make_link(endpoint='') link['endpoint'] = None result = self.mgr._push_invite_to_remote(link) self.assertFalse(result['ok']) def test_push_invite_subprocess_error_returns_ok_false(self): """subprocess.run raising an exception → {'ok': False, 'error': ...}.""" link = self._make_link() with patch('cell_link_manager.CellLinkManager._local_identity', return_value=self._fake_identity()), \ patch('subprocess.run', side_effect=OSError('command not found')): import sys fake_cfg = MagicMock() 
fake_cfg.configs = {'_identity': {'domain': 'home.cell'}} with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}): result = self.mgr._push_invite_to_remote(link) self.assertFalse(result['ok']) self.assertIsNotNone(result['error']) def test_push_invite_curl_nonzero_returncode_returns_ok_false(self): """curl subprocess returning nonzero returncode → {'ok': False}.""" link = self._make_link() mock_result = MagicMock() mock_result.returncode = 1 mock_result.stdout = '' mock_result.stderr = 'connection refused' with patch('cell_link_manager.CellLinkManager._local_identity', return_value=self._fake_identity()), \ patch('subprocess.run', return_value=mock_result): import sys fake_cfg = MagicMock() fake_cfg.configs = {'_identity': {'domain': 'home.cell'}} with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}): result = self.mgr._push_invite_to_remote(link) self.assertFalse(result['ok']) def test_push_invite_sends_to_correct_lan_host(self): """The curl URL must use the LAN IP from the endpoint, not the WG dns_ip.""" link = self._make_link(endpoint='192.168.31.52:51820') captured = {} def fake_run(cmd, **kw): captured['cmd'] = cmd r = MagicMock() r.returncode = 0 r.stdout = '201' r.stderr = '' return r with patch('cell_link_manager.CellLinkManager._local_identity', return_value=self._fake_identity()), \ patch('subprocess.run', fake_run): import sys fake_cfg = MagicMock() fake_cfg.configs = {'_identity': {'domain': 'home.cell'}} with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}): self.mgr._push_invite_to_remote(link) url_in_cmd = captured['cmd'][-1] self.assertIn('192.168.31.52', url_in_cmd) self.assertIn('accept-invite', url_in_cmd) # Must NOT use the WG dns_ip (10.1.0.1) self.assertNotIn('10.1.0.1', url_in_cmd) # --------------------------------------------------------------------------- # TestAcceptInviteNew # --------------------------------------------------------------------------- class 
TestAcceptInviteNew(unittest.TestCase): """Tests for CellLinkManager.accept_invite — new connection path.""" def setUp(self): self.test_dir = tempfile.mkdtemp() self.wg = _make_wg_mock() self.nm = _make_nm_mock() self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm) def tearDown(self): shutil.rmtree(self.test_dir) def test_accept_invite_new_cell_adds_wg_peer(self): """accept_invite for a new cell calls add_cell_peer on WG manager.""" with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) self.wg.add_cell_peer.assert_called_once_with( name='office', public_key='officepubkey=', endpoint='5.6.7.8:51820', vpn_subnet='10.1.0.0/24', ) def test_accept_invite_new_cell_adds_dns_forward(self): """accept_invite for a new cell calls add_cell_dns_forward on NM.""" with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) self.nm.add_cell_dns_forward.assert_called_once_with( domain='office.cell', dns_ip='10.1.0.1') def test_accept_invite_new_cell_saves_link(self): """accept_invite for a new cell saves the link and returns it.""" with patch('firewall_manager.apply_cell_rules'): link = self.mgr.accept_invite(SAMPLE_INVITE) self.assertEqual(link['cell_name'], 'office') self.assertEqual(len(self.mgr.list_connections()), 1) def test_accept_invite_new_cell_sets_pending_push_true(self): """New link from accept_invite starts with pending_push=True (no push done).""" with patch('firewall_manager.apply_cell_rules'): link = self.mgr.accept_invite(SAMPLE_INVITE) self.assertTrue(link['pending_push']) def test_accept_invite_missing_cell_name_raises(self): """Invite missing 'cell_name' field raises ValueError.""" invite = {k: v for k, v in SAMPLE_INVITE.items() if k != 'cell_name'} with self.assertRaises(ValueError) as ctx: self.mgr.accept_invite(invite) self.assertIn('cell_name', str(ctx.exception)) def test_accept_invite_missing_public_key_raises(self): """Invite missing 'public_key' field raises ValueError.""" 
invite = {k: v for k, v in SAMPLE_INVITE.items() if k != 'public_key'} with self.assertRaises(ValueError) as ctx: self.mgr.accept_invite(invite) self.assertIn('public_key', str(ctx.exception)) def test_accept_invite_missing_vpn_subnet_raises(self): """Invite missing 'vpn_subnet' field raises ValueError.""" invite = {k: v for k, v in SAMPLE_INVITE.items() if k != 'vpn_subnet'} with self.assertRaises(ValueError) as ctx: self.mgr.accept_invite(invite) self.assertIn('vpn_subnet', str(ctx.exception)) def test_accept_invite_missing_dns_ip_raises(self): """Invite missing 'dns_ip' field raises ValueError.""" invite = {k: v for k, v in SAMPLE_INVITE.items() if k != 'dns_ip'} with self.assertRaises(ValueError) as ctx: self.mgr.accept_invite(invite) self.assertIn('dns_ip', str(ctx.exception)) def test_accept_invite_missing_domain_raises(self): """Invite missing 'domain' field raises ValueError.""" invite = {k: v for k, v in SAMPLE_INVITE.items() if k != 'domain'} with self.assertRaises(ValueError) as ctx: self.mgr.accept_invite(invite) self.assertIn('domain', str(ctx.exception)) def test_accept_invite_subnet_conflict_raises(self): """accept_invite raises ValueError when subnet conflicts with own subnet.""" conflicting = {**SAMPLE_INVITE, 'vpn_subnet': '10.0.0.0/24'} # same as own with self.assertRaises(ValueError): self.mgr.accept_invite(conflicting) def test_accept_invite_already_connected_no_change_returns_existing(self): """Calling accept_invite again with identical data returns existing link unchanged.""" with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) self.wg.reset_mock() result = self.mgr.accept_invite(SAMPLE_INVITE) self.assertEqual(result['cell_name'], 'office') # No second WG peer add self.wg.add_cell_peer.assert_not_called() def test_accept_invite_already_connected_dns_ip_change_updates(self): """accept_invite with changed dns_ip updates the link and DNS forward.""" with patch('firewall_manager.apply_cell_rules'): 
self.mgr.accept_invite(SAMPLE_INVITE) updated = {**SAMPLE_INVITE, 'dns_ip': '10.1.0.5'} with patch('firewall_manager.apply_cell_rules'): result = self.mgr.accept_invite(updated) self.assertEqual(result['dns_ip'], '10.1.0.5') self.assertEqual(result['remote_api_url'], 'http://10.1.0.5:3000') self.nm.remove_cell_dns_forward.assert_called() self.nm.add_cell_dns_forward.assert_called_with( domain='office.cell', dns_ip='10.1.0.5') def test_accept_invite_already_connected_dns_ip_change_does_not_duplicate(self): """DNS ip update via accept_invite must not create a second link.""" with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) self.mgr.accept_invite({**SAMPLE_INVITE, 'dns_ip': '10.1.0.5'}) self.assertEqual(len(self.mgr.list_connections()), 1) def test_accept_invite_already_connected_vpn_subnet_change_calls_update_peer_ip(self): """accept_invite with changed vpn_subnet calls update_peer_ip on WG manager.""" with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) self.wg.update_peer_ip = MagicMock(return_value=True) updated = {**SAMPLE_INVITE, 'vpn_subnet': '10.5.0.0/24'} with patch('firewall_manager.apply_cell_rules'): result = self.mgr.accept_invite(updated) self.assertEqual(result['vpn_subnet'], '10.5.0.0/24') self.wg.update_peer_ip.assert_called_once_with('officepubkey=', '10.5.0.0/24') def test_accept_invite_already_connected_vpn_subnet_change_reapplies_firewall(self): """accept_invite with changed vpn_subnet triggers apply_cell_rules.""" with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) self.wg.update_peer_ip = MagicMock(return_value=True) updated = {**SAMPLE_INVITE, 'vpn_subnet': '10.5.0.0/24'} with patch('firewall_manager.apply_cell_rules') as mock_rules: self.mgr.accept_invite(updated) mock_rules.assert_called() def test_accept_invite_does_not_duplicate_on_repeated_call(self): """Multiple calls with the same invite must leave exactly one link.""" with 
patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) self.mgr.accept_invite(SAMPLE_INVITE) self.assertEqual(len(self.mgr.list_connections()), 1) def test_accept_invite_domain_change_updates_stored_domain(self): """accept_invite with a changed domain updates the stored domain.""" with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) updated = {**SAMPLE_INVITE, 'domain': 'office-new.cell'} import sys fake_cfg = MagicMock() fake_cfg.configs = {'_identity': {'domain': 'home.cell'}} with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}): with patch('firewall_manager.apply_cell_rules'): result = self.mgr.accept_invite(updated) self.assertEqual(result['domain'], 'office-new.cell') def test_accept_invite_domain_change_updates_dns_forward(self): """accept_invite with a changed domain removes old DNS forward and adds new.""" with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) self.nm.reset_mock() updated = {**SAMPLE_INVITE, 'domain': 'office-new.cell'} import sys fake_cfg = MagicMock() fake_cfg.configs = {'_identity': {'domain': 'home.cell'}} with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}): with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(updated) self.nm.remove_cell_dns_forward.assert_called_with('office.cell') self.nm.add_cell_dns_forward.assert_called_with( domain='office-new.cell', dns_ip=SAMPLE_INVITE['dns_ip']) def test_accept_invite_healing_domain_conflict_raises(self): """Healing must reject a domain update that conflicts with another connected cell.""" import sys fake_cfg = MagicMock() fake_cfg.configs = {'_identity': {'domain': 'home.cell'}} # Add two cells: 'office' and 'branch' branch_invite = {**SAMPLE_INVITE, 'cell_name': 'branch', 'public_key': 'branchpubkey1234567890ABCDEFGH=', 'vpn_subnet': '10.9.0.0/24', 'dns_ip': '10.9.0.1', 'domain': 'branch.cell'} with patch.dict(sys.modules, {'app': 
MagicMock(config_manager=fake_cfg)}): with patch('firewall_manager.apply_cell_rules'): self.mgr.accept_invite(SAMPLE_INVITE) self.mgr.accept_invite(branch_invite) # Now 'office' tries to heal its domain to 'branch.cell' — must fail conflicting = {**SAMPLE_INVITE, 'domain': 'branch.cell'} with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}): with self.assertRaises(ValueError) as ctx: self.mgr.accept_invite(conflicting) self.assertIn('branch.cell', str(ctx.exception)) # --------------------------------------------------------------------------- # TestAddConnectionMutualPairing # --------------------------------------------------------------------------- class TestAddConnectionMutualPairing(unittest.TestCase): """Tests for add_connection's mutual pairing via _push_invite_to_remote.""" def setUp(self): self.test_dir = tempfile.mkdtemp() self.wg = _make_wg_mock() self.nm = _make_nm_mock() self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm) def tearDown(self): shutil.rmtree(self.test_dir) def _add_with_push(self, push_result): push_mock = MagicMock(return_value=push_result) perm_push = MagicMock(return_value={'ok': True, 'error': None}) with patch('cell_link_manager.CellLinkManager._push_invite_to_remote', push_mock), \ patch('cell_link_manager.CellLinkManager._local_identity', return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \ patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', perm_push), \ patch('firewall_manager.apply_cell_rules'): link = self.mgr.add_connection(SAMPLE_INVITE) return link, push_mock def test_add_connection_calls_push_invite_to_remote(self): """add_connection calls _push_invite_to_remote after adding the connection.""" _, push_mock = self._add_with_push({'ok': True, 'error': None}) push_mock.assert_called_once() def test_add_connection_push_invite_failure_is_nonfatal(self): """_push_invite_to_remote failure does not prevent connection creation.""" link, _ = 
self._add_with_push({'ok': False, 'error': 'connection refused'}) conns = self.mgr.list_connections() self.assertEqual(len(conns), 1) self.assertEqual(conns[0]['cell_name'], 'office') def test_add_connection_push_invite_failure_link_still_stored(self): """Even when push fails, the link is persisted to disk.""" _, _ = self._add_with_push({'ok': False, 'error': 'timeout'}) mgr2 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm) self.assertEqual(len(mgr2.list_connections()), 1) def test_add_connection_with_inbound_services_sets_permissions(self): """inbound_services passed to add_connection sets permissions correctly.""" perm_push = MagicMock(return_value={'ok': True, 'error': None}) push_mock = MagicMock(return_value={'ok': True, 'error': None}) with patch('cell_link_manager.CellLinkManager._push_invite_to_remote', push_mock), \ patch('cell_link_manager.CellLinkManager._local_identity', return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \ patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', perm_push), \ patch('firewall_manager.apply_cell_rules'): link = self.mgr.add_connection(SAMPLE_INVITE, inbound_services=['calendar']) self.assertTrue(link['permissions']['inbound']['calendar']) self.assertFalse(link['permissions']['inbound']['files']) def test_add_connection_push_invite_exception_is_nonfatal(self): """Exception from _push_invite_to_remote is caught and does not raise.""" perm_push = MagicMock(return_value={'ok': True, 'error': None}) with patch('cell_link_manager.CellLinkManager._push_invite_to_remote', side_effect=RuntimeError('docker not available')), \ patch('cell_link_manager.CellLinkManager._local_identity', return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \ patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', perm_push), \ patch('firewall_manager.apply_cell_rules'): link = self.mgr.add_connection(SAMPLE_INVITE) self.assertEqual(link['cell_name'], 'office') # 
--------------------------------------------------------------------------- # TestAddConnectionAtomicity # --------------------------------------------------------------------------- class TestAddConnectionAtomicity(unittest.TestCase): """Verify that add_connection rolls back correctly when WG or DNS steps fail.""" def setUp(self): self.test_dir = tempfile.mkdtemp() self.wg = _make_wg_mock() self.nm = _make_nm_mock() self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm) def tearDown(self): shutil.rmtree(self.test_dir) def test_wg_fail_does_not_call_dns(self): """When add_cell_peer returns False, add_cell_dns_forward must NOT be called.""" self.wg.add_cell_peer.return_value = False with self.assertRaises(RuntimeError): self.mgr.add_connection(SAMPLE_INVITE) self.nm.add_cell_dns_forward.assert_not_called() def test_wg_fail_does_not_persist_link(self): """When WG fails, list_connections() must still return [] (nothing persisted).""" self.wg.add_cell_peer.return_value = False with self.assertRaises(RuntimeError): self.mgr.add_connection(SAMPLE_INVITE) self.assertEqual(self.mgr.list_connections(), []) def test_wg_fail_raises_runtime_error(self): """add_connection raises RuntimeError (not some other exception) when WG fails.""" self.wg.add_cell_peer.return_value = False with self.assertRaises(RuntimeError): self.mgr.add_connection(SAMPLE_INVITE) def test_dns_warning_still_persists_link(self): """When DNS returns warnings (not a hard failure), the link IS still saved.""" self.nm.add_cell_dns_forward.return_value = { 'restarted': [], 'warnings': ['CoreDNS reload timed out'], } self.mgr.add_connection(SAMPLE_INVITE) links = self.mgr.list_connections() self.assertEqual(len(links), 1) self.assertEqual(links[0]['cell_name'], 'office') def test_dns_warning_does_not_raise(self): """When DNS returns warnings, add_connection completes without raising.""" self.nm.add_cell_dns_forward.return_value = { 'restarted': [], 'warnings': ['CoreDNS reload timed out'], } 
try: self.mgr.add_connection(SAMPLE_INVITE) except Exception as e: self.fail(f"add_connection raised unexpectedly with DNS warnings: {e}") # --------------------------------------------------------------------------- # TestAddConnectionPermissions # --------------------------------------------------------------------------- class TestAddConnectionPermissions(unittest.TestCase): """Verify that inbound_services controls the permissions field on the saved link.""" def setUp(self): self.test_dir = tempfile.mkdtemp() self.wg = _make_wg_mock() self.nm = _make_nm_mock() self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm) def tearDown(self): shutil.rmtree(self.test_dir) def _get_link(self): links = self.mgr.list_connections() self.assertEqual(len(links), 1) return links[0] def test_add_with_no_inbound_defaults_all_deny(self): """No inbound_services arg → all inbound permissions False.""" self.mgr.add_connection(SAMPLE_INVITE) link = self._get_link() inbound = link['permissions']['inbound'] for service, allowed in inbound.items(): self.assertFalse(allowed, f"Expected {service} to be False, got {allowed}") def test_add_with_inbound_services_sets_them(self): """inbound_services=['calendar','files'] → those two True, others False.""" self.mgr.add_connection(SAMPLE_INVITE, inbound_services=['calendar', 'files']) link = self._get_link() inbound = link['permissions']['inbound'] self.assertTrue(inbound['calendar']) self.assertTrue(inbound['files']) self.assertFalse(inbound['mail']) self.assertFalse(inbound['webdav']) def test_inbound_invalid_service_ignored(self): """Passing 'badservice' in inbound_services does not appear in permissions.""" self.mgr.add_connection(SAMPLE_INVITE, inbound_services=['badservice', 'calendar']) link = self._get_link() inbound = link['permissions']['inbound'] self.assertNotIn('badservice', inbound) # valid one was still applied self.assertTrue(inbound['calendar']) # 
# ---------------------------------------------------------------------------
# TestUpdatePermissions
# ---------------------------------------------------------------------------

class TestUpdatePermissions(unittest.TestCase):
    """Tests for the new update_permissions / get_permissions methods."""

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()
        self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
        # Add a connection so there is something to update
        self.mgr.add_connection(SAMPLE_INVITE)

    def tearDown(self):
        shutil.rmtree(self.test_dir)

    def test_update_sets_inbound_values(self):
        """update_permissions with inbound={'calendar': True} persists correctly."""
        with patch('cell_link_manager.firewall_manager', create=True) as mock_fm:
            mock_fm.apply_cell_rules = MagicMock()
            self.mgr.update_permissions('office', {'calendar': True}, {})
        # Re-read from disk to confirm persistence
        mgr2 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
        perms = mgr2.get_permissions('office')
        self.assertTrue(perms['inbound']['calendar'])
        self.assertFalse(perms['inbound']['files'])

    def test_update_rejects_unknown_service_by_cleaning_it_out(self):
        """update_permissions with inbound={'bad': True} — 'bad' must not appear in saved perms."""
        with patch('cell_link_manager.firewall_manager', create=True) as mock_fm:
            mock_fm.apply_cell_rules = MagicMock()
            self.mgr.update_permissions('office', {'bad': True, 'calendar': True}, {})
        perms = self.mgr.get_permissions('office')
        self.assertNotIn('bad', perms['inbound'])
        self.assertTrue(perms['inbound']['calendar'])

    def test_update_nonexistent_cell_raises(self):
        """update_permissions on an unknown cell_name raises ValueError."""
        with self.assertRaises(ValueError):
            self.mgr.update_permissions('nosuchcell', {}, {})

    def test_get_permissions_returns_correct(self):
        """get_permissions returns the dict that was saved by update_permissions."""
        with patch('cell_link_manager.firewall_manager', create=True) as mock_fm:
            mock_fm.apply_cell_rules = MagicMock()
            self.mgr.update_permissions(
                'office',
                inbound={'calendar': True, 'files': False},
                outbound={'mail': True},
            )
        perms = self.mgr.get_permissions('office')
        self.assertIn('inbound', perms)
        self.assertIn('outbound', perms)
        self.assertTrue(perms['inbound']['calendar'])
        self.assertFalse(perms['inbound']['files'])
        self.assertTrue(perms['outbound']['mail'])

    def test_get_permissions_nonexistent_cell_raises(self):
        """get_permissions on an unknown cell_name raises ValueError."""
        with self.assertRaises(ValueError):
            self.mgr.get_permissions('nosuchcell')


# ---------------------------------------------------------------------------
# TestLoadMigration
# ---------------------------------------------------------------------------

class TestLoadMigration(unittest.TestCase):
    """Verify _load() lazily injects permissions field when it is missing."""

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()

    def tearDown(self):
        shutil.rmtree(self.test_dir)

    def test_load_injects_permissions_if_missing(self):
        """Write cell_links.json without permissions; _load should add all-False defaults."""
        links_file = os.path.join(self.test_dir, 'cell_links.json')
        legacy_links = [
            {
                'cell_name': 'legacy-office',
                'public_key': 'officepubkey=',
                'vpn_subnet': '10.1.0.0/24',
                'dns_ip': '10.1.0.1',
                'domain': 'legacy-office.cell',
                # NO 'permissions' key — simulates pre-migration data
            }
        ]
        with open(links_file, 'w') as f:
            json.dump(legacy_links, f)

        mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
        links = mgr.list_connections()
        self.assertEqual(len(links), 1)
        link = links[0]
        self.assertIn('permissions', link)
        perms = link['permissions']
        self.assertIn('inbound', perms)
        self.assertIn('outbound', perms)
        for service in ('calendar', 'files', 'mail', 'webdav'):
            self.assertFalse(perms['inbound'][service])
            self.assertFalse(perms['outbound'][service])

    def test_load_migration_persists_to_disk(self):
        """After migration, re-loading the same file returns the injected permissions."""
        links_file = os.path.join(self.test_dir, 'cell_links.json')
        with open(links_file, 'w') as f:
            json.dump([{
                'cell_name': 'old-cell',
                'public_key': 'somepubkey=',
                'vpn_subnet': '10.2.0.0/24',
                'dns_ip': '10.2.0.1',
                'domain': 'old-cell.cell',
            }], f)

        mgr1 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
        mgr1.list_connections()  # triggers migration + save

        # Read the file directly and confirm permissions are now on disk
        with open(links_file) as f:
            raw = json.load(f)
        self.assertIn('permissions', raw[0])


class TestPermissionSync(unittest.TestCase):
    """Tests for Phase 1: permission sync between connected PIC cells."""

    INVITE = {
        'cell_name': 'office',
        'public_key': 'officepubkey=',
        'endpoint': '5.6.7.8:51820',
        'vpn_subnet': '10.1.0.0/24',
        'dns_ip': '10.1.0.1',
        'domain': 'office.cell',
        'version': 1,
    }

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        self.wg = _make_wg_mock()
        self.nm = _make_nm_mock()
        self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)

    def tearDown(self):
        shutil.rmtree(self.test_dir)

    def _add_office(self, push_ok=True):
        """Add the 'office' link with identity, remote push and firewall rules patched.

        ``push_ok`` selects the stubbed result of ``_push_permissions_to_remote``.
        Returns whatever ``add_connection`` returns.
        """
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
                   return_value={'ok': push_ok, 'error': None if push_ok else 'conn refused'}), \
             patch('firewall_manager.apply_cell_rules'):
            return self.mgr.add_connection(self.INVITE)

    # ── add_connection ────────────────────────────────────────────────────────

    def test_add_connection_includes_sync_fields(self):
        """The link returned by add_connection carries every sync bookkeeping key."""
        link = self._add_office()
        self.assertIn('remote_api_url', link)
        self.assertIn('pending_push', link)
        self.assertIn('last_push_status', link)
        self.assertIn('last_push_at', link)
        self.assertIn('last_remote_update_at', link)

    def test_add_connection_sets_remote_api_url_from_dns_ip(self):
        """remote_api_url is http://<invite dns_ip>:3000."""
        link = self._add_office()
        self.assertEqual(link['remote_api_url'], 'http://10.1.0.1:3000')

    def test_add_connection_triggers_push(self):
        """add_connection pushes once, passing the local cell name and public key."""
        push_mock = MagicMock(return_value={'ok': True, 'error': None})
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock), \
             patch('firewall_manager.apply_cell_rules'):
            self.mgr.add_connection(self.INVITE)
        push_mock.assert_called_once()
        call_args = push_mock.call_args[0]
        self.assertEqual(call_args[1], 'home')         # from_cell
        self.assertEqual(call_args[2], 'homepubkey=')  # from_public_key

    def test_add_connection_push_failure_does_not_abort_add(self):
        """A failed push still stores the link, flagged pending_push=True."""
        self._add_office(push_ok=False)
        conns = self.mgr.list_connections()
        self.assertEqual(len(conns), 1)
        self.assertEqual(conns[0]['cell_name'], 'office')
        self.assertTrue(conns[0]['pending_push'])

    def test_add_connection_push_success_clears_pending(self):
        """A successful push leaves pending_push False and last_push_status 'ok'."""
        self._add_office(push_ok=True)
        link = self.mgr.list_connections()[0]
        self.assertFalse(link['pending_push'])
        self.assertEqual(link['last_push_status'], 'ok')

    # ── update_permissions ────────────────────────────────────────────────────

    def test_update_permissions_push_succeeds_clears_pending(self):
        """After update_permissions with a successful push, the link is not pending."""
        self._add_office()
        push_mock = MagicMock(return_value={'ok': True, 'error': None})
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock), \
             patch('firewall_manager.apply_cell_rules'):
            self.mgr.update_permissions('office', {'calendar': True}, {'files': False})
        link = self.mgr.list_connections()[0]
        self.assertFalse(link['pending_push'])
        self.assertEqual(link['last_push_status'], 'ok')
        self.assertIsNotNone(link['last_push_at'])

    def test_update_permissions_push_failure_keeps_local_save(self):
        """A failed push keeps the local change and marks the link pending/failed."""
        self._add_office()
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
                   return_value={'ok': False, 'error': 'timeout'}), \
             patch('firewall_manager.apply_cell_rules'):
            result = self.mgr.update_permissions('office', {'calendar': True}, {})
        # Local save must have happened — calendar is True
        self.assertTrue(result['permissions']['inbound']['calendar'])
        link = self.mgr.list_connections()[0]
        self.assertTrue(link['pending_push'])
        self.assertEqual(link['last_push_status'], 'failed')

    def test_update_permissions_does_not_raise_on_push_exception(self):
        """An exception from _local_identity does not propagate out of update_permissions."""
        self._add_office()
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   side_effect=RuntimeError('no app context')), \
             patch('firewall_manager.apply_cell_rules'):
            # Must not raise
            result = self.mgr.update_permissions('office', {}, {})
        self.assertIn('permissions', result)

    # ── _push_permissions_to_remote (unit) ────────────────────────────────────

    def test_push_mirrors_inbound_outbound(self):
        """Our inbound (what we share) must become their outbound in the body."""
        self._add_office()
        link = self.mgr.list_connections()[0]
        link['permissions'] = {
            'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
            'outbound': {'calendar': False, 'files': True, 'mail': False, 'webdav': False},
        }

        sent_body = {}

        def fake_run(cmd, **kwargs):
            if '-d' not in cmd:
                # ip addr show wg0 — return a fake wg0 address
                r = MagicMock()
                r.returncode = 0
                r.stdout = 'inet 10.0.0.1/24 scope global wg0\n'
                return r
            # The JSON request body is the argument that follows '-d'.
            d_idx = cmd.index('-d')
            sent_body.update(json.loads(cmd[d_idx + 1]))
            r = MagicMock()
            r.returncode = 0
            r.stdout = '200'
            r.stderr = ''
            return r

        with patch('subprocess.run', fake_run):
            result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')

        self.assertTrue(result['ok'])
        pushed_perms = sent_body['permissions']
        # Our inbound=calendar:True → their outbound=calendar:True
        self.assertTrue(pushed_perms['outbound']['calendar'])
        # Our outbound=files:True → their inbound=files:True
        self.assertTrue(pushed_perms['inbound']['files'])

    def test_push_xff_header_carries_local_wg_ip(self):
        """Curl command must include X-Forwarded-For with local WG IP.

        MASQUERADE rewrites source to Docker bridge IP. Without XFF the remote
        can't match the sender's VPN subnet and returns 403.
        """
        self._add_office()
        link = self.mgr.list_connections()[0]

        captured_cmd = {}

        def fake_run(cmd, **kwargs):
            if '-d' not in cmd:
                r = MagicMock()
                r.returncode = 0
                r.stdout = ' inet 10.0.0.1/24 scope global wg0\n'
                return r
            captured_cmd['cmd'] = cmd
            r = MagicMock()
            r.returncode = 0
            r.stdout = '200'
            r.stderr = ''
            return r

        with patch('subprocess.run', fake_run):
            self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')

        cmd = captured_cmd.get('cmd', [])
        # X-Forwarded-For header must be in the curl command
        self.assertIn('-H', cmd)
        xff_idx = [i for i, x in enumerate(cmd)
                   if x == '-H' and i + 1 < len(cmd) and 'X-Forwarded-For' in cmd[i + 1]]
        self.assertTrue(xff_idx, 'X-Forwarded-For header missing from curl command')
        xff_val = cmd[xff_idx[0] + 1]
        self.assertIn('10.0.0.1', xff_val)

    def test_push_http_error_returns_not_ok(self):
        """A '503' status on stdout yields ok=False with '503' in the error text."""
        self._add_office()
        link = self.mgr.list_connections()[0]
        mock_result = MagicMock()
        mock_result.returncode = 0
        mock_result.stdout = '503'
        mock_result.stderr = ''

        def fake_run(cmd, **kwargs):
            if '-d' not in cmd:
                r = MagicMock()
                r.returncode = 0
                r.stdout = ' inet 10.0.0.1/24 scope global wg0\n'
                return r
            return mock_result

        with patch('subprocess.run', fake_run):
            result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
        self.assertFalse(result['ok'])
        self.assertIn('503', result['error'])

    def test_push_no_remote_api_url_returns_not_ok(self):
        """With remote_api_url set to None the push reports ok=False."""
        self._add_office()
        link = self.mgr.list_connections()[0]
        link['remote_api_url'] = None
        result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
        self.assertFalse(result['ok'])

    # ── apply_remote_permissions ──────────────────────────────────────────────

    def test_apply_remote_permissions_stores_by_pubkey(self):
        """Permissions sent for a known public key are returned and persisted."""
        self._add_office()
        with patch('firewall_manager.apply_cell_rules'):
            updated = self.mgr.apply_remote_permissions(
                'officepubkey=',
                {'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
                 'outbound': {'calendar': False, 'files': True, 'mail': False, 'webdav': False}},
            )
        self.assertTrue(updated['permissions']['inbound']['calendar'])
        self.assertTrue(updated['permissions']['outbound']['files'])
        # Persisted to disk
        link = self.mgr.list_connections()[0]
        self.assertTrue(link['permissions']['inbound']['calendar'])
        self.assertIsNotNone(link['last_remote_update_at'])

    def test_apply_remote_permissions_unknown_pubkey_raises(self):
        """An unrecognised public key raises ValueError."""
        self._add_office()
        with self.assertRaises(ValueError):
            self.mgr.apply_remote_permissions('nosuchkey=', {})

    def test_apply_remote_permissions_calls_apply_cell_rules(self):
        """apply_cell_rules is called once with the cell, its subnet and the allowed services."""
        self._add_office()
        with patch('firewall_manager.apply_cell_rules') as mock_rules:
            self.mgr.apply_remote_permissions(
                'officepubkey=',
                {'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
                 'outbound': {}},
            )
        mock_rules.assert_called_once_with('office', '10.1.0.0/24', ['calendar'], exit_relay=False)

    # ── replay_pending_pushes ─────────────────────────────────────────────────

    def test_replay_retries_pending_links(self):
        """A pending link is pushed again on replay and cleared when the push succeeds."""
        self._add_office(push_ok=False)  # leaves pending_push=True + next_retry_at set
        links = self.mgr._load()
        links[0]['next_retry_at'] = None  # simulate backoff window elapsed
        self.mgr._save(links)
        self.assertTrue(self.mgr.list_connections()[0]['pending_push'])

        push_mock = MagicMock(return_value={'ok': True, 'error': None})
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock):
            summary = self.mgr.replay_pending_pushes()

        push_mock.assert_called_once()
        self.assertEqual(summary['attempted'], 1)
        self.assertEqual(summary['ok'], 1)
        self.assertFalse(self.mgr.list_connections()[0]['pending_push'])

    def test_replay_skips_non_pending_links(self):
        """Links that are not pending are not pushed on replay."""
        self._add_office(push_ok=True)  # pending_push=False after success
        push_mock = MagicMock(return_value={'ok': True, 'error': None})
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock):
            summary = self.mgr.replay_pending_pushes()
        push_mock.assert_not_called()
        self.assertEqual(summary['attempted'], 0)

    def test_replay_push_failure_leaves_pending(self):
        """A replay whose push fails counts as failed and keeps pending_push True."""
        self._add_office(push_ok=False)
        links = self.mgr._load()
        links[0]['next_retry_at'] = None  # simulate backoff window elapsed
        self.mgr._save(links)
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
                   return_value={'ok': False, 'error': 'timeout'}):
            summary = self.mgr.replay_pending_pushes()
        self.assertEqual(summary['failed'], 1)
        self.assertTrue(self.mgr.list_connections()[0]['pending_push'])

    def test_replay_identity_failure_returns_empty_summary(self):
        """If _local_identity raises, replay reports zero attempted pushes."""
        self._add_office(push_ok=False)
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   side_effect=RuntimeError('no app context')):
            summary = self.mgr.replay_pending_pushes()
        self.assertEqual(summary['attempted'], 0)

    # ── _load migration ───────────────────────────────────────────────────────

    def test_load_migration_injects_sync_fields_on_legacy_record(self):
        """A record lacking sync fields gains them on load, and they are written back."""
        legacy = [{
            'cell_name': 'office',
            'public_key': 'officepubkey=',
            'vpn_subnet': '10.1.0.0/24',
            'dns_ip': '10.1.0.1',
            'domain': 'office.cell',
            'permissions': {'inbound': {}, 'outbound': {}},
        }]
        links_file = os.path.join(self.test_dir, 'cell_links.json')
        with open(links_file, 'w') as f:
            json.dump(legacy, f)

        links = self.mgr.list_connections()
        link = links[0]
        self.assertIn('remote_api_url', link)
        self.assertIn('pending_push', link)
        self.assertIn('last_push_status', link)
        self.assertIn('last_push_at', link)
        self.assertIn('last_remote_update_at', link)
        self.assertEqual(link['remote_api_url'], 'http://10.1.0.1:3000')
        self.assertTrue(link['pending_push'])  # pre-existing → marked pending
        self.assertEqual(link['last_push_status'], 'never')

        # Fields persisted to disk after migration
        with open(links_file) as f:
            raw = json.load(f)
        self.assertIn('pending_push', raw[0])


class TestExitOffer(unittest.TestCase):
    """Tests for Phase 2: exit-offer signaling."""

    INVITE = {
        'cell_name': 'office',
        'public_key': 'officepubkey=',
        'endpoint': '5.5.5.5:51820',
        'vpn_subnet': '10.1.0.0/24',
        'dns_ip': '10.1.0.1',
        'domain': 'office',
    }

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)
        self.wg = _make_wg_mock()
        self.net = _make_nm_mock()
        self.mgr = CellLinkManager(self.data_dir, self.config_dir, self.wg, self.net)

    def tearDown(self):
        shutil.rmtree(self.test_dir)

    def _add_office(self, push_ok=True):
        """Add the 'office' link with identity, push, local WG IP and firewall rules patched."""
        push_result = {'ok': push_ok, 'error': None if push_ok else 'timeout'}
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
                   return_value=push_result), \
             patch('cell_link_manager.CellLinkManager._local_wg_ip', return_value='10.0.0.1'), \
             patch('firewall_manager.apply_cell_rules'):
            self.mgr.add_connection(self.INVITE)

    def test_new_links_default_exit_offered_false(self):
        """A freshly added link has a falsy exit_offered."""
        self._add_office()
        link = self.mgr.list_connections()[0]
        self.assertFalse(link.get('exit_offered'))

    def test_new_links_default_remote_exit_offered_false(self):
        """A freshly added link has a falsy remote_exit_offered."""
        self._add_office()
        link = self.mgr.list_connections()[0]
        self.assertFalse(link.get('remote_exit_offered'))

    def test_set_exit_offered_persists(self):
        """set_exit_offered(True) is reflected in the return value and the stored link."""
        self._add_office()
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
                   return_value={'ok': True, 'error': None}), \
             patch('cell_link_manager.CellLinkManager._local_wg_ip', return_value='10.0.0.1'):
            result = self.mgr.set_exit_offered('office', True)
        self.assertTrue(result['exit_offered'])
        link = self.mgr.list_connections()[0]
        self.assertTrue(link['exit_offered'])

    def test_set_exit_offered_triggers_push(self):
        """set_exit_offered pushes to the remote exactly once."""
        self._add_office()
        push_mock = MagicMock(return_value={'ok': True, 'error': None})
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock), \
             patch('cell_link_manager.CellLinkManager._local_wg_ip', return_value='10.0.0.1'):
            self.mgr.set_exit_offered('office', True)
        push_mock.assert_called_once()

    def test_set_exit_offered_unknown_cell_raises(self):
        """set_exit_offered on an unknown cell name raises ValueError."""
        with self.assertRaises(ValueError):
            self.mgr.set_exit_offered('nobody', True)

    def test_push_includes_exit_offered(self):
        """The pushed JSON body carries exit_offered from the link."""
        self._add_office()
        link = self.mgr.list_connections()[0]
        link['exit_offered'] = True

        captured = {}

        def fake_run(cmd, **kwargs):
            if '-d' not in cmd:
                r = MagicMock()
                r.returncode = 0
                r.stdout = ' inet 10.0.0.1/24 scope global wg0\n'
                return r
            # The JSON request body is the argument that follows '-d'.
            d_idx = cmd.index('-d')
            captured['body'] = json.loads(cmd[d_idx + 1])
            r = MagicMock()
            r.returncode = 0
            r.stdout = '200'
            r.stderr = ''
            return r

        with patch('subprocess.run', fake_run):
            self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
        self.assertTrue(captured['body']['exit_offered'])

    def test_apply_remote_permissions_stores_remote_exit_offered(self):
        """exit_offered=True from the remote is stored as remote_exit_offered."""
        self._add_office()
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.apply_remote_permissions(
                'officepubkey=',
                {'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
                 'outbound': {}},
                exit_offered=True,
            )
        link = self.mgr.list_connections()[0]
        self.assertTrue(link['remote_exit_offered'])

    def test_migration_adds_exit_fields_to_existing_links(self):
        """Existing cell_links.json without exit fields get them on load."""
        raw = [{'cell_name': 'office', 'public_key': 'officepubkey=',
                'vpn_subnet': '10.1.0.0/24', 'dns_ip': '10.1.0.1',
                'domain': 'office', 'endpoint': '5.5.5.5:51820',
                'permissions': {'inbound': {}, 'outbound': {}},
                'remote_api_url': 'http://10.1.0.1:3000',
                'last_push_status': 'ok', 'last_push_at': None,
                'last_push_error': None, 'pending_push': False,
                'last_remote_update_at': None}]
        with open(os.path.join(self.data_dir, 'cell_links.json'), 'w') as f:
            json.dump(raw, f)
        links = self.mgr.list_connections()
        self.assertIn('exit_offered', links[0])
        self.assertIn('remote_exit_offered', links[0])
        self.assertFalse(links[0]['exit_offered'])
        self.assertFalse(links[0]['remote_exit_offered'])


class TestExitRelay(unittest.TestCase):
    """Tests for Phase 3: per-peer internet routing via exit cell."""

    INVITE = {
        'cell_name': 'office',
        'public_key': 'officepubkey=',
        'endpoint': '5.5.5.5:51820',
        'vpn_subnet': '10.1.0.0/24',
        'dns_ip': '10.1.0.1',
        'domain': 'office',
    }

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)
        self.wg = _make_wg_mock()
        self.net = _make_nm_mock()
        self.mgr = CellLinkManager(self.data_dir, self.config_dir, self.wg, self.net)

    def tearDown(self):
        shutil.rmtree(self.test_dir)

    def _add_office(self):
        """Add the 'office' link with the local identity and remote push patched."""
        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
                   return_value={'ok': True, 'error': None}):
            self.mgr.add_connection(self.INVITE)

    def test_phase3_migration_adds_exit_relay_fields(self):
        """Existing links without Phase 3 fields get them on load."""
        raw = [{'cell_name': 'office', 'public_key': 'officepubkey=',
                'vpn_subnet': '10.1.0.0/24', 'dns_ip': '10.1.0.1',
                'domain': 'office', 'endpoint': '5.5.5.5:51820',
                'permissions': {'inbound': {}, 'outbound': {}},
                'remote_api_url': 'http://10.1.0.1:3000',
                'last_push_status': 'ok', 'last_push_at': None,
                'last_push_error': None, 'pending_push': False,
                'last_remote_update_at': None,
                'exit_offered': False, 'remote_exit_offered': False}]
        with open(os.path.join(self.data_dir, 'cell_links.json'), 'w') as f:
            json.dump(raw, f)
        links = self.mgr.list_connections()
        self.assertIn('exit_relay_active', links[0])
        self.assertIn('remote_exit_relay_active', links[0])
        self.assertFalse(links[0]['exit_relay_active'])
        self.assertFalse(links[0]['remote_exit_relay_active'])

    def test_set_exit_relay_active_persists(self):
        """set_exit_relay_active(True) shows in the returned link and the stored one."""
        self._add_office()
        with patch('cell_link_manager.CellLinkManager._try_push'):
            link = self.mgr.set_exit_relay_active('office', True)
        self.assertTrue(link['exit_relay_active'])
        self.assertTrue(self.mgr.list_connections()[0]['exit_relay_active'])

    def test_set_exit_relay_active_false_persists(self):
        """Switching the relay on and then off returns exit_relay_active False."""
        self._add_office()
        with patch('cell_link_manager.CellLinkManager._try_push'):
            self.mgr.set_exit_relay_active('office', True)
            link = self.mgr.set_exit_relay_active('office', False)
        self.assertFalse(link['exit_relay_active'])

    def test_set_exit_relay_active_triggers_push(self):
        """set_exit_relay_active calls _try_push exactly once."""
        self._add_office()
        push_mock = MagicMock()
        with patch('cell_link_manager.CellLinkManager._try_push', push_mock):
            self.mgr.set_exit_relay_active('office', True)
        push_mock.assert_called_once()

    def test_set_exit_relay_active_unknown_cell_raises(self):
        """set_exit_relay_active on an unknown cell name raises ValueError."""
        with self.assertRaises(ValueError):
            self.mgr.set_exit_relay_active('nobody', True)

    def test_push_includes_use_as_exit_relay_true(self):
        """With the relay active, the pushed JSON body has use_as_exit_relay true."""
        self._add_office()
        with patch('cell_link_manager.CellLinkManager._try_push'):
            self.mgr.set_exit_relay_active('office', True)

        captured = []

        def fake_run(cmd, **kw):
            captured.append(cmd)
            r = MagicMock()
            r.returncode = 0
            r.stdout = '200'
            return r

        with patch('cell_link_manager.CellLinkManager._local_identity',
                   return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
             patch('cell_link_manager.subprocess.run', side_effect=fake_run), \
             patch('cell_link_manager.CellLinkManager._local_wg_ip', return_value=None):
            self.mgr._try_push('office', self.mgr.list_connections()[0])

        # The payload is the first captured argument that starts with '{'.
        flat = [arg for cmd in captured for arg in cmd]
        payload_str = next(a for a in flat if a.startswith('{'))
        body = json.loads(payload_str)
        self.assertIn('use_as_exit_relay', body)
        self.assertTrue(body['use_as_exit_relay'])

    def test_apply_remote_permissions_stores_remote_exit_relay_active(self):
        """use_as_exit_relay=True from the remote is stored as remote_exit_relay_active."""
        self._add_office()
        with patch('firewall_manager.apply_cell_rules'):
            self.mgr.apply_remote_permissions('officepubkey=', {}, use_as_exit_relay=True)
        link = self.mgr.list_connections()[0]
        self.assertTrue(link['remote_exit_relay_active'])

    def test_apply_remote_permissions_calls_apply_cell_rules_with_exit_relay_true(self):
        """apply_cell_rules receives exit_relay=True when the remote requests relay use."""
        self._add_office()
        with patch('firewall_manager.apply_cell_rules') as mock_rules:
            self.mgr.apply_remote_permissions('officepubkey=', {}, use_as_exit_relay=True)
        mock_rules.assert_called_once_with('office', '10.1.0.0/24', [], exit_relay=True)


if __name__ == '__main__':
    unittest.main()