#!/usr/bin/env python3
"""
CellLinkManager — manages site-to-site connections between PIC cells.

Each connection is stored in data/cell_links.json and manifests as:
  - A WireGuard [Peer] block  (AllowedIPs = remote cell's VPN subnet)
  - A CoreDNS forwarding block (remote domain → remote cell's DNS IP)
"""

import os
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class CellLinkManager:
    """Keep the persisted list of cell links in sync with WireGuard and DNS.

    The manager owns ``<data_dir>/cell_links.json`` and delegates the actual
    peer / DNS-forward changes to the injected ``wireguard_manager`` and
    ``network_manager`` objects.
    """

    def __init__(self, data_dir: str, config_dir: str, wireguard_manager, network_manager):
        """Store the collaborators and derive the path of the links file.

        Args:
            data_dir: Directory that holds ``cell_links.json``.
            config_dir: Configuration directory (stored, not used in this class).
            wireguard_manager: Object providing the WireGuard operations used
                below (``get_keys``, ``add_cell_peer``, ``remove_peer``, ...).
            network_manager: Object providing ``add_cell_dns_forward`` and
                ``remove_cell_dns_forward``.
        """
        self.data_dir = data_dir
        self.config_dir = config_dir
        self.wireguard_manager = wireguard_manager
        self.network_manager = network_manager
        self.links_file = os.path.join(data_dir, 'cell_links.json')

    # ── Storage ───────────────────────────────────────────────────────────────

    def _load(self) -> List[Dict[str, Any]]:
        """Read the persisted link records.

        Returns:
            The list stored in the links file, or ``[]`` when the file is
            missing, unreadable, not valid JSON, or does not contain a list.
        """
        try:
            with open(self.links_file, encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            # No links have been saved yet.
            return []
        except Exception:
            # Best-effort read: fall back to "no links", but leave a trace so a
            # corrupt file is noticed before the next save overwrites it.
            logger.warning('Could not read %s; treating it as empty',
                           self.links_file, exc_info=True)
            return []
        if not isinstance(data, list):
            logger.warning('Unexpected content in %s (expected a JSON list); '
                           'treating it as empty', self.links_file)
            return []
        return data

    def _save(self, links: List[Dict[str, Any]]):
        """Persist *links*, replacing the links file atomically.

        The data is written to a sibling temp file and then swapped in with
        ``os.replace`` so an interrupted write cannot leave a truncated
        ``cell_links.json`` behind.
        """
        tmp_path = f'{self.links_file}.tmp'
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(links, f, indent=2)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, self.links_file)
        finally:
            # After a successful replace the temp file no longer exists; this
            # only cleans up after a failed write.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    @staticmethod
    def _find(links: List[Dict[str, Any]], cell_name: str) -> Optional[Dict[str, Any]]:
        """Return the first record in *links* named *cell_name*, or ``None``."""
        return next((entry for entry in links if entry['cell_name'] == cell_name), None)

    def _rollback_connection(self, name: str, public_key: str,
                             domain: Optional[str] = None,
                             remove_peer: bool = True) -> None:
        """Best-effort undo of a partially established connection.

        Args:
            name: Cell name, used for log messages only.
            public_key: Key of the WireGuard peer to remove.
            domain: DNS forward to remove, or ``None`` to leave DNS untouched.
            remove_peer: Whether the WireGuard peer should be removed.
        """
        if domain is not None:
            try:
                self.network_manager.remove_cell_dns_forward(domain)
            except Exception:
                logger.exception("Rollback: could not remove DNS forward for cell '%s'", name)
        if remove_peer:
            try:
                self.wireguard_manager.remove_peer(public_key)
            except Exception:
                logger.exception("Rollback: could not remove WireGuard peer for cell '%s'", name)

    # ── Public API ────────────────────────────────────────────────────────────

    def generate_invite(self, cell_name: str, domain: str) -> Dict[str, Any]:
        """Return an invite package describing this cell for another cell to import."""
        wg = self.wireguard_manager
        keys = wg.get_keys()
        server_config = wg.get_server_config()
        # The configured address is split on '/', keeping only the part before it.
        server_vpn_ip = wg._get_configured_address().split('/')[0]
        return {
            'cell_name': cell_name,
            'public_key': keys['public_key'],
            'endpoint': server_config.get('endpoint'),
            'vpn_subnet': wg._get_configured_network(),
            'dns_ip': server_vpn_ip,
            'domain': domain,
            'version': 1,
        }

    def list_connections(self) -> List[Dict[str, Any]]:
        """Return all persisted link records."""
        return self._load()

    def add_connection(self, invite: Dict[str, Any]) -> Dict[str, Any]:
        """Import a remote cell's invite and establish the connection.

        Args:
            invite: Mapping with ``cell_name``, ``public_key``, ``vpn_subnet``,
                ``dns_ip`` and ``domain``; ``endpoint`` is optional.

        Returns:
            The link record that was persisted.

        Raises:
            KeyError: A required invite field is missing (raised before any
                WireGuard or DNS change is made).
            ValueError: A cell with the same name is already connected.
            RuntimeError: The WireGuard manager reported failure adding the peer.
        """
        links = self._load()
        name = invite['cell_name']
        if any(entry['cell_name'] == name for entry in links):
            raise ValueError(f"Cell '{name}' is already connected")

        # Read every required field up front so that a malformed invite fails
        # before a peer has been added.
        public_key = invite['public_key']
        vpn_subnet = invite['vpn_subnet']
        domain = invite['domain']
        dns_ip = invite['dns_ip']

        # Never roll back a peer or DNS forward that an existing link also uses.
        key_shared = any(entry.get('public_key') == public_key for entry in links)
        domain_shared = any(entry.get('domain') == domain for entry in links)

        ok = self.wireguard_manager.add_cell_peer(
            name=name,
            public_key=public_key,
            endpoint=invite.get('endpoint', ''),
            vpn_subnet=vpn_subnet,
        )
        if not ok:
            raise RuntimeError(f"Failed to add WireGuard peer for cell '{name}'")

        dns_to_undo = None
        try:
            dns_result = self.network_manager.add_cell_dns_forward(
                domain=domain,
                dns_ip=dns_ip,
            )
            if not domain_shared:
                dns_to_undo = domain
            if dns_result.get('warnings'):
                logger.warning('DNS forward warnings for %s: %s', name, dns_result['warnings'])

            link = {
                'cell_name': name,
                'public_key': public_key,
                'endpoint': invite.get('endpoint'),
                'vpn_subnet': vpn_subnet,
                'dns_ip': dns_ip,
                'domain': domain,
                # Naive UTC timestamp, same format datetime.utcnow() produced.
                'connected_at': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            }
            links.append(link)
            self._save(links)
        except Exception:
            # Without a saved record remove_connection() could never clean up
            # the peer, so undo what was set up and re-raise the original error.
            self._rollback_connection(name, public_key, dns_to_undo,
                                      remove_peer=not key_shared)
            raise
        return link

    def remove_connection(self, cell_name: str):
        """Tear down a cell connection by name.

        Raises:
            ValueError: No link named *cell_name* exists.
        """
        links = self._load()
        link = self._find(links, cell_name)
        if link is None:
            raise ValueError(f"Cell '{cell_name}' not found")
        self.wireguard_manager.remove_peer(link['public_key'])
        self.network_manager.remove_cell_dns_forward(link['domain'])
        remaining = [entry for entry in links if entry['cell_name'] != cell_name]
        self._save(remaining)

    def get_connection_status(self, cell_name: str) -> Dict[str, Any]:
        """Return link record enriched with live WireGuard handshake status.

        If the status lookup fails, the record is returned with
        ``online=False`` and ``last_handshake=None``.

        Raises:
            ValueError: No link named *cell_name* exists.
        """
        link = self._find(self._load(), cell_name)
        if link is None:
            raise ValueError(f"Cell '{cell_name}' not found")
        try:
            status = self.wireguard_manager.get_peer_status(link['public_key'])
            return {
                **link,
                'online': status.get('online', False),
                'last_handshake': status.get('last_handshake'),
            }
        except Exception:
            # Deliberately best-effort: report the link as offline.
            logger.debug("Peer status lookup failed for cell '%s'", cell_name, exc_info=True)
            return {**link, 'online': False, 'last_handshake': None}