a3d0cd5a48
When PIC A updates service sharing permissions, it immediately pushes the mirrored state to PIC B over the WireGuard tunnel so B's UI shows what A is sharing with it in real time. Architecture: - Push model: update_permissions() → _push_permissions_to_remote() → POST /api/cells/peer-sync/permissions on remote cell - Auth: source IP must be inside a known cell's vpn_subnet (WireGuard tunnel proves identity) + body's from_public_key must match stored key - Mirror semantics: our inbound (what we share) → their outbound view - Non-fatal: push failures set pending_push=True; replay_pending_pushes() retries at startup so offline cells catch up on reconnect - add_connection() also pushes initial state so remote sees permissions immediately on the first connect New fields on cell_links.json records (lazy-migrated): remote_api_url, last_push_status, last_push_at, last_push_error, pending_push, last_remote_update_at New endpoint: POST /api/cells/peer-sync/permissions 30 new tests (1101 total). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
671 lines
28 KiB
Python
671 lines
28 KiB
Python
#!/usr/bin/env python3
|
|
"""Unit tests for CellLinkManager (cell-to-cell VPN connections)."""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
api_dir = Path(__file__).parent.parent / 'api'
|
|
sys.path.insert(0, str(api_dir))
|
|
|
|
import unittest
|
|
import tempfile
|
|
import os
|
|
import json
|
|
import shutil
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from cell_link_manager import CellLinkManager
|
|
|
|
|
|
def _make_wg_mock():
|
|
wg = MagicMock()
|
|
wg.get_keys.return_value = {'public_key': 'serverpubkey=', 'private_key': 'serverprivkey='}
|
|
wg.get_server_config.return_value = {
|
|
'endpoint': '1.2.3.4:51820', 'port': 51820,
|
|
'dns_ip': '10.0.0.3', 'split_tunnel_ips': '10.0.0.0/24, 172.20.0.0/16',
|
|
}
|
|
wg._get_configured_network.return_value = '10.0.0.0/24'
|
|
wg._get_configured_address.return_value = '10.0.0.1/24'
|
|
wg.add_cell_peer.return_value = True
|
|
wg.remove_peer.return_value = True
|
|
return wg
|
|
|
|
|
|
def _make_nm_mock():
|
|
nm = MagicMock()
|
|
nm.add_cell_dns_forward.return_value = {'restarted': ['cell-dns (reloaded)'], 'warnings': []}
|
|
nm.remove_cell_dns_forward.return_value = {'restarted': ['cell-dns (reloaded)'], 'warnings': []}
|
|
return nm
|
|
|
|
|
|
SAMPLE_INVITE = {
|
|
'cell_name': 'office',
|
|
'public_key': 'officepubkey=',
|
|
'endpoint': '5.6.7.8:51820',
|
|
'vpn_subnet': '10.1.0.0/24',
|
|
'dns_ip': '10.1.0.1',
|
|
'domain': 'office.cell',
|
|
'version': 1,
|
|
}
|
|
|
|
|
|
class TestCellLinkManagerInvite(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.test_dir = tempfile.mkdtemp()
|
|
self.wg = _make_wg_mock()
|
|
self.nm = _make_nm_mock()
|
|
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.test_dir)
|
|
|
|
def test_generate_invite_has_required_fields(self):
|
|
invite = self.mgr.generate_invite('mycell', 'home.cell')
|
|
for field in ('cell_name', 'public_key', 'endpoint', 'vpn_subnet', 'dns_ip', 'domain', 'version'):
|
|
self.assertIn(field, invite, f"Missing field: {field}")
|
|
|
|
def test_generate_invite_uses_wg_public_key(self):
|
|
invite = self.mgr.generate_invite('mycell', 'home.cell')
|
|
self.assertEqual(invite['public_key'], 'serverpubkey=')
|
|
|
|
def test_generate_invite_uses_configured_network(self):
|
|
invite = self.mgr.generate_invite('mycell', 'home.cell')
|
|
self.assertEqual(invite['vpn_subnet'], '10.0.0.0/24')
|
|
|
|
def test_generate_invite_dns_ip_is_server_vpn_ip(self):
|
|
invite = self.mgr.generate_invite('mycell', 'home.cell')
|
|
self.assertEqual(invite['dns_ip'], '10.0.0.1')
|
|
|
|
def test_generate_invite_uses_supplied_identity(self):
|
|
invite = self.mgr.generate_invite('myhome', 'myhome.local')
|
|
self.assertEqual(invite['cell_name'], 'myhome')
|
|
self.assertEqual(invite['domain'], 'myhome.local')
|
|
|
|
|
|
class TestCellLinkManagerConnections(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.test_dir = tempfile.mkdtemp()
|
|
self.wg = _make_wg_mock()
|
|
self.nm = _make_nm_mock()
|
|
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.test_dir)
|
|
|
|
def test_add_connection_stores_link(self):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
links = self.mgr.list_connections()
|
|
self.assertEqual(len(links), 1)
|
|
self.assertEqual(links[0]['cell_name'], 'office')
|
|
|
|
def test_add_connection_calls_add_cell_peer(self):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
self.wg.add_cell_peer.assert_called_once_with(
|
|
name='office',
|
|
public_key='officepubkey=',
|
|
endpoint='5.6.7.8:51820',
|
|
vpn_subnet='10.1.0.0/24',
|
|
)
|
|
|
|
def test_add_connection_calls_dns_forward(self):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
self.nm.add_cell_dns_forward.assert_called_once_with(
|
|
domain='office.cell', dns_ip='10.1.0.1'
|
|
)
|
|
|
|
def test_add_connection_duplicate_raises(self):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
with self.assertRaises(ValueError):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
|
|
def test_add_connection_persists_to_disk(self):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
# Create a fresh manager reading same dir
|
|
mgr2 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
|
|
links = mgr2.list_connections()
|
|
self.assertEqual(len(links), 1)
|
|
self.assertEqual(links[0]['cell_name'], 'office')
|
|
|
|
def test_remove_connection_calls_wg_remove_peer(self):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
self.mgr.remove_connection('office')
|
|
self.wg.remove_peer.assert_called_once_with('officepubkey=')
|
|
|
|
def test_remove_connection_calls_dns_remove(self):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
self.mgr.remove_connection('office')
|
|
self.nm.remove_cell_dns_forward.assert_called_once_with('office.cell')
|
|
|
|
def test_remove_connection_deletes_from_list(self):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
self.mgr.remove_connection('office')
|
|
self.assertEqual(len(self.mgr.list_connections()), 0)
|
|
|
|
def test_remove_nonexistent_raises(self):
|
|
with self.assertRaises(ValueError):
|
|
self.mgr.remove_connection('nobody')
|
|
|
|
def test_list_connections_empty_by_default(self):
|
|
self.assertEqual(self.mgr.list_connections(), [])
|
|
|
|
def test_multiple_connections(self):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
second = {**SAMPLE_INVITE, 'cell_name': 'cabin', 'public_key': 'cabinpubkey=',
|
|
'vpn_subnet': '10.2.0.0/24', 'dns_ip': '10.2.0.1', 'domain': 'cabin.cell'}
|
|
self.mgr.add_connection(second)
|
|
self.assertEqual(len(self.mgr.list_connections()), 2)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestAddConnectionAtomicity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestAddConnectionAtomicity(unittest.TestCase):
|
|
"""Verify that add_connection rolls back correctly when WG or DNS steps fail."""
|
|
|
|
def setUp(self):
|
|
self.test_dir = tempfile.mkdtemp()
|
|
self.wg = _make_wg_mock()
|
|
self.nm = _make_nm_mock()
|
|
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.test_dir)
|
|
|
|
def test_wg_fail_does_not_call_dns(self):
|
|
"""When add_cell_peer returns False, add_cell_dns_forward must NOT be called."""
|
|
self.wg.add_cell_peer.return_value = False
|
|
with self.assertRaises(RuntimeError):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
self.nm.add_cell_dns_forward.assert_not_called()
|
|
|
|
def test_wg_fail_does_not_persist_link(self):
|
|
"""When WG fails, list_connections() must still return [] (nothing persisted)."""
|
|
self.wg.add_cell_peer.return_value = False
|
|
with self.assertRaises(RuntimeError):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
self.assertEqual(self.mgr.list_connections(), [])
|
|
|
|
def test_wg_fail_raises_runtime_error(self):
|
|
"""add_connection raises RuntimeError (not some other exception) when WG fails."""
|
|
self.wg.add_cell_peer.return_value = False
|
|
with self.assertRaises(RuntimeError):
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
|
|
def test_dns_warning_still_persists_link(self):
|
|
"""When DNS returns warnings (not a hard failure), the link IS still saved."""
|
|
self.nm.add_cell_dns_forward.return_value = {
|
|
'restarted': [],
|
|
'warnings': ['CoreDNS reload timed out'],
|
|
}
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
links = self.mgr.list_connections()
|
|
self.assertEqual(len(links), 1)
|
|
self.assertEqual(links[0]['cell_name'], 'office')
|
|
|
|
def test_dns_warning_does_not_raise(self):
|
|
"""When DNS returns warnings, add_connection completes without raising."""
|
|
self.nm.add_cell_dns_forward.return_value = {
|
|
'restarted': [],
|
|
'warnings': ['CoreDNS reload timed out'],
|
|
}
|
|
try:
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
except Exception as e:
|
|
self.fail(f"add_connection raised unexpectedly with DNS warnings: {e}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestAddConnectionPermissions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestAddConnectionPermissions(unittest.TestCase):
|
|
"""Verify that inbound_services controls the permissions field on the saved link."""
|
|
|
|
def setUp(self):
|
|
self.test_dir = tempfile.mkdtemp()
|
|
self.wg = _make_wg_mock()
|
|
self.nm = _make_nm_mock()
|
|
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.test_dir)
|
|
|
|
def _get_link(self):
|
|
links = self.mgr.list_connections()
|
|
self.assertEqual(len(links), 1)
|
|
return links[0]
|
|
|
|
def test_add_with_no_inbound_defaults_all_deny(self):
|
|
"""No inbound_services arg → all inbound permissions False."""
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
link = self._get_link()
|
|
inbound = link['permissions']['inbound']
|
|
for service, allowed in inbound.items():
|
|
self.assertFalse(allowed, f"Expected {service} to be False, got {allowed}")
|
|
|
|
def test_add_with_inbound_services_sets_them(self):
|
|
"""inbound_services=['calendar','files'] → those two True, others False."""
|
|
self.mgr.add_connection(SAMPLE_INVITE, inbound_services=['calendar', 'files'])
|
|
link = self._get_link()
|
|
inbound = link['permissions']['inbound']
|
|
self.assertTrue(inbound['calendar'])
|
|
self.assertTrue(inbound['files'])
|
|
self.assertFalse(inbound['mail'])
|
|
self.assertFalse(inbound['webdav'])
|
|
|
|
def test_inbound_invalid_service_ignored(self):
|
|
"""Passing 'badservice' in inbound_services does not appear in permissions."""
|
|
self.mgr.add_connection(SAMPLE_INVITE, inbound_services=['badservice', 'calendar'])
|
|
link = self._get_link()
|
|
inbound = link['permissions']['inbound']
|
|
self.assertNotIn('badservice', inbound)
|
|
# valid one was still applied
|
|
self.assertTrue(inbound['calendar'])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestUpdatePermissions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUpdatePermissions(unittest.TestCase):
|
|
"""Tests for the new update_permissions / get_permissions methods."""
|
|
|
|
def setUp(self):
|
|
self.test_dir = tempfile.mkdtemp()
|
|
self.wg = _make_wg_mock()
|
|
self.nm = _make_nm_mock()
|
|
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
|
|
# Add a connection so there is something to update
|
|
self.mgr.add_connection(SAMPLE_INVITE)
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.test_dir)
|
|
|
|
def test_update_sets_inbound_values(self):
|
|
"""update_permissions with inbound={'calendar': True} persists correctly."""
|
|
with patch('cell_link_manager.firewall_manager', create=True) as mock_fm:
|
|
mock_fm.apply_cell_rules = MagicMock()
|
|
self.mgr.update_permissions('office', {'calendar': True}, {})
|
|
# Re-read from disk to confirm persistence
|
|
mgr2 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
|
|
perms = mgr2.get_permissions('office')
|
|
self.assertTrue(perms['inbound']['calendar'])
|
|
self.assertFalse(perms['inbound']['files'])
|
|
|
|
def test_update_rejects_unknown_service_by_cleaning_it_out(self):
|
|
"""update_permissions with inbound={'bad': True} — 'bad' must not appear in saved perms."""
|
|
with patch('cell_link_manager.firewall_manager', create=True) as mock_fm:
|
|
mock_fm.apply_cell_rules = MagicMock()
|
|
self.mgr.update_permissions('office', {'bad': True, 'calendar': True}, {})
|
|
perms = self.mgr.get_permissions('office')
|
|
self.assertNotIn('bad', perms['inbound'])
|
|
self.assertTrue(perms['inbound']['calendar'])
|
|
|
|
def test_update_nonexistent_cell_raises(self):
|
|
"""update_permissions on an unknown cell_name raises ValueError."""
|
|
with self.assertRaises(ValueError):
|
|
self.mgr.update_permissions('nosuchcell', {}, {})
|
|
|
|
def test_get_permissions_returns_correct(self):
|
|
"""get_permissions returns the dict that was saved by update_permissions."""
|
|
with patch('cell_link_manager.firewall_manager', create=True) as mock_fm:
|
|
mock_fm.apply_cell_rules = MagicMock()
|
|
self.mgr.update_permissions(
|
|
'office',
|
|
inbound={'calendar': True, 'files': False},
|
|
outbound={'mail': True},
|
|
)
|
|
perms = self.mgr.get_permissions('office')
|
|
self.assertIn('inbound', perms)
|
|
self.assertIn('outbound', perms)
|
|
self.assertTrue(perms['inbound']['calendar'])
|
|
self.assertFalse(perms['inbound']['files'])
|
|
self.assertTrue(perms['outbound']['mail'])
|
|
|
|
def test_get_permissions_nonexistent_cell_raises(self):
|
|
"""get_permissions on an unknown cell_name raises ValueError."""
|
|
with self.assertRaises(ValueError):
|
|
self.mgr.get_permissions('nosuchcell')
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestLoadMigration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestLoadMigration(unittest.TestCase):
|
|
"""Verify _load() lazily injects permissions field when it is missing."""
|
|
|
|
def setUp(self):
|
|
self.test_dir = tempfile.mkdtemp()
|
|
self.wg = _make_wg_mock()
|
|
self.nm = _make_nm_mock()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.test_dir)
|
|
|
|
def test_load_injects_permissions_if_missing(self):
|
|
"""Write cell_links.json without permissions; _load should add all-False defaults."""
|
|
links_file = os.path.join(self.test_dir, 'cell_links.json')
|
|
legacy_links = [
|
|
{
|
|
'cell_name': 'legacy-office',
|
|
'public_key': 'officepubkey=',
|
|
'vpn_subnet': '10.1.0.0/24',
|
|
'dns_ip': '10.1.0.1',
|
|
'domain': 'legacy-office.cell',
|
|
# NO 'permissions' key — simulates pre-migration data
|
|
}
|
|
]
|
|
with open(links_file, 'w') as f:
|
|
json.dump(legacy_links, f)
|
|
|
|
mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
|
|
links = mgr.list_connections()
|
|
|
|
self.assertEqual(len(links), 1)
|
|
link = links[0]
|
|
self.assertIn('permissions', link)
|
|
perms = link['permissions']
|
|
self.assertIn('inbound', perms)
|
|
self.assertIn('outbound', perms)
|
|
for service in ('calendar', 'files', 'mail', 'webdav'):
|
|
self.assertFalse(perms['inbound'][service])
|
|
self.assertFalse(perms['outbound'][service])
|
|
|
|
def test_load_migration_persists_to_disk(self):
|
|
"""After migration, re-loading the same file returns the injected permissions."""
|
|
links_file = os.path.join(self.test_dir, 'cell_links.json')
|
|
with open(links_file, 'w') as f:
|
|
json.dump([{
|
|
'cell_name': 'old-cell',
|
|
'public_key': 'somepubkey=',
|
|
'vpn_subnet': '10.2.0.0/24',
|
|
'dns_ip': '10.2.0.1',
|
|
'domain': 'old-cell.cell',
|
|
}], f)
|
|
|
|
mgr1 = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
|
|
mgr1.list_connections() # triggers migration + save
|
|
|
|
# Read the file directly and confirm permissions are now on disk
|
|
with open(links_file) as f:
|
|
raw = json.load(f)
|
|
self.assertIn('permissions', raw[0])
|
|
|
|
|
|
class TestPermissionSync(unittest.TestCase):
|
|
"""Tests for Phase 1: permission sync between connected PIC cells."""
|
|
|
|
INVITE = {
|
|
'cell_name': 'office',
|
|
'public_key': 'officepubkey=',
|
|
'endpoint': '5.6.7.8:51820',
|
|
'vpn_subnet': '10.1.0.0/24',
|
|
'dns_ip': '10.1.0.1',
|
|
'domain': 'office.cell',
|
|
'version': 1,
|
|
}
|
|
|
|
def setUp(self):
|
|
self.test_dir = tempfile.mkdtemp()
|
|
self.wg = _make_wg_mock()
|
|
self.nm = _make_nm_mock()
|
|
self.mgr = CellLinkManager(self.test_dir, self.test_dir, self.wg, self.nm)
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.test_dir)
|
|
|
|
def _add_office(self, push_ok=True):
|
|
with patch('cell_link_manager.CellLinkManager._local_identity',
|
|
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
|
|
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
|
|
return_value={'ok': push_ok, 'error': None if push_ok else 'conn refused'}), \
|
|
patch('firewall_manager.apply_cell_rules'):
|
|
return self.mgr.add_connection(self.INVITE)
|
|
|
|
# ── add_connection ────────────────────────────────────────────────────────
|
|
|
|
def test_add_connection_includes_sync_fields(self):
|
|
link = self._add_office()
|
|
self.assertIn('remote_api_url', link)
|
|
self.assertIn('pending_push', link)
|
|
self.assertIn('last_push_status', link)
|
|
self.assertIn('last_push_at', link)
|
|
self.assertIn('last_remote_update_at', link)
|
|
|
|
def test_add_connection_sets_remote_api_url_from_dns_ip(self):
|
|
link = self._add_office()
|
|
self.assertEqual(link['remote_api_url'], 'http://10.1.0.1:3000')
|
|
|
|
def test_add_connection_triggers_push(self):
|
|
push_mock = MagicMock(return_value={'ok': True, 'error': None})
|
|
with patch('cell_link_manager.CellLinkManager._local_identity',
|
|
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
|
|
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock), \
|
|
patch('firewall_manager.apply_cell_rules'):
|
|
self.mgr.add_connection(self.INVITE)
|
|
push_mock.assert_called_once()
|
|
call_args = push_mock.call_args[0]
|
|
self.assertEqual(call_args[1], 'home') # from_cell
|
|
self.assertEqual(call_args[2], 'homepubkey=') # from_public_key
|
|
|
|
def test_add_connection_push_failure_does_not_abort_add(self):
|
|
link = self._add_office(push_ok=False)
|
|
conns = self.mgr.list_connections()
|
|
self.assertEqual(len(conns), 1)
|
|
self.assertEqual(conns[0]['cell_name'], 'office')
|
|
self.assertTrue(conns[0]['pending_push'])
|
|
|
|
def test_add_connection_push_success_clears_pending(self):
|
|
self._add_office(push_ok=True)
|
|
link = self.mgr.list_connections()[0]
|
|
self.assertFalse(link['pending_push'])
|
|
self.assertEqual(link['last_push_status'], 'ok')
|
|
|
|
# ── update_permissions ────────────────────────────────────────────────────
|
|
|
|
def test_update_permissions_push_succeeds_clears_pending(self):
|
|
self._add_office()
|
|
push_mock = MagicMock(return_value={'ok': True, 'error': None})
|
|
with patch('cell_link_manager.CellLinkManager._local_identity',
|
|
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
|
|
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock), \
|
|
patch('firewall_manager.apply_cell_rules'):
|
|
self.mgr.update_permissions('office',
|
|
{'calendar': True}, {'files': False})
|
|
link = self.mgr.list_connections()[0]
|
|
self.assertFalse(link['pending_push'])
|
|
self.assertEqual(link['last_push_status'], 'ok')
|
|
self.assertIsNotNone(link['last_push_at'])
|
|
|
|
def test_update_permissions_push_failure_keeps_local_save(self):
|
|
self._add_office()
|
|
with patch('cell_link_manager.CellLinkManager._local_identity',
|
|
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
|
|
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
|
|
return_value={'ok': False, 'error': 'timeout'}), \
|
|
patch('firewall_manager.apply_cell_rules'):
|
|
result = self.mgr.update_permissions('office',
|
|
{'calendar': True}, {})
|
|
# Local save must have happened — calendar is True
|
|
self.assertTrue(result['permissions']['inbound']['calendar'])
|
|
link = self.mgr.list_connections()[0]
|
|
self.assertTrue(link['pending_push'])
|
|
self.assertEqual(link['last_push_status'], 'failed')
|
|
|
|
def test_update_permissions_does_not_raise_on_push_exception(self):
|
|
self._add_office()
|
|
with patch('cell_link_manager.CellLinkManager._local_identity',
|
|
side_effect=RuntimeError('no app context')), \
|
|
patch('firewall_manager.apply_cell_rules'):
|
|
# Must not raise
|
|
result = self.mgr.update_permissions('office', {}, {})
|
|
self.assertIn('permissions', result)
|
|
|
|
# ── _push_permissions_to_remote (unit) ────────────────────────────────────
|
|
|
|
def test_push_mirrors_inbound_outbound(self):
|
|
"""Our inbound (what we share) must become their outbound in the body."""
|
|
self._add_office()
|
|
link = self.mgr.list_connections()[0]
|
|
link['permissions'] = {
|
|
'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
|
|
'outbound': {'calendar': False, 'files': True, 'mail': False, 'webdav': False},
|
|
}
|
|
|
|
sent_body = {}
|
|
|
|
def fake_urlopen(req, timeout=None):
|
|
import json as _j
|
|
sent_body.update(_j.loads(req.data))
|
|
resp = MagicMock()
|
|
resp.__enter__ = lambda s: s
|
|
resp.__exit__ = MagicMock(return_value=False)
|
|
resp.status = 200
|
|
return resp
|
|
|
|
with patch('urllib.request.urlopen', fake_urlopen):
|
|
result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
|
|
|
|
self.assertTrue(result['ok'])
|
|
pushed_perms = sent_body['permissions']
|
|
# Our inbound=calendar:True → their outbound=calendar:True
|
|
self.assertTrue(pushed_perms['outbound']['calendar'])
|
|
# Our outbound=files:True → their inbound=files:True
|
|
self.assertTrue(pushed_perms['inbound']['files'])
|
|
|
|
def test_push_http_error_returns_not_ok(self):
|
|
self._add_office()
|
|
link = self.mgr.list_connections()[0]
|
|
with patch('urllib.request.urlopen',
|
|
side_effect=__import__('urllib.error', fromlist=['HTTPError']).HTTPError(
|
|
url='', code=503, msg='Service Unavailable', hdrs=None, fp=None)):
|
|
result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
|
|
self.assertFalse(result['ok'])
|
|
self.assertIn('503', result['error'])
|
|
|
|
def test_push_no_remote_api_url_returns_not_ok(self):
|
|
self._add_office()
|
|
link = self.mgr.list_connections()[0]
|
|
link['remote_api_url'] = None
|
|
result = self.mgr._push_permissions_to_remote(link, 'home', 'homepubkey=')
|
|
self.assertFalse(result['ok'])
|
|
|
|
# ── apply_remote_permissions ──────────────────────────────────────────────
|
|
|
|
def test_apply_remote_permissions_stores_by_pubkey(self):
|
|
self._add_office()
|
|
with patch('firewall_manager.apply_cell_rules'):
|
|
updated = self.mgr.apply_remote_permissions(
|
|
'officepubkey=',
|
|
{'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
|
|
'outbound': {'calendar': False, 'files': True, 'mail': False, 'webdav': False}},
|
|
)
|
|
self.assertTrue(updated['permissions']['inbound']['calendar'])
|
|
self.assertTrue(updated['permissions']['outbound']['files'])
|
|
# Persisted to disk
|
|
link = self.mgr.list_connections()[0]
|
|
self.assertTrue(link['permissions']['inbound']['calendar'])
|
|
self.assertIsNotNone(link['last_remote_update_at'])
|
|
|
|
def test_apply_remote_permissions_unknown_pubkey_raises(self):
|
|
self._add_office()
|
|
with self.assertRaises(ValueError):
|
|
self.mgr.apply_remote_permissions('nosuchkey=', {})
|
|
|
|
def test_apply_remote_permissions_calls_apply_cell_rules(self):
|
|
self._add_office()
|
|
with patch('firewall_manager.apply_cell_rules') as mock_rules:
|
|
self.mgr.apply_remote_permissions(
|
|
'officepubkey=',
|
|
{'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
|
|
'outbound': {}},
|
|
)
|
|
mock_rules.assert_called_once_with('office', '10.1.0.0/24', ['calendar'])
|
|
|
|
# ── replay_pending_pushes ─────────────────────────────────────────────────
|
|
|
|
def test_replay_retries_pending_links(self):
|
|
self._add_office(push_ok=False) # leaves pending_push=True
|
|
self.assertTrue(self.mgr.list_connections()[0]['pending_push'])
|
|
|
|
push_mock = MagicMock(return_value={'ok': True, 'error': None})
|
|
with patch('cell_link_manager.CellLinkManager._local_identity',
|
|
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
|
|
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock):
|
|
summary = self.mgr.replay_pending_pushes()
|
|
|
|
push_mock.assert_called_once()
|
|
self.assertEqual(summary['attempted'], 1)
|
|
self.assertEqual(summary['ok'], 1)
|
|
self.assertFalse(self.mgr.list_connections()[0]['pending_push'])
|
|
|
|
def test_replay_skips_non_pending_links(self):
|
|
self._add_office(push_ok=True) # pending_push=False after success
|
|
push_mock = MagicMock(return_value={'ok': True, 'error': None})
|
|
with patch('cell_link_manager.CellLinkManager._local_identity',
|
|
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
|
|
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote', push_mock):
|
|
summary = self.mgr.replay_pending_pushes()
|
|
push_mock.assert_not_called()
|
|
self.assertEqual(summary['attempted'], 0)
|
|
|
|
def test_replay_push_failure_leaves_pending(self):
|
|
self._add_office(push_ok=False)
|
|
with patch('cell_link_manager.CellLinkManager._local_identity',
|
|
return_value={'cell_name': 'home', 'public_key': 'homepubkey='}), \
|
|
patch('cell_link_manager.CellLinkManager._push_permissions_to_remote',
|
|
return_value={'ok': False, 'error': 'timeout'}):
|
|
summary = self.mgr.replay_pending_pushes()
|
|
self.assertEqual(summary['failed'], 1)
|
|
self.assertTrue(self.mgr.list_connections()[0]['pending_push'])
|
|
|
|
def test_replay_identity_failure_returns_empty_summary(self):
|
|
self._add_office(push_ok=False)
|
|
with patch('cell_link_manager.CellLinkManager._local_identity',
|
|
side_effect=RuntimeError('no app context')):
|
|
summary = self.mgr.replay_pending_pushes()
|
|
self.assertEqual(summary['attempted'], 0)
|
|
|
|
# ── _load migration ───────────────────────────────────────────────────────
|
|
|
|
def test_load_migration_injects_sync_fields_on_legacy_record(self):
|
|
legacy = [{
|
|
'cell_name': 'office',
|
|
'public_key': 'officepubkey=',
|
|
'vpn_subnet': '10.1.0.0/24',
|
|
'dns_ip': '10.1.0.1',
|
|
'domain': 'office.cell',
|
|
'permissions': {'inbound': {}, 'outbound': {}},
|
|
}]
|
|
links_file = os.path.join(self.test_dir, 'cell_links.json')
|
|
with open(links_file, 'w') as f:
|
|
json.dump(legacy, f)
|
|
|
|
links = self.mgr.list_connections()
|
|
link = links[0]
|
|
self.assertIn('remote_api_url', link)
|
|
self.assertIn('pending_push', link)
|
|
self.assertIn('last_push_status', link)
|
|
self.assertIn('last_push_at', link)
|
|
self.assertIn('last_remote_update_at', link)
|
|
self.assertEqual(link['remote_api_url'], 'http://10.1.0.1:3000')
|
|
self.assertTrue(link['pending_push']) # pre-existing → marked pending
|
|
self.assertEqual(link['last_push_status'], 'never')
|
|
|
|
# Fields persisted to disk after migration
|
|
with open(links_file) as f:
|
|
raw = json.load(f)
|
|
self.assertIn('pending_push', raw[0])
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|