Files
pic/tests/test_cell_link_manager.py
T
roof dc2606541c feat: Phase 4 hardening — retry/backoff, loop detection, sync status UI + tests
Phase 4.1 — Retry/backoff for failed permission pushes:
- _compute_next_retry(): capped exponential backoff with jitter (60s–1h)
- _record_push_result(): tracks push_attempts and next_retry_at per link
- replay_pending_pushes(): skips links still in backoff window, logs deferred count
- _load() migration: adds push_attempts/next_retry_at to existing records

Phase 4.2 — Loop detection (A→B→A routing cycle):
- set_peer_route_via(): returns 409 if target cell already routes peers through us
- apply_remote_permissions(): soft warning when accepting exit-relay that would cycle

Phase 4.3 — Sync staleness indicator in Cell Network UI:
- SyncBadge component: green (synced), amber (pending/failed), gray (never)
- Shows relativeTime of last sync + error message + next retry estimate
- Injected into CellPanel header alongside tunnel online/handshake status

Tests (54 new):
- TestCheckInviteConflicts: subnet overlap, domain conflict, exclude_cell (9 tests)
- TestPushInviteToRemote: success, 4xx, no endpoint, subprocess errors (7 tests)
- TestAcceptInviteNew: new cell, idempotent, healing dns/subnet changes (16 tests)
- TestAddConnectionMutualPairing: push-invite call, non-fatal failure (5 tests)
- TestPeerSyncAcceptInvite endpoint: happy path, field validation, error propagation (16 tests)
- Fixed 2 existing replay tests to clear backoff gate (simulates elapsed window)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-04 04:18:36 -04:00

1534 lines
68 KiB
Python

#!/usr/bin/env python3
"""Unit tests for CellLinkManager (cell-to-cell VPN connections)."""
import sys
from pathlib import Path
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
import unittest
import tempfile
import os
import json
import shutil
from unittest.mock import MagicMock, patch
from cell_link_manager import CellLinkManager
def _make_wg_mock():
wg = MagicMock()
wg.get_keys.return_value = {'public_key': 'serverpubkey=', 'private_key': 'serverprivkey='}
wg.get_server_config.return_value = {
'endpoint': '1.2.3.4:51820', 'port': 51820,
'dns_ip': '10.0.0.3', 'split_tunnel_ips': '10.0.0.0/24, 172.20.0.0/16',
}
wg._get_configured_network.return_value = '10.0.0.0/24'
wg._get_configured_address.return_value = '10.0.0.1/24'
wg.add_cell_peer.return_value = True
wg.remove_peer.return_value = True
return wg
def _make_nm_mock():
nm = MagicMock()
nm.add_cell_dns_forward.return_value = {'restarted': ['cell-dns (reloaded)'], 'warnings': []}
nm.remove_cell_dns_forward.return_value = {'restarted': ['cell-dns (reloaded)'], 'warnings': []}
return nm
SAMPLE_INVITE = {
'cell_name': 'office',
'public_key': 'officepubkey=',
'endpoint': '5.6.7.8:51820',
'vpn_subnet': '10.1.0.0/24',
'dns_ip': '10.1.0.1',
'domain': 'office.cell',
'version': 1,
}
class TestCellLinkManagerInvite(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_generate_invite_has_required_fields(self):
invite = self.mgr.generate_invite('mycell', 'home.cell')
for field in ('cell_name', 'public_key', 'endpoint', 'vpn_subnet', 'dns_ip', 'domain', 'version'):
self.assertIn(field, invite, f"Missing field: {field}")
def test_generate_invite_uses_wg_public_key(self):
invite = self.mgr.generate_invite('mycell', 'home.cell')
self.assertEqual(invite['public_key'], 'serverpubkey=')
def test_generate_invite_uses_configured_network(self):
invite = self.mgr.generate_invite('mycell', 'home.cell')
self.assertEqual(invite['vpn_subnet'], '10.0.0.0/24')
def test_generate_invite_dns_ip_is_server_vpn_ip(self):
invite = self.mgr.generate_invite('mycell', 'home.cell')
self.assertEqual(invite['dns_ip'], '10.0.0.1')
def test_generate_invite_uses_supplied_identity(self):
invite = self.mgr.generate_invite('myhome', 'myhome.local')
self.assertEqual(invite['cell_name'], 'myhome')
self.assertEqual(invite['domain'], 'myhome.local')
class TestCellLinkManagerConnections(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_add_connection_stores_link(self):
self.mgr.add_connection(SAMPLE_INVITE)
links = self.mgr.list_connections()
self.assertEqual(len(links), 1)
self.assertEqual(links[0]['cell_name'], 'office')
def test_add_connection_calls_add_cell_peer(self):
self.mgr.add_connection(SAMPLE_INVITE)
self.wg.add_cell_peer.assert_called_once_with(
name='office',
public_key='officepubkey=',
endpoint='5.6.7.8:51820',
vpn_subnet='10.1.0.0/24',
)
def test_add_connection_calls_dns_forward(self):
self.mgr.add_connection(SAMPLE_INVITE)
self.nm.add_cell_dns_forward.assert_called_once_with(
domain='office.cell', dns_ip='10.1.0.1'
)
def test_add_connection_duplicate_raises(self):
self.mgr.add_connection(SAMPLE_INVITE)
with self.assertRaises(ValueError):
self.mgr.add_connection(SAMPLE_INVITE)
def test_add_connection_persists_to_disk(self):
self.mgr.add_connection(SAMPLE_INVITE)
# Create a fresh manager reading same dir
mgr2 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
links = mgr2.list_connections()
self.assertEqual(len(links), 1)
self.assertEqual(links[0]['cell_name'], 'office')
def test_remove_connection_calls_wg_remove_peer(self):
self.mgr.add_connection(SAMPLE_INVITE)
self.mgr.remove_connection('office')
self.wg.remove_peer.assert_called_once_with('officepubkey=')
def test_remove_connection_calls_dns_remove(self):
self.mgr.add_connection(SAMPLE_INVITE)
self.mgr.remove_connection('office')
self.nm.remove_cell_dns_forward.assert_called_once_with('office.cell')
def test_remove_connection_deletes_from_list(self):
self.mgr.add_connection(SAMPLE_INVITE)
self.mgr.remove_connection('office')
self.assertEqual(len(self.mgr.list_connections()), 0)
def test_remove_nonexistent_raises(self):
with self.assertRaises(ValueError):
self.mgr.remove_connection('nobody')
def test_list_connections_empty_by_default(self):
self.assertEqual(self.mgr.list_connections(), [])
def test_multiple_connections(self):
self.mgr.add_connection(SAMPLE_INVITE)
second = {**SAMPLE_INVITE, 'cell_name': 'cabin', 'public_key': 'cabinpubkey=',
'vpn_subnet': '10.2.0.0/24', 'dns_ip': '10.2.0.1', 'domain': 'cabin.cell'}
self.mgr.add_connection(second)
self.assertEqual(len(self.mgr.list_connections()), 2)
# accept_invite — new connection
def test_accept_invite_adds_new_connection(self):
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
links = self.mgr.list_connections()
self.assertEqual(len(links), 1)
self.assertEqual(links[0]['cell_name'], 'office')
def test_accept_invite_idempotent_no_change(self):
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
self.wg.reset_mock()
self.mgr.accept_invite(SAMPLE_INVITE)
# No WG update for identical invite
self.wg.update_peer_ip.assert_not_called()
def test_accept_invite_updates_dns_ip_on_existing(self):
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
updated_invite = {**SAMPLE_INVITE, 'dns_ip': '10.1.0.2'}
with patch('firewall_manager.apply_cell_rules'):
result = self.mgr.accept_invite(updated_invite)
self.assertEqual(result['dns_ip'], '10.1.0.2')
self.assertEqual(result['remote_api_url'], 'http://10.1.0.2:3000')
self.nm.remove_cell_dns_forward.assert_called()
self.nm.add_cell_dns_forward.assert_called_with(
domain='office.cell', dns_ip='10.1.0.2')
def test_accept_invite_updates_vpn_subnet_on_existing(self):
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
self.wg.update_peer_ip = MagicMock(return_value=True)
updated_invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.5.0.0/24'}
with patch('firewall_manager.apply_cell_rules'):
result = self.mgr.accept_invite(updated_invite)
self.assertEqual(result['vpn_subnet'], '10.5.0.0/24')
self.wg.update_peer_ip.assert_called_once_with('officepubkey=', '10.5.0.0/24')
def test_accept_invite_does_not_duplicate_link(self):
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
self.mgr.accept_invite({**SAMPLE_INVITE, 'dns_ip': '10.1.0.99'})
self.assertEqual(len(self.mgr.list_connections()), 1)
if __name__ == '__main__':
unittest.main()
# ---------------------------------------------------------------------------
# TestCheckInviteConflicts
# ---------------------------------------------------------------------------
class TestCheckInviteConflicts(unittest.TestCase):
"""Tests for CellLinkManager._check_invite_conflicts."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
# wg._get_configured_network returns '10.0.0.0/24' (own subnet)
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
def tearDown(self):
shutil.rmtree(self.test_dir)
def _add_existing_cell(self, cell_name='cabin', vpn_subnet='10.2.0.0/24',
domain='cabin.cell'):
"""Add a cell link directly to disk without going through add_connection."""
links = [{
'cell_name': cell_name,
'public_key': 'cabinpubkey=',
'endpoint': '9.9.9.9:51820',
'vpn_subnet': vpn_subnet,
'dns_ip': '10.2.0.1',
'domain': domain,
'permissions': {'inbound': {}, 'outbound': {}},
'remote_api_url': f'http://10.2.0.1:3000',
'pending_push': False,
'last_push_status': 'ok',
'last_push_at': None,
'last_push_error': None,
'last_remote_update_at': None,
}]
with open(os.path.join(self.test_dir, 'cell_links.json'), 'w') as f:
json.dump(links, f)
# --- subnet conflicts ---
def test_subnet_overlaps_own_subnet_raises(self):
"""Invite whose vpn_subnet overlaps our own subnet raises ValueError."""
invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.0.0.0/24'} # same as own
with self.assertRaises(ValueError) as ctx:
self.mgr._check_invite_conflicts(invite)
self.assertIn('subnet', str(ctx.exception).lower())
def test_subnet_overlaps_own_subnet_partial_raises(self):
"""Invite whose vpn_subnet partially overlaps our own subnet raises ValueError."""
# Own is 10.0.0.0/24; this /16 contains it
invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.0.0.0/16'}
with self.assertRaises(ValueError):
self.mgr._check_invite_conflicts(invite)
def test_subnet_overlaps_connected_cell_raises(self):
"""Invite whose vpn_subnet overlaps an already-connected cell raises ValueError."""
self._add_existing_cell(vpn_subnet='10.2.0.0/24')
invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.2.0.0/24'}
with self.assertRaises(ValueError) as ctx:
self.mgr._check_invite_conflicts(invite)
self.assertIn('cabin', str(ctx.exception))
def test_subnet_no_conflict_does_not_raise(self):
"""Invite with a non-overlapping subnet passes without error."""
self._add_existing_cell(vpn_subnet='10.2.0.0/24')
invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.3.0.0/24', 'domain': 'other.cell'}
# Should not raise
self.mgr._check_invite_conflicts(invite)
# --- domain conflicts ---
def test_domain_matches_own_domain_raises(self):
"""Invite with a domain equal to this cell's own domain raises ValueError."""
with patch('cell_link_manager.CellLinkManager._check_invite_conflicts',
wraps=self.mgr._check_invite_conflicts):
# Patch config_manager inside the function
with patch('cell_link_manager.os.environ.get', return_value='home.cell'):
# Use a fresh invite whose domain matches env-derived own domain
invite = {**SAMPLE_INVITE,
'vpn_subnet': '10.3.0.0/24',
'domain': 'home.cell'}
# Manually test via app import patch
import sys
fake_cfg = MagicMock()
fake_cfg.configs = {'_identity': {'domain': 'home.cell'}}
with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}):
with self.assertRaises(ValueError) as ctx:
self.mgr._check_invite_conflicts(invite)
self.assertIn('domain', str(ctx.exception).lower())
def test_domain_matches_connected_cell_raises(self):
"""Invite with a domain already used by a connected cell raises ValueError."""
self._add_existing_cell(domain='cabin.cell', vpn_subnet='10.2.0.0/24')
invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.3.0.0/24', 'domain': 'cabin.cell'}
with self.assertRaises(ValueError) as ctx:
self.mgr._check_invite_conflicts(invite)
self.assertIn('cabin', str(ctx.exception))
# --- exclude_cell parameter ---
def test_exclude_cell_skips_that_cell_subnet_check(self):
"""With exclude_cell set, the named cell is skipped in subnet conflict check."""
self._add_existing_cell(cell_name='cabin', vpn_subnet='10.2.0.0/24',
domain='cabin.cell')
# Same subnet as cabin — normally a conflict, but excluded
invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.2.0.0/24', 'domain': 'cabin.cell'}
# Should not raise because 'cabin' is excluded
self.mgr._check_invite_conflicts(invite, exclude_cell='cabin')
def test_exclude_cell_skips_that_cell_domain_check(self):
"""With exclude_cell set, the named cell is skipped in domain conflict check."""
self._add_existing_cell(cell_name='cabin', vpn_subnet='10.2.0.0/24',
domain='cabin.cell')
invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.9.0.0/24', 'domain': 'cabin.cell'}
# Should not raise — cabin excluded
self.mgr._check_invite_conflicts(invite, exclude_cell='cabin')
def test_exclude_cell_still_checks_other_cells(self):
"""Excluding 'cabin' does not suppress conflict with a different cell."""
self._add_existing_cell(cell_name='cabin', vpn_subnet='10.2.0.0/24',
domain='cabin.cell')
# Add a second cell manually
with open(os.path.join(self.test_dir, 'cell_links.json')) as f:
links = json.load(f)
links.append({
'cell_name': 'office',
'public_key': 'officepubkey=',
'vpn_subnet': '10.3.0.0/24',
'dns_ip': '10.3.0.1',
'domain': 'office.cell',
'permissions': {'inbound': {}, 'outbound': {}},
'remote_api_url': 'http://10.3.0.1:3000',
'pending_push': False,
'last_push_status': 'ok',
'last_push_at': None,
'last_push_error': None,
'last_remote_update_at': None,
})
with open(os.path.join(self.test_dir, 'cell_links.json'), 'w') as f:
json.dump(links, f)
# Conflicts with 'office', but we only exclude 'cabin'
invite = {**SAMPLE_INVITE, 'vpn_subnet': '10.3.0.0/24', 'domain': 'new.cell'}
with self.assertRaises(ValueError):
self.mgr._check_invite_conflicts(invite, exclude_cell='cabin')
# ---------------------------------------------------------------------------
# TestPushInviteToRemote
# ---------------------------------------------------------------------------
class TestPushInviteToRemote(unittest.TestCase):
"""Tests for CellLinkManager._push_invite_to_remote."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
def tearDown(self):
shutil.rmtree(self.test_dir)
def _make_link(self, endpoint='192.168.1.50:51820'):
return {
'cell_name': 'office',
'public_key': 'officepubkey=',
'endpoint': endpoint,
'vpn_subnet': '10.1.0.0/24',
'dns_ip': '10.1.0.1',
'domain': 'office.cell',
'remote_api_url': 'http://10.1.0.1:3000',
}
def _fake_identity(self):
return {'cell_name': 'home', 'public_key': 'homepubkey='}
def test_push_invite_success_2xx_returns_ok_true(self):
"""curl returning a 2xx status code → {'ok': True}."""
link = self._make_link()
mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = '201'
mock_result.stderr = ''
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value=self._fake_identity()), \
patch('cell_link_manager.os.environ.get', return_value='home.cell'), \
patch('subprocess.run', return_value=mock_result):
import sys
fake_cfg = MagicMock()
fake_cfg.configs = {'_identity': {'domain': 'home.cell'}}
with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}):
result = self.mgr._push_invite_to_remote(link)
self.assertTrue(result['ok'])
def test_push_invite_4xx_returns_ok_false_with_http_error(self):
"""curl returning a 4xx status code → {'ok': False, 'error': 'HTTP 4xx'}."""
link = self._make_link()
mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = '400'
mock_result.stderr = ''
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value=self._fake_identity()), \
patch('subprocess.run', return_value=mock_result):
import sys
fake_cfg = MagicMock()
fake_cfg.configs = {'_identity': {'domain': 'home.cell'}}
with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}):
result = self.mgr._push_invite_to_remote(link)
self.assertFalse(result['ok'])
self.assertIn('400', result['error'])
def test_push_invite_no_endpoint_returns_ok_false(self):
"""Link with no endpoint → {'ok': False, 'error': 'no endpoint'}."""
link = self._make_link(endpoint='')
result = self.mgr._push_invite_to_remote(link)
self.assertFalse(result['ok'])
self.assertIn('endpoint', result['error'].lower())
def test_push_invite_none_endpoint_returns_ok_false(self):
"""Link with endpoint=None → {'ok': False, 'error': 'no endpoint'}."""
link = self._make_link(endpoint='')
link['endpoint'] = None
result = self.mgr._push_invite_to_remote(link)
self.assertFalse(result['ok'])
def test_push_invite_subprocess_error_returns_ok_false(self):
"""subprocess.run raising an exception → {'ok': False, 'error': ...}."""
link = self._make_link()
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value=self._fake_identity()), \
patch('subprocess.run', side_effect=OSError('command not found')):
import sys
fake_cfg = MagicMock()
fake_cfg.configs = {'_identity': {'domain': 'home.cell'}}
with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}):
result = self.mgr._push_invite_to_remote(link)
self.assertFalse(result['ok'])
self.assertIsNotNone(result['error'])
def test_push_invite_curl_nonzero_returncode_returns_ok_false(self):
"""curl subprocess returning nonzero returncode → {'ok': False}."""
link = self._make_link()
mock_result = MagicMock()
mock_result.returncode = 1
mock_result.stdout = ''
mock_result.stderr = 'connection refused'
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value=self._fake_identity()), \
patch('subprocess.run', return_value=mock_result):
import sys
fake_cfg = MagicMock()
fake_cfg.configs = {'_identity': {'domain': 'home.cell'}}
with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}):
result = self.mgr._push_invite_to_remote(link)
self.assertFalse(result['ok'])
def test_push_invite_sends_to_correct_lan_host(self):
"""The curl URL must use the LAN IP from the endpoint, not the WG dns_ip."""
link = self._make_link(endpoint='192.168.31.52:51820')
captured = {}
def fake_run(cmd, **kw):
captured['cmd'] = cmd
r = MagicMock()
r.returncode = 0
r.stdout = '201'
r.stderr = ''
return r
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value=self._fake_identity()), \
patch('subprocess.run', fake_run):
import sys
fake_cfg = MagicMock()
fake_cfg.configs = {'_identity': {'domain': 'home.cell'}}
with patch.dict(sys.modules, {'app': MagicMock(config_manager=fake_cfg)}):
self.mgr._push_invite_to_remote(link)
url_in_cmd = captured['cmd'][-1]
self.assertIn('192.168.31.52', url_in_cmd)
self.assertIn('accept-invite', url_in_cmd)
# Must NOT use the WG dns_ip (10.1.0.1)
self.assertNotIn('10.1.0.1', url_in_cmd)
# ---------------------------------------------------------------------------
# TestAcceptInviteNew
# ---------------------------------------------------------------------------
class TestAcceptInviteNew(unittest.TestCase):
"""Tests for CellLinkManager.accept_invite — new connection path."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_accept_invite_new_cell_adds_wg_peer(self):
"""accept_invite for a new cell calls add_cell_peer on WG manager."""
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
self.wg.add_cell_peer.assert_called_once_with(
name='office',
public_key='officepubkey=',
endpoint='5.6.7.8:51820',
vpn_subnet='10.1.0.0/24',
)
def test_accept_invite_new_cell_adds_dns_forward(self):
"""accept_invite for a new cell calls add_cell_dns_forward on NM."""
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
self.nm.add_cell_dns_forward.assert_called_once_with(
domain='office.cell', dns_ip='10.1.0.1')
def test_accept_invite_new_cell_saves_link(self):
"""accept_invite for a new cell saves the link and returns it."""
with patch('firewall_manager.apply_cell_rules'):
link = self.mgr.accept_invite(SAMPLE_INVITE)
self.assertEqual(link['cell_name'], 'office')
self.assertEqual(len(self.mgr.list_connections()), 1)
def test_accept_invite_new_cell_sets_pending_push_true(self):
"""New link from accept_invite starts with pending_push=True (no push done)."""
with patch('firewall_manager.apply_cell_rules'):
link = self.mgr.accept_invite(SAMPLE_INVITE)
self.assertTrue(link['pending_push'])
def test_accept_invite_missing_cell_name_raises(self):
"""Invite missing 'cell_name' field raises ValueError."""
invite = {k: v for k, v in SAMPLE_INVITE.items() if k != 'cell_name'}
with self.assertRaises(ValueError) as ctx:
self.mgr.accept_invite(invite)
self.assertIn('cell_name', str(ctx.exception))
def test_accept_invite_missing_public_key_raises(self):
"""Invite missing 'public_key' field raises ValueError."""
invite = {k: v for k, v in SAMPLE_INVITE.items() if k != 'public_key'}
with self.assertRaises(ValueError) as ctx:
self.mgr.accept_invite(invite)
self.assertIn('public_key', str(ctx.exception))
def test_accept_invite_missing_vpn_subnet_raises(self):
"""Invite missing 'vpn_subnet' field raises ValueError."""
invite = {k: v for k, v in SAMPLE_INVITE.items() if k != 'vpn_subnet'}
with self.assertRaises(ValueError) as ctx:
self.mgr.accept_invite(invite)
self.assertIn('vpn_subnet', str(ctx.exception))
def test_accept_invite_missing_dns_ip_raises(self):
"""Invite missing 'dns_ip' field raises ValueError."""
invite = {k: v for k, v in SAMPLE_INVITE.items() if k != 'dns_ip'}
with self.assertRaises(ValueError) as ctx:
self.mgr.accept_invite(invite)
self.assertIn('dns_ip', str(ctx.exception))
def test_accept_invite_missing_domain_raises(self):
"""Invite missing 'domain' field raises ValueError."""
invite = {k: v for k, v in SAMPLE_INVITE.items() if k != 'domain'}
with self.assertRaises(ValueError) as ctx:
self.mgr.accept_invite(invite)
self.assertIn('domain', str(ctx.exception))
def test_accept_invite_subnet_conflict_raises(self):
"""accept_invite raises ValueError when subnet conflicts with own subnet."""
conflicting = {**SAMPLE_INVITE, 'vpn_subnet': '10.0.0.0/24'} # same as own
with self.assertRaises(ValueError):
self.mgr.accept_invite(conflicting)
def test_accept_invite_already_connected_no_change_returns_existing(self):
"""Calling accept_invite again with identical data returns existing link unchanged."""
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
self.wg.reset_mock()
result = self.mgr.accept_invite(SAMPLE_INVITE)
self.assertEqual(result['cell_name'], 'office')
# No second WG peer add
self.wg.add_cell_peer.assert_not_called()
def test_accept_invite_already_connected_dns_ip_change_updates(self):
"""accept_invite with changed dns_ip updates the link and DNS forward."""
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
updated = {**SAMPLE_INVITE, 'dns_ip': '10.1.0.5'}
with patch('firewall_manager.apply_cell_rules'):
result = self.mgr.accept_invite(updated)
self.assertEqual(result['dns_ip'], '10.1.0.5')
self.assertEqual(result['remote_api_url'], 'http://10.1.0.5:3000')
self.nm.remove_cell_dns_forward.assert_called()
self.nm.add_cell_dns_forward.assert_called_with(
domain='office.cell', dns_ip='10.1.0.5')
def test_accept_invite_already_connected_dns_ip_change_does_not_duplicate(self):
"""DNS ip update via accept_invite must not create a second link."""
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
self.mgr.accept_invite({**SAMPLE_INVITE, 'dns_ip': '10.1.0.5'})
self.assertEqual(len(self.mgr.list_connections()), 1)
def test_accept_invite_already_connected_vpn_subnet_change_calls_update_peer_ip(self):
"""accept_invite with changed vpn_subnet calls update_peer_ip on WG manager."""
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
self.wg.update_peer_ip = MagicMock(return_value=True)
updated = {**SAMPLE_INVITE, 'vpn_subnet': '10.5.0.0/24'}
with patch('firewall_manager.apply_cell_rules'):
result = self.mgr.accept_invite(updated)
self.assertEqual(result['vpn_subnet'], '10.5.0.0/24')
self.wg.update_peer_ip.assert_called_once_with('officepubkey=', '10.5.0.0/24')
def test_accept_invite_already_connected_vpn_subnet_change_reapplies_firewall(self):
"""accept_invite with changed vpn_subnet triggers apply_cell_rules."""
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
self.wg.update_peer_ip = MagicMock(return_value=True)
updated = {**SAMPLE_INVITE, 'vpn_subnet': '10.5.0.0/24'}
with patch('firewall_manager.apply_cell_rules') as mock_rules:
self.mgr.accept_invite(updated)
mock_rules.assert_called()
def test_accept_invite_does_not_duplicate_on_repeated_call(self):
"""Multiple calls with the same invite must leave exactly one link."""
with patch('firewall_manager.apply_cell_rules'):
self.mgr.accept_invite(SAMPLE_INVITE)
self.mgr.accept_invite(SAMPLE_INVITE)
self.assertEqual(len(self.mgr.list_connections()), 1)
# ---------------------------------------------------------------------------
# TestAddConnectionMutualPairing
# ---------------------------------------------------------------------------
class TestAddConnectionMutualPairing(unittest.TestCase):
"""Tests for add_connection's mutual pairing via _push_invite_to_remote."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
def tearDown(self):
shutil.rmtree(self.test_dir)
def _add_with_push(self, push_result):
push_mock = MagicMock(return_value=push_result)
perm_push = MagicMock(return_value={'ok': True, 'error': None})
with patch('cell_link_manager.CellLinkManager._push_invite_to_remote', push_mock), \
patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', perm_push), \
patch('firewall_manager.apply_cell_rules'):
link = self.mgr.add_connection(SAMPLE_INVITE)
return link, push_mock
def test_add_connection_calls_push_invite_to_remote(self):
"""add_connection calls _push_invite_to_remote after adding the connection."""
_, push_mock = self._add_with_push({'ok': True, 'error': None})
push_mock.assert_called_once()
def test_add_connection_push_invite_failure_is_nonfatal(self):
"""_push_invite_to_remote failure does not prevent connection creation."""
link, _ = self._add_with_push({'ok': False, 'error': 'connection refused'})
conns = self.mgr.list_connections()
self.assertEqual(len(conns), 1)
self.assertEqual(conns[0]['cell_name'], 'office')
def test_add_connection_push_invite_failure_link_still_stored(self):
"""Even when push fails, the link is persisted to disk."""
_, _ = self._add_with_push({'ok': False, 'error': 'timeout'})
mgr2 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
self.assertEqual(len(mgr2.list_connections()), 1)
def test_add_connection_with_inbound_services_sets_permissions(self):
"""inbound_services passed to add_connection sets permissions correctly."""
perm_push = MagicMock(return_value={'ok': True, 'error': None})
push_mock = MagicMock(return_value={'ok': True, 'error': None})
with patch('cell_link_manager.CellLinkManager._push_invite_to_remote', push_mock), \
patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', perm_push), \
patch('firewall_manager.apply_cell_rules'):
link = self.mgr.add_connection(SAMPLE_INVITE, inbound_services=['calendar'])
self.assertTrue(link['permissions']['inbound']['calendar'])
self.assertFalse(link['permissions']['inbound']['files'])
def test_add_connection_push_invite_exception_is_nonfatal(self):
"""Exception from _push_invite_to_remote is caught and does not raise."""
perm_push = MagicMock(return_value={'ok': True, 'error': None})
with patch('cell_link_manager.CellLinkManager._push_invite_to_remote',
side_effect=RuntimeError('docker not available')), \
patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', perm_push), \
patch('firewall_manager.apply_cell_rules'):
link = self.mgr.add_connection(SAMPLE_INVITE)
self.assertEqual(link['cell_name'], 'office')
# ---------------------------------------------------------------------------
# TestAddConnectionAtomicity
# ---------------------------------------------------------------------------
class TestAddConnectionAtomicity(unittest.TestCase):
"""Verify that add_connection rolls back correctly when WG or DNS steps fail."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_wg_fail_does_not_call_dns(self):
"""When add_cell_peer returns False, add_cell_dns_forward must NOT be called."""
self.wg.add_cell_peer.return_value = False
with self.assertRaises(RuntimeError):
self.mgr.add_connection(SAMPLE_INVITE)
self.nm.add_cell_dns_forward.assert_not_called()
def test_wg_fail_does_not_persist_link(self):
"""When WG fails, list_connections() must still return [] (nothing persisted)."""
self.wg.add_cell_peer.return_value = False
with self.assertRaises(RuntimeError):
self.mgr.add_connection(SAMPLE_INVITE)
self.assertEqual(self.mgr.list_connections(), [])
def test_wg_fail_raises_runtime_error(self):
"""add_connection raises RuntimeError (not some other exception) when WG fails."""
self.wg.add_cell_peer.return_value = False
with self.assertRaises(RuntimeError):
self.mgr.add_connection(SAMPLE_INVITE)
def test_dns_warning_still_persists_link(self):
"""When DNS returns warnings (not a hard failure), the link IS still saved."""
self.nm.add_cell_dns_forward.return_value = {
'restarted': [],
'warnings': ['CoreDNS reload timed out'],
}
self.mgr.add_connection(SAMPLE_INVITE)
links = self.mgr.list_connections()
self.assertEqual(len(links), 1)
self.assertEqual(links[0]['cell_name'], 'office')
def test_dns_warning_does_not_raise(self):
"""When DNS returns warnings, add_connection completes without raising."""
self.nm.add_cell_dns_forward.return_value = {
'restarted': [],
'warnings': ['CoreDNS reload timed out'],
}
try:
self.mgr.add_connection(SAMPLE_INVITE)
except Exception as e:
self.fail(f"add_connection raised unexpectedly with DNS warnings: {e}")
# ---------------------------------------------------------------------------
# TestAddConnectionPermissions
# ---------------------------------------------------------------------------
class TestAddConnectionPermissions(unittest.TestCase):
"""Verify that inbound_services controls the permissions field on the saved link."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
def tearDown(self):
shutil.rmtree(self.test_dir)
def _get_link(self):
links = self.mgr.list_connections()
self.assertEqual(len(links), 1)
return links[0]
def test_add_with_no_inbound_defaults_all_deny(self):
"""No inbound_services arg → all inbound permissions False."""
self.mgr.add_connection(SAMPLE_INVITE)
link = self._get_link()
inbound = link['permissions']['inbound']
for service, allowed in inbound.items():
self.assertFalse(allowed, f"Expected {service} to be False, got {allowed}")
def test_add_with_inbound_services_sets_them(self):
"""inbound_services=['calendar','files'] → those two True, others False."""
self.mgr.add_connection(SAMPLE_INVITE, inbound_services=['calendar', 'files'])
link = self._get_link()
inbound = link['permissions']['inbound']
self.assertTrue(inbound['calendar'])
self.assertTrue(inbound['files'])
self.assertFalse(inbound['mail'])
self.assertFalse(inbound['webdav'])
def test_inbound_invalid_service_ignored(self):
"""Passing 'badservice' in inbound_services does not appear in permissions."""
self.mgr.add_connection(SAMPLE_INVITE, inbound_services=['badservice', 'calendar'])
link = self._get_link()
inbound = link['permissions']['inbound']
self.assertNotIn('badservice', inbound)
# valid one was still applied
self.assertTrue(inbound['calendar'])
# ---------------------------------------------------------------------------
# TestUpdatePermissions
# ---------------------------------------------------------------------------
class TestUpdatePermissions(unittest.TestCase):
"""Tests for the new update_permissions / get_permissions methods."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
# Add a connection so there is something to update
self.mgr.add_connection(SAMPLE_INVITE)
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_update_sets_inbound_values(self):
"""update_permissions with inbound={'calendar': True} persists correctly."""
with patch('cell_link_manager.firewall_manager', create=True) as mock_fm:
mock_fm.apply_cell_rules = MagicMock()
self.mgr.update_permissions('office', {'calendar': True}, {})
# Re-read from disk to confirm persistence
mgr2 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
perms = mgr2.get_permissions('office')
self.assertTrue(perms['inbound']['calendar'])
self.assertFalse(perms['inbound']['files'])
def test_update_rejects_unknown_service_by_cleaning_it_out(self):
"""update_permissions with inbound={'bad': True} — 'bad' must not appear in saved perms."""
with patch('cell_link_manager.firewall_manager', create=True) as mock_fm:
mock_fm.apply_cell_rules = MagicMock()
self.mgr.update_permissions('office', {'bad': True, 'calendar': True}, {})
perms = self.mgr.get_permissions('office')
self.assertNotIn('bad', perms['inbound'])
self.assertTrue(perms['inbound']['calendar'])
def test_update_nonexistent_cell_raises(self):
"""update_permissions on an unknown cell_name raises ValueError."""
with self.assertRaises(ValueError):
self.mgr.update_permissions('nosuchcell', {}, {})
def test_get_permissions_returns_correct(self):
"""get_permissions returns the dict that was saved by update_permissions."""
with patch('cell_link_manager.firewall_manager', create=True) as mock_fm:
mock_fm.apply_cell_rules = MagicMock()
self.mgr.update_permissions(
'office',
inbound={'calendar': True, 'files': False},
outbound={'mail': True},
)
perms = self.mgr.get_permissions('office')
self.assertIn('inbound', perms)
self.assertIn('outbound', perms)
self.assertTrue(perms['inbound']['calendar'])
self.assertFalse(perms['inbound']['files'])
self.assertTrue(perms['outbound']['mail'])
def test_get_permissions_nonexistent_cell_raises(self):
"""get_permissions on an unknown cell_name raises ValueError."""
with self.assertRaises(ValueError):
self.mgr.get_permissions('nosuchcell')
# ---------------------------------------------------------------------------
# TestLoadMigration
# ---------------------------------------------------------------------------
class TestLoadMigration(unittest.TestCase):
"""Verify _load() lazily injects permissions field when it is missing."""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_load_injects_permissions_if_missing(self):
"""Write cell_links.json without permissions; _load should add all-False defaults."""
links_file = os.path.join(self.test_dir, 'cell_links.json')
legacy_links = [
{
'cell_name': 'legacy-office',
'public_key': 'officepubkey=',
'vpn_subnet': '10.1.0.0/24',
'dns_ip': '10.1.0.1',
'domain': 'legacy-office.cell',
# NO 'permissions' key — simulates pre-migration data
}
]
with open(links_file, 'w') as f:
json.dump(legacy_links, f)
mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
links = mgr.list_connections()
self.assertEqual(len(links), 1)
link = links[0]
self.assertIn('permissions', link)
perms = link['permissions']
self.assertIn('inbound', perms)
self.assertIn('outbound', perms)
for service in ('calendar', 'files', 'mail', 'webdav'):
self.assertFalse(perms['inbound'][service])
self.assertFalse(perms['outbound'][service])
def test_load_migration_persists_to_disk(self):
"""After migration, re-loading the same file returns the injected permissions."""
links_file = os.path.join(self.test_dir, 'cell_links.json')
with open(links_file, 'w') as f:
json.dump([{
'cell_name': 'old-cell',
'public_key': 'somepubkey=',
'vpn_subnet': '10.2.0.0/24',
'dns_ip': '10.2.0.1',
'domain': 'old-cell.cell',
}], f)
mgr1 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
mgr1.list_connections() # triggers migration + save
# Read the file directly and confirm permissions are now on disk
with open(links_file) as f:
raw = json.load(f)
self.assertIn('permissions', raw[0])
class TestPermissionSync(unittest.TestCase):
"""Tests for Phase 1: permission sync between connected PIC cells."""
INVITE = {
'cell_name': 'office',
'public_key': 'officepubkey=',
'endpoint': '5.6.7.8:51820',
'vpn_subnet': '10.1.0.0/24',
'dns_ip': '10.1.0.1',
'domain': 'office.cell',
'version': 1,
}
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.wg = _make_wg_mock()
self.nm = _make_nm_mock()
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
def tearDown(self):
shutil.rmtree(self.test_dir)
def _add_office(self, push_ok=True):
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
return_value={'ok': push_ok, 'error': None if push_ok else 'conn refused'}), \
patch('firewall_manager.apply_cell_rules'):
return self.mgr.add_connection(self.INVITE)
# ── add_connection ────────────────────────────────────────────────────────
def test_add_connection_includes_sync_fields(self):
link = self._add_office()
self.assertIn('remote_api_url', link)
self.assertIn('pending_push', link)
self.assertIn('last_push_status', link)
self.assertIn('last_push_at', link)
self.assertIn('last_remote_update_at', link)
def test_add_connection_sets_remote_api_url_from_dns_ip(self):
link = self._add_office()
self.assertEqual(link['remote_api_url'], 'http://10.1.0.1:3000')
def test_add_connection_triggers_push(self):
push_mock = MagicMock(return_value={'ok': True, 'error': None})
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock), \
patch('firewall_manager.apply_cell_rules'):
self.mgr.add_connection(self.INVITE)
push_mock.assert_called_once()
call_args = push_mock.call_args[0]
self.assertEqual(call_args[1], 'home') # from_cell
self.assertEqual(call_args[2], 'homepubkey=') # from_public_key
def test_add_connection_push_failure_does_not_abort_add(self):
link = self._add_office(push_ok=False)
conns = self.mgr.list_connections()
self.assertEqual(len(conns), 1)
self.assertEqual(conns[0]['cell_name'], 'office')
self.assertTrue(conns[0]['pending_push'])
def test_add_connection_push_success_clears_pending(self):
self._add_office(push_ok=True)
link = self.mgr.list_connections()[0]
self.assertFalse(link['pending_push'])
self.assertEqual(link['last_push_status'], 'ok')
# ── update_permissions ────────────────────────────────────────────────────
def test_update_permissions_push_succeeds_clears_pending(self):
self._add_office()
push_mock = MagicMock(return_value={'ok': True, 'error': None})
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock), \
patch('firewall_manager.apply_cell_rules'):
self.mgr.update_permissions('office',
{'calendar': True}, {'files': False})
link = self.mgr.list_connections()[0]
self.assertFalse(link['pending_push'])
self.assertEqual(link['last_push_status'], 'ok')
self.assertIsNotNone(link['last_push_at'])
def test_update_permissions_push_failure_keeps_local_save(self):
self._add_office()
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
return_value={'ok': False, 'error': 'timeout'}), \
patch('firewall_manager.apply_cell_rules'):
result = self.mgr.update_permissions('office',
{'calendar': True}, {})
# Local save must have happened — calendar is True
self.assertTrue(result['permissions']['inbound']['calendar'])
link = self.mgr.list_connections()[0]
self.assertTrue(link['pending_push'])
self.assertEqual(link['last_push_status'], 'failed')
def test_update_permissions_does_not_raise_on_push_exception(self):
self._add_office()
with patch('cell_link_manager.CellLinkManager._local_identity',
side_effect=RuntimeError('no app context')), \
patch('firewall_manager.apply_cell_rules'):
# Must not raise
result = self.mgr.update_permissions('office', {}, {})
self.assertIn('permissions', result)
# ── _push_permissions_to_remote (unit) ────────────────────────────────────
def test_push_mirrors_inbound_outbound(self):
"""Our inbound (what we share) must become their outbound in the body."""
self._add_office()
link = self.mgr.list_connections()[0]
link['permissions'] = {
'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
'outbound': {'calendar': False, 'files': True, 'mail': False, 'webdav': False},
}
sent_body = {}
def fake_run(cmd, **kwargs):
import json as _j
if '-d' not in cmd:
# ip addr show wg0 — return a fake wg0 address
r = MagicMock()
r.returncode = 0
r.stdout = 'inet 10.0.0.1/24 scope global wg0\n'
return r
d_idx = cmd.index('-d')
sent_body.update(_j.loads(cmd[d_idx + 1]))
r = MagicMock()
r.returncode = 0
r.stdout = '200'
r.stderr = ''
return r
with patch('subprocess.run', fake_run):
result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
self.assertTrue(result['ok'])
pushed_perms = sent_body['permissions']
# Our inbound=calendar:True → their outbound=calendar:True
self.assertTrue(pushed_perms['outbound']['calendar'])
# Our outbound=files:True → their inbound=files:True
self.assertTrue(pushed_perms['inbound']['files'])
def test_push_xff_header_carries_local_wg_ip(self):
"""Curl command must include X-Forwarded-For with local WG IP.
MASQUERADE rewrites source to Docker bridge IP. Without XFF the remote
can't match the sender's VPN subnet and returns 403.
"""
self._add_office()
link = self.mgr.list_connections()[0]
captured_cmd = {}
def fake_run(cmd, **kwargs):
if '-d' not in cmd:
r = MagicMock()
r.returncode = 0
r.stdout = ' inet 10.0.0.1/24 scope global wg0\n'
return r
captured_cmd['cmd'] = cmd
r = MagicMock()
r.returncode = 0
r.stdout = '200'
r.stderr = ''
return r
with patch('subprocess.run', fake_run):
self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
cmd = captured_cmd.get('cmd', [])
# X-Forwarded-For header must be in the curl command
self.assertIn('-H', cmd)
xff_idx = [i for i, x in enumerate(cmd) if x == '-H' and i + 1 < len(cmd) and 'X-Forwarded-For' in cmd[i + 1]]
self.assertTrue(xff_idx, 'X-Forwarded-For header missing from curl command')
xff_val = cmd[xff_idx[0] + 1]
self.assertIn('10.0.0.1', xff_val)
def test_push_http_error_returns_not_ok(self):
self._add_office()
link = self.mgr.list_connections()[0]
mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = '503'
mock_result.stderr = ''
def fake_run(cmd, **kwargs):
if '-d' not in cmd:
r = MagicMock()
r.returncode = 0
r.stdout = ' inet 10.0.0.1/24 scope global wg0\n'
return r
return mock_result
with patch('subprocess.run', fake_run):
result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
self.assertFalse(result['ok'])
self.assertIn('503', result['error'])
def test_push_no_remote_api_url_returns_not_ok(self):
self._add_office()
link = self.mgr.list_connections()[0]
link['remote_api_url'] = None
result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
self.assertFalse(result['ok'])
# ── apply_remote_permissions ──────────────────────────────────────────────
def test_apply_remote_permissions_stores_by_pubkey(self):
self._add_office()
with patch('firewall_manager.apply_cell_rules'):
updated = self.mgr.apply_remote_permissions(
'officepubkey=',
{'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
'outbound': {'calendar': False, 'files': True, 'mail': False, 'webdav': False}},
)
self.assertTrue(updated['permissions']['inbound']['calendar'])
self.assertTrue(updated['permissions']['outbound']['files'])
# Persisted to disk
link = self.mgr.list_connections()[0]
self.assertTrue(link['permissions']['inbound']['calendar'])
self.assertIsNotNone(link['last_remote_update_at'])
def test_apply_remote_permissions_unknown_pubkey_raises(self):
self._add_office()
with self.assertRaises(ValueError):
self.mgr.apply_remote_permissions('nosuchkey=', {})
def test_apply_remote_permissions_calls_apply_cell_rules(self):
self._add_office()
with patch('firewall_manager.apply_cell_rules') as mock_rules:
self.mgr.apply_remote_permissions(
'officepubkey=',
{'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
'outbound': {}},
)
mock_rules.assert_called_once_with('office', '10.1.0.0/24', ['calendar'],
exit_relay=False)
# ── replay_pending_pushes ─────────────────────────────────────────────────
def test_replay_retries_pending_links(self):
self._add_office(push_ok=False) # leaves pending_push=True + next_retry_at set
links = self.mgr._load()
links[0]['next_retry_at'] = None # simulate backoff window elapsed
self.mgr._save(links)
self.assertTrue(self.mgr.list_connections()[0]['pending_push'])
push_mock = MagicMock(return_value={'ok': True, 'error': None})
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock):
summary = self.mgr.replay_pending_pushes()
push_mock.assert_called_once()
self.assertEqual(summary['attempted'], 1)
self.assertEqual(summary['ok'], 1)
self.assertFalse(self.mgr.list_connections()[0]['pending_push'])
def test_replay_skips_non_pending_links(self):
self._add_office(push_ok=True) # pending_push=False after success
push_mock = MagicMock(return_value={'ok': True, 'error': None})
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock):
summary = self.mgr.replay_pending_pushes()
push_mock.assert_not_called()
self.assertEqual(summary['attempted'], 0)
def test_replay_push_failure_leaves_pending(self):
self._add_office(push_ok=False)
links = self.mgr._load()
links[0]['next_retry_at'] = None # simulate backoff window elapsed
self.mgr._save(links)
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
return_value={'ok': False, 'error': 'timeout'}):
summary = self.mgr.replay_pending_pushes()
self.assertEqual(summary['failed'], 1)
self.assertTrue(self.mgr.list_connections()[0]['pending_push'])
def test_replay_identity_failure_returns_empty_summary(self):
self._add_office(push_ok=False)
with patch('cell_link_manager.CellLinkManager._local_identity',
side_effect=RuntimeError('no app context')):
summary = self.mgr.replay_pending_pushes()
self.assertEqual(summary['attempted'], 0)
# ── _load migration ───────────────────────────────────────────────────────
def test_load_migration_injects_sync_fields_on_legacy_record(self):
legacy = [{
'cell_name': 'office',
'public_key': 'officepubkey=',
'vpn_subnet': '10.1.0.0/24',
'dns_ip': '10.1.0.1',
'domain': 'office.cell',
'permissions': {'inbound': {}, 'outbound': {}},
}]
links_file = os.path.join(self.test_dir, 'cell_links.json')
with open(links_file, 'w') as f:
json.dump(legacy, f)
links = self.mgr.list_connections()
link = links[0]
self.assertIn('remote_api_url', link)
self.assertIn('pending_push', link)
self.assertIn('last_push_status', link)
self.assertIn('last_push_at', link)
self.assertIn('last_remote_update_at', link)
self.assertEqual(link['remote_api_url'], 'http://10.1.0.1:3000')
self.assertTrue(link['pending_push']) # pre-existing → marked pending
self.assertEqual(link['last_push_status'], 'never')
# Fields persisted to disk after migration
with open(links_file) as f:
raw = json.load(f)
self.assertIn('pending_push', raw[0])
class TestExitOffer(unittest.TestCase):
"""Tests for Phase 2: exit-offer signaling."""
INVITE = {
'cell_name': 'office',
'public_key': 'officepubkey=',
'endpoint': '5.5.5.5:51820',
'vpn_subnet': '10.1.0.0/24',
'dns_ip': '10.1.0.1',
'domain': 'office',
}
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.data_dir = os.path.join(self.test_dir, 'data')
self.config_dir = os.path.join(self.test_dir, 'config')
os.makedirs(self.data_dir, exist_ok=True)
os.makedirs(self.config_dir, exist_ok=True)
self.wg = _make_wg_mock()
self.net = _make_nm_mock()
self.mgr = CellLinkManager(self.data_dir, self.config_dir, self.wg, self.net)
def tearDown(self):
shutil.rmtree(self.test_dir)
def _add_office(self, push_ok=True):
push_result = {'ok': push_ok, 'error': None if push_ok else 'timeout'}
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
return_value=push_result), \
patch('cell_link_manager.CellLinkManager._local_wg_ip', return_value='10.0.0.1'), \
patch('firewall_manager.apply_cell_rules'):
self.mgr.add_connection(self.INVITE)
def test_new_links_default_exit_offered_false(self):
self._add_office()
link = self.mgr.list_connections()[0]
self.assertFalse(link.get('exit_offered'))
def test_new_links_default_remote_exit_offered_false(self):
self._add_office()
link = self.mgr.list_connections()[0]
self.assertFalse(link.get('remote_exit_offered'))
def test_set_exit_offered_persists(self):
self._add_office()
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
return_value={'ok': True, 'error': None}), \
patch('cell_link_manager.CellLinkManager._local_wg_ip', return_value='10.0.0.1'):
result = self.mgr.set_exit_offered('office', True)
self.assertTrue(result['exit_offered'])
link = self.mgr.list_connections()[0]
self.assertTrue(link['exit_offered'])
def test_set_exit_offered_triggers_push(self):
self._add_office()
push_mock = MagicMock(return_value={'ok': True, 'error': None})
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock), \
patch('cell_link_manager.CellLinkManager._local_wg_ip', return_value='10.0.0.1'):
self.mgr.set_exit_offered('office', True)
push_mock.assert_called_once()
def test_set_exit_offered_unknown_cell_raises(self):
with self.assertRaises(ValueError):
self.mgr.set_exit_offered('nobody', True)
def test_push_includes_exit_offered(self):
self._add_office()
link = self.mgr.list_connections()[0]
link['exit_offered'] = True
captured = {}
def fake_run(cmd, **kwargs):
if '-d' not in cmd:
r = MagicMock()
r.returncode = 0
r.stdout = ' inet 10.0.0.1/24 scope global wg0\n'
return r
import json as _j
d_idx = cmd.index('-d')
captured['body'] = _j.loads(cmd[d_idx + 1])
r = MagicMock()
r.returncode = 0
r.stdout = '200'
r.stderr = ''
return r
with patch('subprocess.run', fake_run):
self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
self.assertTrue(captured['body']['exit_offered'])
def test_apply_remote_permissions_stores_remote_exit_offered(self):
self._add_office()
with patch('firewall_manager.apply_cell_rules'):
self.mgr.apply_remote_permissions(
'officepubkey=',
{'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
'outbound': {}},
exit_offered=True,
)
link = self.mgr.list_connections()[0]
self.assertTrue(link['remote_exit_offered'])
def test_migration_adds_exit_fields_to_existing_links(self):
"""Existing cell_links.json without exit fields get them on load."""
raw = [{'cell_name': 'office', 'public_key': 'officepubkey=',
'vpn_subnet': '10.1.0.0/24', 'dns_ip': '10.1.0.1',
'domain': 'office', 'endpoint': '5.5.5.5:51820',
'permissions': {'inbound': {}, 'outbound': {}},
'remote_api_url': 'http://10.1.0.1:3000',
'last_push_status': 'ok', 'last_push_at': None,
'last_push_error': None, 'pending_push': False,
'last_remote_update_at': None}]
with open(os.path.join(self.data_dir, 'cell_links.json'), 'w') as f:
json.dump(raw, f)
links = self.mgr.list_connections()
self.assertIn('exit_offered', links[0])
self.assertIn('remote_exit_offered', links[0])
self.assertFalse(links[0]['exit_offered'])
self.assertFalse(links[0]['remote_exit_offered'])
class TestExitRelay(unittest.TestCase):
"""Tests for Phase 3: per-peer internet routing via exit cell."""
INVITE = {
'cell_name': 'office',
'public_key': 'officepubkey=',
'endpoint': '5.5.5.5:51820',
'vpn_subnet': '10.1.0.0/24',
'dns_ip': '10.1.0.1',
'domain': 'office',
}
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.data_dir = os.path.join(self.test_dir, 'data')
self.config_dir = os.path.join(self.test_dir, 'config')
os.makedirs(self.data_dir, exist_ok=True)
os.makedirs(self.config_dir, exist_ok=True)
self.wg = _make_wg_mock()
self.net = _make_nm_mock()
self.mgr = CellLinkManager(self.data_dir, self.config_dir, self.wg, self.net)
def tearDown(self):
shutil.rmtree(self.test_dir)
def _add_office(self):
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
return_value={'ok': True, 'error': None}):
self.mgr.add_connection(self.INVITE)
def test_phase3_migration_adds_exit_relay_fields(self):
"""Existing links without Phase 3 fields get them on load."""
raw = [{'cell_name': 'office', 'public_key': 'officepubkey=',
'vpn_subnet': '10.1.0.0/24', 'dns_ip': '10.1.0.1',
'domain': 'office', 'endpoint': '5.5.5.5:51820',
'permissions': {'inbound': {}, 'outbound': {}},
'remote_api_url': 'http://10.1.0.1:3000',
'last_push_status': 'ok', 'last_push_at': None,
'last_push_error': None, 'pending_push': False,
'last_remote_update_at': None,
'exit_offered': False, 'remote_exit_offered': False}]
with open(os.path.join(self.data_dir, 'cell_links.json'), 'w') as f:
json.dump(raw, f)
links = self.mgr.list_connections()
self.assertIn('exit_relay_active', links[0])
self.assertIn('remote_exit_relay_active', links[0])
self.assertFalse(links[0]['exit_relay_active'])
self.assertFalse(links[0]['remote_exit_relay_active'])
def test_set_exit_relay_active_persists(self):
self._add_office()
with patch('cell_link_manager.CellLinkManager._try_push'):
link = self.mgr.set_exit_relay_active('office', True)
self.assertTrue(link['exit_relay_active'])
self.assertTrue(self.mgr.list_connections()[0]['exit_relay_active'])
def test_set_exit_relay_active_false_persists(self):
self._add_office()
with patch('cell_link_manager.CellLinkManager._try_push'):
self.mgr.set_exit_relay_active('office', True)
link = self.mgr.set_exit_relay_active('office', False)
self.assertFalse(link['exit_relay_active'])
def test_set_exit_relay_active_triggers_push(self):
self._add_office()
push_mock = MagicMock()
with patch('cell_link_manager.CellLinkManager._try_push', push_mock):
self.mgr.set_exit_relay_active('office', True)
push_mock.assert_called_once()
def test_set_exit_relay_active_unknown_cell_raises(self):
with self.assertRaises(ValueError):
self.mgr.set_exit_relay_active('nobody', True)
def test_push_includes_use_as_exit_relay_true(self):
self._add_office()
with patch('cell_link_manager.CellLinkManager._try_push'):
self.mgr.set_exit_relay_active('office', True)
captured = []
def fake_run(cmd, **kw):
captured.append(cmd)
r = MagicMock()
r.returncode = 0
r.stdout = '200'
return r
import json as _json
with patch('cell_link_manager.CellLinkManager._local_identity',
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
patch('cell_link_manager.subprocess.run', side_effect=fake_run), \
patch('cell_link_manager.CellLinkManager._local_wg_ip', return_value=None):
self.mgr._try_push('office', self.mgr.list_connections()[0])
flat = [arg for cmd in captured for arg in cmd]
payload_str = next(a for a in flat if a.startswith('{'))
body = _json.loads(payload_str)
self.assertIn('use_as_exit_relay', body)
self.assertTrue(body['use_as_exit_relay'])
def test_apply_remote_permissions_stores_remote_exit_relay_active(self):
self._add_office()
with patch('firewall_manager.apply_cell_rules'):
self.mgr.apply_remote_permissions('officepubkey=', {},
use_as_exit_relay=True)
link = self.mgr.list_connections()[0]
self.assertTrue(link['remote_exit_relay_active'])
def test_apply_remote_permissions_calls_apply_cell_rules_with_exit_relay_true(self):
self._add_office()
with patch('firewall_manager.apply_cell_rules') as mock_rules:
self.mgr.apply_remote_permissions('officepubkey=', {},
use_as_exit_relay=True)
mock_rules.assert_called_once_with('office', '10.1.0.0/24', [],
exit_relay=True)
if __name__ == '__main__':
unittest.main()