Files
pic/tests/test_peer_registry.py
T
roof aa1e5c41ec
Unit Tests / test (push) Successful in 12m6s
test: raise coverage 68.7% -> ~80.4%; add ~250 tests for new egress/DDNS/network paths
Coverage was below acceptable levels and several newly-added code paths
(sshuttle egress, proxy egress, DDNS provider stubs, DNS overview route,
peer-registry provisioning) had zero test coverage.

~250 new unit tests are added across 16 new test files. Existing test files
are updated to match refactored interfaces (DHCP removed, constants
introduced, network_manager restructured). .coveragerc is added to pin the
source mapping and the 70% floor so regressions are caught at commit time.

tests/test_enhanced_api.py previously lived in api/ (the wrong location)
and has been moved to tests/, where it belongs.

Integration test files are updated to remove references to DHCP endpoints
and add coverage for the new DNS overview and DDNS sync endpoints.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 09:03:39 -04:00

229 lines
10 KiB
Python

import unittest
import tempfile
import shutil
import os
import json
import sys
from pathlib import Path
# Make the `api` directory importable. It is resolved relative to this file:
# up from this file's directory to its parent, then into `api`. The path is
# inserted at the front of sys.path so the `peer_registry` import that follows
# is searched for there first.
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from peer_registry import PeerRegistry
class TestPeerRegistry(unittest.TestCase):
    """Unit tests for ``PeerRegistry`` run against a throwaway directory.

    Each test gets its own temporary directory, passed to the registry as
    both ``data_dir`` and ``config_dir``, so no state is shared between tests.
    """

    # ------------------------------------------------------------------
    # Fixtures and helpers
    # ------------------------------------------------------------------

    def setUp(self):
        """Create a temp directory and a registry that stores its data there."""
        self.test_dir = tempfile.mkdtemp()
        # Register the cleanup immediately: unlike tearDown(), cleanups still
        # run when a later line of setUp() raises, so the directory is never
        # left behind.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.registry = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir)

    def _reload(self):
        """Return a new ``PeerRegistry`` instance over the same temp directory."""
        return PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir)

    def _write_peers_file(self, text):
        """Overwrite ``peers.json`` in the temp directory with ``text``."""
        peers_file = os.path.join(self.test_dir, 'peers.json')
        with open(peers_file, 'w', encoding='utf-8') as f:
            f.write(text)

    # ------------------------------------------------------------------
    # Basic CRUD
    # ------------------------------------------------------------------

    def test_initialization_and_empty(self):
        """A registry over an empty directory lists no peers."""
        self.assertEqual(self.registry.list_peers(), [])

    def test_add_and_get_peer(self):
        """A peer can be added once and read back; a duplicate add fails."""
        peer = {'peer': 'peer1', 'ip': '10.0.0.2'}
        result = self.registry.add_peer(peer)
        self.assertTrue(result)
        self.assertEqual(self.registry.get_peer('peer1'), peer)
        # Adding duplicate should fail
        result = self.registry.add_peer(peer)
        self.assertFalse(result)
        # Defensive: check peer_obj is not None
        peer_obj = self.registry.get_peer('peer1')
        self.assertIsNotNone(peer_obj)
        self.assertEqual(peer_obj['ip'], '10.0.0.2')

    def test_remove_peer(self):
        """Removing a peer succeeds once and then reports False."""
        peer = {'peer': 'peer1', 'ip': '10.0.0.2'}
        self.registry.add_peer(peer)
        result = self.registry.remove_peer('peer1')
        self.assertTrue(result)
        self.assertIsNone(self.registry.get_peer('peer1'))
        # Removing non-existent peer should return False
        result = self.registry.remove_peer('peer1')
        self.assertFalse(result)

    def test_update_peer_ip(self):
        """``update_peer_ip`` changes the stored IP; unknown peers give False."""
        peer = {'peer': 'peer1', 'ip': '10.0.0.2'}
        self.registry.add_peer(peer)
        result = self.registry.update_peer_ip('peer1', '10.0.0.3')
        self.assertTrue(result)
        peer_obj = self.registry.get_peer('peer1')
        self.assertIsNotNone(peer_obj)
        self.assertEqual(peer_obj['ip'], '10.0.0.3')
        # Updating non-existent peer should return False
        result = self.registry.update_peer_ip('peer2', '10.0.0.4')
        self.assertFalse(result)

    def test_persistence(self):
        """A peer added by one instance is visible to a new instance."""
        peer = {'peer': 'peer1', 'ip': '10.0.0.2'}
        self.registry.add_peer(peer)
        # Create a new registry instance to test loading from file
        new_registry = self._reload()
        peer_obj = new_registry.get_peer('peer1')
        self.assertIsNotNone(peer_obj)
        self.assertEqual(peer_obj['ip'], '10.0.0.2')

    def test_corrupt_file_handling(self):
        """Corrupt JSON in the peers file loads as an empty registry."""
        # Write corrupt JSON to the peers file
        self._write_peers_file('{bad json')
        # Should not raise, should load as empty
        registry = self._reload()
        self.assertEqual(registry.list_peers(), [])

    # ------------------------------------------------------------------
    # route_via
    # ------------------------------------------------------------------

    def test_route_via_migration_adds_field(self):
        """Existing peers without route_via get it as None on load."""
        raw = [{'peer': 'alice', 'ip': '10.0.0.5', 'public_key': 'key=',
                'active': True, 'created_at': '2026-01-01T00:00:00'}]
        self._write_peers_file(json.dumps(raw))
        reg = self._reload()
        peer = reg.get_peer('alice')
        # Guard first so a missing peer is a test failure, not a TypeError.
        self.assertIsNotNone(peer)
        self.assertIn('route_via', peer)
        self.assertIsNone(peer['route_via'])

    def test_set_route_via_persists(self):
        """``set_route_via`` returns the updated peer and survives a reload."""
        self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'})
        updated = self.registry.set_route_via('alice', 'exit-cell')
        self.assertEqual(updated['route_via'], 'exit-cell')
        # Verify it survives a reload
        reloaded_peer = self._reload().get_peer('alice')
        self.assertIsNotNone(reloaded_peer)
        self.assertEqual(reloaded_peer['route_via'], 'exit-cell')

    def test_set_route_via_clear(self):
        """Passing None to ``set_route_via`` clears a previously set route."""
        self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'})
        self.registry.set_route_via('alice', 'exit-cell')
        updated = self.registry.set_route_via('alice', None)
        self.assertIsNone(updated['route_via'])

    def test_set_route_via_unknown_peer_raises(self):
        """``set_route_via`` on an unknown peer raises ValueError."""
        with self.assertRaises(ValueError):
            self.registry.set_route_via('nobody', 'exit-cell')

    # ------------------------------------------------------------------
    # exit_via
    # ------------------------------------------------------------------

    def test_set_peer_exit_via_valid(self):
        """A valid exit type is accepted and stored on the peer."""
        self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'})
        result = self.registry.set_peer_exit_via('alice', 'wireguard_ext')
        self.assertTrue(result)
        peer = self.registry.get_peer('alice')
        self.assertIsNotNone(peer)
        self.assertEqual(peer['exit_via'], 'wireguard_ext')

    def test_set_peer_exit_via_all_valid_types(self):
        """Each of the four exit types checked here is accepted and stored."""
        self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'})
        for exit_type in ('default', 'wireguard_ext', 'openvpn', 'tor'):
            # subTest reports which exit type failed and keeps checking the rest.
            with self.subTest(exit_type=exit_type):
                result = self.registry.set_peer_exit_via('alice', exit_type)
                self.assertTrue(result)
                peer = self.registry.get_peer('alice')
                self.assertIsNotNone(peer)
                self.assertEqual(peer['exit_via'], exit_type)

    def test_set_peer_exit_via_invalid_type_returns_false(self):
        """An unrecognised exit type is rejected with False."""
        self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'})
        result = self.registry.set_peer_exit_via('alice', 'invalid_exit')
        self.assertFalse(result)

    def test_set_peer_exit_via_nonexistent_peer_returns_false(self):
        """Setting exit_via on an unknown peer returns False."""
        result = self.registry.set_peer_exit_via('nobody', 'default')
        self.assertFalse(result)

    def test_set_peer_exit_via_persists(self):
        """An exit_via value survives a reload from disk."""
        self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'})
        self.registry.set_peer_exit_via('alice', 'tor')
        reloaded_peer = self._reload().get_peer('alice')
        self.assertIsNotNone(reloaded_peer)
        self.assertEqual(reloaded_peer['exit_via'], 'tor')

    def test_exit_via_migration_adds_field(self):
        """Existing peers without exit_via get it as 'default' on load."""
        raw = [{'peer': 'alice', 'ip': '10.0.0.5', 'public_key': 'key=',
                'active': True, 'route_via': None, 'created_at': '2026-01-01T00:00:00'}]
        self._write_peers_file(json.dumps(raw))
        reg = self._reload()
        peer = reg.get_peer('alice')
        self.assertIsNotNone(peer)
        self.assertIn('exit_via', peer)
        self.assertEqual(peer['exit_via'], 'default')

    # ------------------------------------------------------------------
    # Generic updates and flags
    # ------------------------------------------------------------------

    def test_update_peer_updates_arbitrary_fields(self):
        """``update_peer`` applies both new and existing keys."""
        self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'})
        result = self.registry.update_peer('alice', {'custom_field': 'hello', 'ip': '10.0.0.99'})
        self.assertTrue(result)
        peer = self.registry.get_peer('alice')
        self.assertIsNotNone(peer)
        self.assertEqual(peer['custom_field'], 'hello')
        self.assertEqual(peer['ip'], '10.0.0.99')

    def test_update_peer_nonexistent_returns_false(self):
        """``update_peer`` on an unknown peer returns False."""
        result = self.registry.update_peer('nobody', {'ip': '10.0.0.99'})
        self.assertFalse(result)

    def test_clear_reinstall_flag(self):
        """``clear_reinstall_flag`` turns ``config_needs_reinstall`` falsy."""
        self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5',
                                'config_needs_reinstall': True})
        result = self.registry.clear_reinstall_flag('alice')
        self.assertTrue(result)
        peer = self.registry.get_peer('alice')
        self.assertIsNotNone(peer)
        self.assertFalse(peer['config_needs_reinstall'])

    # ------------------------------------------------------------------
    # Stats, status and self-checks
    # ------------------------------------------------------------------

    def test_get_peer_stats_empty(self):
        """Stats for an empty registry are all zero with no IP ranges."""
        stats = self.registry.get_peer_stats()
        self.assertEqual(stats['total_peers'], 0)
        self.assertEqual(stats['active_peers'], 0)
        self.assertEqual(stats['inactive_peers'], 0)
        self.assertEqual(stats['ip_ranges'], {})

    def test_get_peer_stats_with_peers(self):
        """Stats count active/inactive peers and group them by /24 range."""
        self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.2', 'active': True})
        self.registry.add_peer({'peer': 'bob', 'ip': '10.0.0.3', 'active': False})
        stats = self.registry.get_peer_stats()
        self.assertEqual(stats['total_peers'], 2)
        self.assertEqual(stats['active_peers'], 1)
        self.assertEqual(stats['inactive_peers'], 1)
        self.assertIn('10.0.0.0/24', stats['ip_ranges'])
        self.assertEqual(stats['ip_ranges']['10.0.0.0/24'], 2)

    def test_get_status_returns_correct_counts(self):
        """``get_status`` reports peer counts and a truthy ``running`` flag."""
        self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.2', 'active': True})
        self.registry.add_peer({'peer': 'bob', 'ip': '10.0.0.3', 'active': False})
        status = self.registry.get_status()
        self.assertEqual(status['peers_count'], 2)
        self.assertEqual(status['active_peers'], 1)
        self.assertEqual(status['inactive_peers'], 1)
        self.assertTrue(status['running'])

    def test_test_connectivity_returns_dict(self):
        """``test_connectivity`` returns a result containing the expected keys."""
        result = self.registry.test_connectivity()
        for key in ('filesystem_access', 'data_integrity', 'peer_operations', 'success'):
            with self.subTest(key=key):
                self.assertIn(key, result)

    def test_test_connectivity_success(self):
        """Filesystem and data-integrity sub-checks pass on a fresh registry."""
        result = self.registry.test_connectivity()
        # All subtests should succeed since data dir exists
        self.assertTrue(result['filesystem_access']['success'])
        self.assertTrue(result['data_integrity']['success'])

    def test_list_peers_returns_copy(self):
        """Modifying the returned list shouldn't affect internal state."""
        self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'})
        peers = self.registry.list_peers()
        peers.clear()
        self.assertEqual(len(self.registry.list_peers()), 1)

    # ------------------------------------------------------------------
    # On-disk permissions
    # ------------------------------------------------------------------

    @unittest.skipUnless(os.name == 'posix', 'POSIX permission bits are required')
    def test_save_peers_uses_restrictive_permissions(self):
        """peers.json should be created with mode 0o600."""
        import stat

        self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'})
        mode = os.stat(self.registry.peers_file).st_mode
        perms = stat.S_IMODE(mode)
        self.assertEqual(perms, 0o600)
# Allow running this test module directly (`python test_peer_registry.py`);
# when the module is imported by a test runner this guard is skipped.
if __name__ == '__main__':
    unittest.main()