Files
pic/api/peer_registry.py
T
roof 8ea834e108 feat: Phase 3 - per-peer internet routing via exit cell
Adds the ability to route a specific peer's internet traffic through a
connected cell acting as an exit relay.

Cell A side:
- PUT /api/peers/<peer>/route-via {"via_cell": "cellB"} sets route_via
- Updates WG AllowedIPs to include 0.0.0.0/0 for the exit cell peer
- Adds ip rule + ip route in policy table inside cell-wireguard so the
  specific peer's traffic egresses via cellB's WG IP
- Sets exit_relay_active on the cell link and pushes use_as_exit_relay=True
  to cellB via peer-sync

Cell B side:
- Receives use_as_exit_relay in the peer-sync payload
- Calls apply_cell_rules(..., exit_relay=True) to add FORWARD -o eth0 ACCEPT
- Stores remote_exit_relay_active flag for startup recovery

Startup recovery:
- apply_all_cell_rules passes exit_relay=remote_exit_relay_active (cellB)
- _apply_startup_enforcement reapplies ip rule for each peer with route_via (cellA)
  since policy routing rules don't survive container restart

peer_registry gets route_via field with lazy migration.
22 new tests across test_cell_link_manager, test_peer_registry, test_peer_route_via.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 16:23:31 -04:00

380 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Peer Registry for Personal Internet Cell
Handles peer registration and management
"""
import json
import os
import logging
from threading import RLock
from datetime import datetime
from typing import Dict, List, Any, Optional
from base_service_manager import BaseServiceManager
# Module-level logger named after this module; the class below logs via self.logger.
logger = logging.getLogger(__name__)
class PeerRegistry(BaseServiceManager):
    """Manages peer registration and management.

    Peers are plain dicts kept in ``self.peers`` and persisted as JSON to
    ``<data_dir>/peers.json``. Every mutation of the list happens under
    ``self.lock``; it is an ``RLock`` so methods of this class may call one
    another while already holding it.

    NOTE(review): ``self.logger`` and ``self.handle_error`` are not defined
    in this class; presumably they come from ``BaseServiceManager`` - confirm.
    """

    def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
        """Create the registry and load any peers already stored on disk.

        Args:
            data_dir: Directory that holds ``peers.json``.
            config_dir: Passed through to the base class unchanged.
        """
        super().__init__('peer_registry', data_dir, config_dir)
        self.lock = RLock()
        self.peers: List[Dict[str, Any]] = []
        self.peers_file = os.path.join(data_dir, 'peers.json')
        self._load_peers()

    def get_status(self) -> Dict[str, Any]:
        """Get peer registry status.

        Returns:
            A dict with the total, active and inactive peer counts plus
            timestamps, or whatever ``self.handle_error`` returns on failure.
            A peer without an ``active`` key counts as active.
        """
        try:
            with self.lock:
                total = len(self.peers)
                active = sum(1 for p in self.peers if p.get('active', True))
                # One timestamp for both fields so they can never disagree.
                now = datetime.utcnow().isoformat()
                return {
                    'running': True,
                    'status': 'online',
                    'peers_count': total,
                    'active_peers': active,
                    'inactive_peers': total - active,
                    'last_updated': now,
                    'timestamp': now,
                }
        except Exception as e:
            return self.handle_error(e, "get_status")

    def test_connectivity(self) -> Dict[str, Any]:
        """Test peer registry connectivity.

        Runs the filesystem, data-integrity and peer-operation self-tests.

        Returns:
            A dict with each sub-result and an overall ``success`` flag.
            NOTE(review): ``success`` only combines the filesystem and
            integrity results; the peer-operations result is reported but
            does not affect it - confirm this is intended.
        """
        try:
            fs_test = self._test_filesystem_access()
            integrity_test = self._test_data_integrity()
            operations_test = self._test_peer_operations()
            return {
                'filesystem_access': fs_test,
                'data_integrity': integrity_test,
                'peer_operations': operations_test,
                'success': fs_test.get('success', False) and integrity_test.get('success', False),
                'timestamp': datetime.utcnow().isoformat(),
            }
        except Exception as e:
            return self.handle_error(e, "test_connectivity")

    def _test_filesystem_access(self) -> Dict[str, Any]:
        """Test filesystem access for peer data.

        Temporarily appends a probe peer, writes the registry, reads the file
        back and then restores the original peer list (in memory and on disk).

        Returns:
            ``{'success': True, ...}`` when the probe round-trips through the
            file, otherwise ``{'success': False, ..., 'error': <text>}``.
        """
        probe = {
            'peer': 'test_peer',
            'ip': '192.168.1.100',
            'public_key': 'test_key',
            'active': False,
            'test': True,
        }
        try:
            # Hold the lock for the whole round trip so that no concurrent
            # change can slip in between "snapshot" and "restore" and be lost.
            with self.lock:
                original_peers = self.peers.copy()
                try:
                    self.peers = original_peers + [probe]
                    # Only an explicit False means the write failed; an
                    # override returning None keeps the old "success" meaning.
                    if self._save_peers() is False:
                        raise OSError(f'could not write {self.peers_file}')
                    with open(self.peers_file, 'r') as f:
                        on_disk = json.load(f)
                    if probe not in on_disk:
                        raise OSError('probe peer was not read back from disk')
                finally:
                    # Always put the real data back, even if the probe failed.
                    self.peers = original_peers
                    self._save_peers()
            return {
                'success': True,
                'message': 'Filesystem access working',
                'read_write': True,
            }
        except Exception as e:
            return {
                'success': False,
                'message': f'Filesystem access failed: {str(e)}',
                'error': str(e),
            }

    def _test_data_integrity(self) -> Dict[str, Any]:
        """Test peer data integrity.

        A peer is counted as valid when it is a dict that has both a
        ``peer`` and an ``ip`` key.

        Returns:
            A dict with valid/invalid/total counts. ``success`` is True
            whenever the scan itself completes, regardless of the counts.
        """
        try:
            with self.lock:
                snapshot = self.peers.copy()
            valid_peers = sum(
                1 for peer in snapshot
                if isinstance(peer, dict) and 'peer' in peer and 'ip' in peer
            )
            return {
                'success': True,
                'message': 'Data integrity check passed',
                'valid_peers': valid_peers,
                'invalid_peers': len(snapshot) - valid_peers,
                'total_peers': len(snapshot),
            }
        except Exception as e:
            return {
                'success': False,
                'message': f'Data integrity check failed: {str(e)}',
                'error': str(e),
            }

    def _test_peer_operations(self) -> Dict[str, Any]:
        """Test peer operations (add, get, update IP, remove) with a probe peer.

        The probe is only read, updated and removed when this call actually
        added it, so a real peer that happens to share the probe's name is
        never modified or deleted. A stale probe left by an earlier
        interrupted run (recognised by its ``test`` flag) is cleared first.

        Returns:
            A dict with one ``*_success`` flag per operation and an overall
            ``success`` flag, or an error dict if an operation raised.
        """
        probe_name = 'test_operation_peer'
        test_peer = {
            'peer': probe_name,
            'ip': '192.168.1.101',
            'public_key': 'test_operation_key',
            'active': False,
            'test': True,
        }
        try:
            stale = self.get_peer(probe_name)
            if stale is not None and stale.get('test', False):
                self.remove_peer(probe_name)

            add_success = self.add_peer(test_peer)
            get_success = update_success = remove_success = False
            if add_success:
                try:
                    get_success = self.get_peer(probe_name) is not None
                    update_success = self.update_peer_ip(probe_name, '192.168.1.102')
                finally:
                    # Never leave the probe behind, even if a step above raised.
                    remove_success = self.remove_peer(probe_name)
            return {
                'success': add_success and get_success and update_success and remove_success,
                'message': 'Peer operations working',
                'add_success': add_success,
                'get_success': get_success,
                'update_success': update_success,
                'remove_success': remove_success,
            }
        except Exception as e:
            return {
                'success': False,
                'message': f'Peer operations test failed: {str(e)}',
                'error': str(e),
            }

    def _load_peers(self):
        """Load peers from file into ``self.peers``.

        A missing, unreadable or malformed file yields an empty registry.
        Entries that are not dicts are dropped (with a warning) instead of
        discarding every peer. Peers lacking ``route_via`` get it set to
        ``None`` and the file is rewritten once (Phase 3 lazy migration).
        """
        try:
            directory = os.path.dirname(self.peers_file)
            if directory:
                os.makedirs(directory, exist_ok=True)

            if not os.path.exists(self.peers_file):
                self.peers = []
                self.logger.info("No peers file found, starting with empty registry")
                return

            with open(self.peers_file, 'r') as f:
                try:
                    loaded = json.load(f)
                except Exception as e:
                    self.logger.error(f"Error loading peers: {e}")
                    self.peers = []
                    return

            if not isinstance(loaded, list):
                self.logger.error(
                    f"Error loading peers: expected a JSON list, got {type(loaded).__name__}"
                )
                self.peers = []
                return

            self.peers = [p for p in loaded if isinstance(p, dict)]
            dropped = len(loaded) - len(self.peers)
            if dropped:
                self.logger.warning(f"Ignored {dropped} malformed peer entries")
            self.logger.info(f"Loaded {len(self.peers)} peers from file")

            # Phase 3 migration: per-peer internet routing
            changed = False
            for peer in self.peers:
                if 'route_via' not in peer:
                    peer['route_via'] = None
                    changed = True
            if changed:
                # The read handle is already closed here, so the atomic
                # replace in _save_peers never targets an open file.
                self._save_peers()
        except Exception as e:
            self.logger.error(f"Error in _load_peers: {e}")
            self.peers = []

    def _save_peers(self) -> bool:
        """Save peers to file.

        Writes to ``<peers_file>.tmp`` created with mode 0o600 and then
        atomically replaces the real file. Errors are logged, not raised.

        Returns:
            True when the file was written, False when saving failed.
        """
        tmp_path = self.peers_file + '.tmp'
        try:
            directory = os.path.dirname(self.peers_file)
            if directory:
                os.makedirs(directory, exist_ok=True)
            # peers.json contains WireGuard private keys - must never be
            # world-readable, so the temp file is created with 0o600 from
            # the very first byte.
            fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            try:
                with os.fdopen(fd, 'w') as f:
                    json.dump(self.peers, f, indent=2)
                # Ensure perms are 0o600 even if a pre-existing temp file
                # had looser ones (O_CREAT keeps an existing file's mode).
                os.chmod(tmp_path, 0o600)
                os.replace(tmp_path, self.peers_file)
            except Exception:
                # Do not leave a half-written temp file with key material.
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass
                raise
            # Belt-and-braces: also chmod the destination in case it
            # pre-existed with looser perms on a filesystem that preserves
            # the destination's mode.
            os.chmod(self.peers_file, 0o600)
            self.logger.info(f"Saved {len(self.peers)} peers to file")
            return True
        except Exception as e:
            self.logger.error(f"Error saving peers: {e}")
            return False

    def list_peers(self) -> List[Dict[str, Any]]:
        """List all peers.

        Returns:
            A new list; its elements are the registry's own peer dicts,
            not copies.
        """
        with self.lock:
            return list(self.peers)

    def get_peer(self, name: str) -> Optional[Dict[str, Any]]:
        """Get a specific peer by name.

        Returns:
            The registry's own dict for the first peer whose ``peer`` field
            equals ``name`` (not a copy), or None when there is no match.
        """
        with self.lock:
            for peer in self.peers:
                if peer.get('peer') == name:
                    return peer
        return None

    def add_peer(self, peer_info: Dict[str, Any]) -> bool:
        """Add a new peer.

        ``peer_info`` itself is stored and gains ``created_at`` plus a
        default ``active`` of True.

        Returns:
            True when added; False when the name is missing or empty, a peer
            with the same name already exists, or an error occurred.
        """
        try:
            name = peer_info.get('peer')
            if not name:
                # A nameless record could never be looked up or removed by name.
                self.logger.warning("Refusing to add a peer without a 'peer' name")
                return False
            with self.lock:
                if self.get_peer(name):
                    self.logger.warning(f"Peer {peer_info.get('peer')} already exists")
                    return False
                peer_info['created_at'] = datetime.utcnow().isoformat()
                peer_info['active'] = peer_info.get('active', True)
                self.peers.append(peer_info)
                self._save_peers()
                self.logger.info(f"Added peer: {peer_info.get('peer')}")
                return True
        except Exception as e:
            self.logger.error(f"Error adding peer: {e}")
            return False

    def remove_peer(self, name: str) -> bool:
        """Remove a peer.

        Returns:
            True when at least one peer with that name was removed, False
            when none matched or an error occurred. The file is only
            rewritten when something was actually removed.
        """
        try:
            with self.lock:
                remaining = [p for p in self.peers if p.get('peer') != name]
                removed = len(remaining) < len(self.peers)
                if removed:
                    self.peers = remaining
                    self._save_peers()
                    self.logger.info(f"Removed peer: {name}")
                else:
                    self.logger.warning(f"Peer {name} not found for removal")
                return removed
        except Exception as e:
            self.logger.error(f"Error removing peer {name}: {e}")
            return False

    def update_peer(self, name: str, fields: Dict[str, Any]) -> bool:
        """Update arbitrary fields on a peer.

        Merges ``fields`` into the first peer named ``name`` and stamps
        ``updated_at``. No key is protected: ``fields`` may overwrite any
        existing value, including ``peer``.

        Returns:
            True when the peer was found and updated, otherwise False.
        """
        try:
            with self.lock:
                for peer in self.peers:
                    if peer.get('peer') == name:
                        peer.update(fields)
                        peer['updated_at'] = datetime.utcnow().isoformat()
                        self._save_peers()
                        self.logger.info(f"Updated peer {name}: {list(fields.keys())}")
                        return True
                self.logger.warning(f"Peer {name} not found for update")
                return False
        except Exception as e:
            self.logger.error(f"Error updating peer {name}: {e}")
            return False

    def clear_reinstall_flag(self, name: str) -> bool:
        """Clear the config_needs_reinstall flag after user downloads new config."""
        return self.update_peer(name, {'config_needs_reinstall': False})

    def update_peer_ip(self, name: str, new_ip: str) -> bool:
        """Update peer IP address.

        Returns:
            True when the peer was found and its ``ip`` replaced, otherwise
            False. ``new_ip`` is stored as given, without validation.
        """
        try:
            with self.lock:
                for peer in self.peers:
                    if peer.get('peer') == name:
                        old_ip = peer.get('ip')
                        peer['ip'] = new_ip
                        peer['updated_at'] = datetime.utcnow().isoformat()
                        self._save_peers()
                        self.logger.info(f"Updated peer {name} IP from {old_ip} to {new_ip}")
                        return True
                self.logger.warning(f"Peer {name} not found for IP update")
                return False
        except Exception as e:
            self.logger.error(f"Error updating peer {name} IP: {e}")
            return False

    def set_route_via(self, peer_name: str, via_cell: Optional[str]) -> Dict[str, Any]:
        """Set or clear the route_via field on a peer.

        Args:
            peer_name: Name of the peer to change.
            via_cell: Value to store in ``route_via``; None clears it.

        Returns:
            A shallow copy of the updated peer dict.

        Raises:
            ValueError: If no peer named ``peer_name`` exists. Unlike the
                other mutators this method reports a miss by raising.
        """
        with self.lock:
            for peer in self.peers:
                if peer.get('peer') == peer_name:
                    peer['route_via'] = via_cell
                    peer['updated_at'] = datetime.utcnow().isoformat()
                    self._save_peers()
                    self.logger.info(f"Set route_via for {peer_name}: {via_cell!r}")
                    return dict(peer)
        raise ValueError(f"Peer '{peer_name}' not found")

    def get_peer_stats(self) -> Dict[str, Any]:
        """Get peer registry statistics.

        Returns:
            Total/active/inactive counts and ``ip_ranges``, which maps the
            first three dot-separated parts of each peer ``ip`` plus
            ``.0/24`` to the number of peers sharing it. Peers with an empty
            or missing ``ip`` are left out of ``ip_ranges``. On error the
            counts are zero and an ``error`` key carries the message.
        """
        try:
            with self.lock:
                total = len(self.peers)
                active = sum(1 for p in self.peers if p.get('active', True))
                ip_ranges: Dict[str, int] = {}
                for peer in self.peers:
                    ip = peer.get('ip', '')
                    if ip:
                        range_key = '.'.join(ip.split('.')[:3]) + '.0/24'
                        ip_ranges[range_key] = ip_ranges.get(range_key, 0) + 1
                return {
                    'total_peers': total,
                    'active_peers': active,
                    'inactive_peers': total - active,
                    'ip_ranges': ip_ranges,
                    'timestamp': datetime.utcnow().isoformat(),
                }
        except Exception as e:
            self.logger.error(f"Error getting peer stats: {e}")
            return {
                'total_peers': 0,
                'active_peers': 0,
                'inactive_peers': 0,
                'ip_ranges': {},
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat(),
            }