import unittest import tempfile import shutil import os import json import sys from pathlib import Path from unittest.mock import MagicMock # Add api directory to path api_dir = Path(__file__).parent.parent / 'api' sys.path.insert(0, str(api_dir)) from peer_registry import PeerRegistry class TestPeerRegistry(unittest.TestCase): def setUp(self): # Use a temp directory for the peers file self.test_dir = tempfile.mkdtemp() self.registry = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir) def tearDown(self): shutil.rmtree(self.test_dir) def test_initialization_and_empty(self): self.assertEqual(self.registry.list_peers(), []) def test_add_and_get_peer(self): peer = {'peer': 'peer1', 'ip': '10.0.0.2'} result = self.registry.add_peer(peer) self.assertTrue(result) self.assertEqual(self.registry.get_peer('peer1'), peer) # Adding duplicate should fail result = self.registry.add_peer(peer) self.assertFalse(result) # Defensive: check peer_obj is not None peer_obj = self.registry.get_peer('peer1') self.assertIsNotNone(peer_obj) self.assertEqual(peer_obj['ip'], '10.0.0.2') def test_remove_peer(self): peer = {'peer': 'peer1', 'ip': '10.0.0.2'} self.registry.add_peer(peer) result = self.registry.remove_peer('peer1') self.assertTrue(result) self.assertIsNone(self.registry.get_peer('peer1')) # Removing non-existent peer should return False result = self.registry.remove_peer('peer1') self.assertFalse(result) def test_update_peer_ip(self): peer = {'peer': 'peer1', 'ip': '10.0.0.2'} self.registry.add_peer(peer) result = self.registry.update_peer_ip('peer1', '10.0.0.3') self.assertTrue(result) peer_obj = self.registry.get_peer('peer1') self.assertIsNotNone(peer_obj) self.assertEqual(peer_obj['ip'], '10.0.0.3') # Updating non-existent peer should return False result = self.registry.update_peer_ip('peer2', '10.0.0.4') self.assertFalse(result) def test_persistence(self): peer = {'peer': 'peer1', 'ip': '10.0.0.2'} self.registry.add_peer(peer) # Create a new registry instance to test loading from file new_registry = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir) peer_obj = new_registry.get_peer('peer1') self.assertIsNotNone(peer_obj) self.assertEqual(peer_obj['ip'], '10.0.0.2') def test_corrupt_file_handling(self): # Write corrupt JSON to the peers file peers_file = os.path.join(self.test_dir, 'peers.json') with open(peers_file, 'w') as f: f.write('{bad json') # Should not raise, should load as empty registry = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir) self.assertEqual(registry.list_peers(), []) def test_route_via_migration_adds_field(self): """Existing peers without route_via get it as None on load.""" peers_file = os.path.join(self.test_dir, 'peers.json') raw = [{'peer': 'alice', 'ip': '10.0.0.5', 'public_key': 'key=', 'active': True, 'created_at': '2026-01-01T00:00:00'}] with open(peers_file, 'w') as f: json.dump(raw, f) reg = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir) peer = reg.get_peer('alice') self.assertIn('route_via', peer) self.assertIsNone(peer['route_via']) def test_set_route_via_persists(self): self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'}) updated = self.registry.set_route_via('alice', 'exit-cell') self.assertEqual(updated['route_via'], 'exit-cell') # Verify it survives a reload reloaded = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir) self.assertEqual(reloaded.get_peer('alice')['route_via'], 'exit-cell') def test_set_route_via_clear(self): self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'}) self.registry.set_route_via('alice', 'exit-cell') updated = self.registry.set_route_via('alice', None) self.assertIsNone(updated['route_via']) def test_set_route_via_unknown_peer_raises(self): with self.assertRaises(ValueError): self.registry.set_route_via('nobody', 'exit-cell') def _connectivity_cm(self, connections): """A mock config_manager exposing v2 connection records.""" cm = MagicMock() cm.list_connections.return_value = connections return cm def test_set_peer_exit_via_valid_connection_id(self): conns = [{'id': 'conn_wg', 'type': 'wireguard_ext'}] self.registry.config_manager = self._connectivity_cm(conns) self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'}) result = self.registry.set_peer_exit_via('alice', 'conn_wg') self.assertTrue(result) self.assertEqual(self.registry.get_peer('alice')['exit_via'], 'conn_wg') def test_set_peer_exit_via_default_always_valid(self): self.registry.config_manager = self._connectivity_cm([]) self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'}) result = self.registry.set_peer_exit_via('alice', 'default') self.assertTrue(result) self.assertEqual(self.registry.get_peer('alice')['exit_via'], 'default') def test_set_peer_exit_via_legacy_type_resolves_to_instance(self): """Back-compat shim: a legacy type resolves to the one instance of it.""" conns = [{'id': 'conn_tor', 'type': 'tor'}] self.registry.config_manager = self._connectivity_cm(conns) self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'}) result = self.registry.set_peer_exit_via('alice', 'tor') self.assertTrue(result) self.assertEqual(self.registry.get_peer('alice')['exit_via'], 'conn_tor') def test_set_peer_exit_via_unknown_id_returns_false(self): self.registry.config_manager = self._connectivity_cm([]) self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'}) result = self.registry.set_peer_exit_via('alice', 'conn_ghost') self.assertFalse(result) def test_set_peer_exit_via_nonexistent_peer_returns_false(self): self.registry.config_manager = self._connectivity_cm([]) result = self.registry.set_peer_exit_via('nobody', 'default') self.assertFalse(result) def test_set_peer_exit_via_persists(self): conns = [{'id': 'conn_tor', 'type': 'tor'}] cm = self._connectivity_cm(conns) self.registry.config_manager = cm self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'}) self.registry.set_peer_exit_via('alice', 'conn_tor') reloaded = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir, config_manager=cm) self.assertEqual(reloaded.get_peer('alice')['exit_via'], 'conn_tor') def test_exit_via_migration_legacy_type_to_id(self): """On load, a legacy per-type exit_via becomes the migrated instance id.""" peers_file = os.path.join(self.test_dir, 'peers.json') with open(peers_file, 'w') as f: json.dump([{'peer': 'alice', 'ip': '10.0.0.5', 'exit_via': 'wireguard_ext'}], f) cm = self._connectivity_cm([{'id': 'conn_wg', 'type': 'wireguard_ext'}]) reg = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir, config_manager=cm) self.assertEqual(reg.get_peer('alice')['exit_via'], 'conn_wg') def test_exit_via_migration_unknown_type_to_default(self): """A legacy type with no migrated instance falls back to 'default'.""" peers_file = os.path.join(self.test_dir, 'peers.json') with open(peers_file, 'w') as f: json.dump([{'peer': 'alice', 'ip': '10.0.0.5', 'exit_via': 'openvpn'}], f) cm = self._connectivity_cm([]) # no instances reg = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir, config_manager=cm) self.assertEqual(reg.get_peer('alice')['exit_via'], 'default') def test_exit_via_migration_id_is_idempotent(self): """An already-migrated id is left untouched and not re-migrated.""" peers_file = os.path.join(self.test_dir, 'peers.json') with open(peers_file, 'w') as f: json.dump([{'peer': 'alice', 'ip': '10.0.0.5', 'exit_via': 'conn_wg'}], f) cm = self._connectivity_cm([{'id': 'conn_wg', 'type': 'wireguard_ext'}]) reg = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir, config_manager=cm) self.assertEqual(reg.get_peer('alice')['exit_via'], 'conn_wg') self.assertFalse(reg._migrate_exit_via_to_connection_id()) def test_update_peer_updates_arbitrary_fields(self): self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'}) result = self.registry.update_peer('alice', {'custom_field': 'hello', 'ip': '10.0.0.99'}) self.assertTrue(result) peer = self.registry.get_peer('alice') self.assertEqual(peer['custom_field'], 'hello') self.assertEqual(peer['ip'], '10.0.0.99') def test_update_peer_nonexistent_returns_false(self): result = self.registry.update_peer('nobody', {'ip': '10.0.0.99'}) self.assertFalse(result) def test_clear_reinstall_flag(self): self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5', 'config_needs_reinstall': True}) result = self.registry.clear_reinstall_flag('alice') self.assertTrue(result) peer = self.registry.get_peer('alice') self.assertFalse(peer['config_needs_reinstall']) def test_get_peer_stats_empty(self): stats = self.registry.get_peer_stats() self.assertEqual(stats['total_peers'], 0) self.assertEqual(stats['active_peers'], 0) self.assertEqual(stats['inactive_peers'], 0) self.assertEqual(stats['ip_ranges'], {}) def test_get_peer_stats_with_peers(self): self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.2', 'active': True}) self.registry.add_peer({'peer': 'bob', 'ip': '10.0.0.3', 'active': False}) stats = self.registry.get_peer_stats() self.assertEqual(stats['total_peers'], 2) self.assertEqual(stats['active_peers'], 1) self.assertEqual(stats['inactive_peers'], 1) self.assertIn('10.0.0.0/24', stats['ip_ranges']) self.assertEqual(stats['ip_ranges']['10.0.0.0/24'], 2) def test_get_status_returns_correct_counts(self): self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.2', 'active': True}) self.registry.add_peer({'peer': 'bob', 'ip': '10.0.0.3', 'active': False}) status = self.registry.get_status() self.assertEqual(status['peers_count'], 2) self.assertEqual(status['active_peers'], 1) self.assertEqual(status['inactive_peers'], 1) self.assertTrue(status['running']) def test_test_connectivity_returns_dict(self): result = self.registry.test_connectivity() self.assertIn('filesystem_access', result) self.assertIn('data_integrity', result) self.assertIn('peer_operations', result) self.assertIn('success', result) def test_test_connectivity_success(self): result = self.registry.test_connectivity() # All subtests should succeed since data dir exists self.assertTrue(result['filesystem_access']['success']) self.assertTrue(result['data_integrity']['success']) def test_list_peers_returns_copy(self): """Modifying the returned list shouldn't affect internal state.""" self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'}) peers = self.registry.list_peers() peers.clear() self.assertEqual(len(self.registry.list_peers()), 1) def test_exit_via_migration_adds_field(self): """Existing peers without exit_via get it as 'default' on load.""" import json as _json peers_file = os.path.join(self.test_dir, 'peers.json') raw = [{'peer': 'alice', 'ip': '10.0.0.5', 'public_key': 'key=', 'active': True, 'route_via': None, 'created_at': '2026-01-01T00:00:00'}] with open(peers_file, 'w') as f: _json.dump(raw, f) reg = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir) peer = reg.get_peer('alice') self.assertIn('exit_via', peer) self.assertEqual(peer['exit_via'], 'default') def test_save_peers_uses_restrictive_permissions(self): """peers.json should be created with mode 0o600.""" self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'}) import stat mode = os.stat(self.registry.peers_file).st_mode perms = stat.S_IMODE(mode) self.assertEqual(perms, 0o600) if __name__ == '__main__': unittest.main()