Files
pic/tests/test_peer_registry.py
T
roof 8ea834e108 feat: Phase 3 - per-peer internet routing via exit cell
Adds the ability to route a specific peer's internet traffic through a
connected cell acting as an exit relay.

Cell A side:
- PUT /api/peers/<peer>/route-via {"via_cell": "cellB"} sets route_via
- Updates WG AllowedIPs to include 0.0.0.0/0 for the exit cell peer
- Adds ip rule + ip route in policy table inside cell-wireguard so the
  specific peer's traffic egresses via cellB's WG IP
- Sets exit_relay_active on the cell link and pushes use_as_exit_relay=True
  to cellB via peer-sync

Cell B side:
- Receives use_as_exit_relay in the peer-sync payload
- Calls apply_cell_rules(..., exit_relay=True) to add FORWARD -o eth0 ACCEPT
- Stores remote_exit_relay_active flag for startup recovery

Startup recovery:
- apply_all_cell_rules passes exit_relay=remote_exit_relay_active (cellB)
- _apply_startup_enforcement reapplies ip rule for each peer with route_via (cellA)
  since policy routing rules don't survive container restart

peer_registry gets route_via field with lazy migration.
22 new tests across test_cell_link_manager, test_peer_registry, test_peer_route_via.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 16:23:31 -04:00

111 lines
4.5 KiB
Python

import unittest
import tempfile
import shutil
import os
import json
import sys
from pathlib import Path
# Add api directory to path
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from peer_registry import PeerRegistry
class TestPeerRegistry(unittest.TestCase):
def setUp(self):
# Use a temp directory for the peers file
self.test_dir = tempfile.mkdtemp()
self.registry = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir)
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_initialization_and_empty(self):
self.assertEqual(self.registry.list_peers(), [])
def test_add_and_get_peer(self):
peer = {'peer': 'peer1', 'ip': '10.0.0.2'}
result = self.registry.add_peer(peer)
self.assertTrue(result)
self.assertEqual(self.registry.get_peer('peer1'), peer)
# Adding duplicate should fail
result = self.registry.add_peer(peer)
self.assertFalse(result)
# Defensive: check peer_obj is not None
peer_obj = self.registry.get_peer('peer1')
self.assertIsNotNone(peer_obj)
self.assertEqual(peer_obj['ip'], '10.0.0.2')
def test_remove_peer(self):
peer = {'peer': 'peer1', 'ip': '10.0.0.2'}
self.registry.add_peer(peer)
result = self.registry.remove_peer('peer1')
self.assertTrue(result)
self.assertIsNone(self.registry.get_peer('peer1'))
# Removing non-existent peer should return False
result = self.registry.remove_peer('peer1')
self.assertFalse(result)
def test_update_peer_ip(self):
peer = {'peer': 'peer1', 'ip': '10.0.0.2'}
self.registry.add_peer(peer)
result = self.registry.update_peer_ip('peer1', '10.0.0.3')
self.assertTrue(result)
peer_obj = self.registry.get_peer('peer1')
self.assertIsNotNone(peer_obj)
self.assertEqual(peer_obj['ip'], '10.0.0.3')
# Updating non-existent peer should return False
result = self.registry.update_peer_ip('peer2', '10.0.0.4')
self.assertFalse(result)
def test_persistence(self):
peer = {'peer': 'peer1', 'ip': '10.0.0.2'}
self.registry.add_peer(peer)
# Create a new registry instance to test loading from file
new_registry = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir)
peer_obj = new_registry.get_peer('peer1')
self.assertIsNotNone(peer_obj)
self.assertEqual(peer_obj['ip'], '10.0.0.2')
def test_corrupt_file_handling(self):
# Write corrupt JSON to the peers file
peers_file = os.path.join(self.test_dir, 'peers.json')
with open(peers_file, 'w') as f:
f.write('{bad json')
# Should not raise, should load as empty
registry = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir)
self.assertEqual(registry.list_peers(), [])
def test_route_via_migration_adds_field(self):
"""Existing peers without route_via get it as None on load."""
peers_file = os.path.join(self.test_dir, 'peers.json')
raw = [{'peer': 'alice', 'ip': '10.0.0.5', 'public_key': 'key=',
'active': True, 'created_at': '2026-01-01T00:00:00'}]
with open(peers_file, 'w') as f:
json.dump(raw, f)
reg = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir)
peer = reg.get_peer('alice')
self.assertIn('route_via', peer)
self.assertIsNone(peer['route_via'])
def test_set_route_via_persists(self):
self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'})
updated = self.registry.set_route_via('alice', 'exit-cell')
self.assertEqual(updated['route_via'], 'exit-cell')
# Verify it survives a reload
reloaded = PeerRegistry(data_dir=self.test_dir, config_dir=self.test_dir)
self.assertEqual(reloaded.get_peer('alice')['route_via'], 'exit-cell')
def test_set_route_via_clear(self):
self.registry.add_peer({'peer': 'alice', 'ip': '10.0.0.5'})
self.registry.set_route_via('alice', 'exit-cell')
updated = self.registry.set_route_via('alice', None)
self.assertIsNone(updated['route_via'])
def test_set_route_via_unknown_peer_raises(self):
with self.assertRaises(ValueError):
self.registry.set_route_via('nobody', 'exit-cell')
if __name__ == '__main__':
unittest.main()