#!/usr/bin/env python3
"""
CellLinkManager — manages site-to-site connections between PIC cells.

Each connection is stored in data/cell_links.json and manifests as:
  - A WireGuard [Peer] block (AllowedIPs = remote cell's VPN subnet)
  - A CoreDNS forwarding block (remote domain → remote cell's DNS IP)
  - An iptables FORWARD rule set (service-level access control)
"""

import json
import logging
import os
import subprocess
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

VALID_SERVICES = ('calendar', 'files', 'mail', 'webdav')

# Kept for any module that imports it. This is one shared dict, so never mutate
# it; call _default_perms() whenever a fresh, writable copy is needed.
_DEFAULT_PERMISSIONS = {
    'inbound': {s: False for s in VALID_SERVICES},
    'outbound': {s: False for s in VALID_SERVICES},
}

_PUSH_TIMEOUT = 5  # seconds; curl --max-time for one peer-sync push


def _default_perms() -> Dict[str, Any]:
    """Return a fresh permission map with every service disabled in both directions."""
    return {
        'inbound': {s: False for s in VALID_SERVICES},
        'outbound': {s: False for s in VALID_SERVICES},
    }


def _utcnow_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Same output format as the deprecated ``datetime.utcnow().isoformat()``
    (no UTC offset suffix), so new timestamps match previously stored ones.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _migrate_link(link: Dict[str, Any]) -> bool:
    """Fill in fields that older link records lack, modifying ``link`` in place.

    Fields are added in a fixed order so the serialized JSON key order is stable.
    Returns True when anything was added (the caller should then persist).
    """
    dns_ip = link.get('dns_ip')
    defaults = (
        ('permissions', _default_perms()),
        # Phase 1 migration: permission-sync tracking fields
        ('remote_api_url', f"http://{dns_ip}:3000" if dns_ip else None),
        ('last_push_status', 'never'),
        ('last_push_at', None),
        ('last_push_error', None),
        # Existing links predate sync — mark pending so startup replay syncs them
        ('pending_push', True),
        ('last_remote_update_at', None),
        # Phase 2 migration: exit-offer signaling fields
        ('exit_offered', False),
        ('remote_exit_offered', False),
        # Phase 3 migration: per-peer internet routing
        ('exit_relay_active', False),
        ('remote_exit_relay_active', False),
    )
    changed = False
    for key, value in defaults:
        if key not in link:
            link[key] = value
            changed = True
    return changed


class CellLinkManager:
    """Persist cell-to-cell links and drive their WireGuard, DNS and firewall state.

    Link records live in ``<data_dir>/cell_links.json`` as a JSON list of dicts.
    Permission changes are mirrored to the remote cell through its peer-sync
    HTTP endpoint; failed pushes stay ``pending_push`` for startup replay.
    """

    def __init__(self, data_dir: str, config_dir: str, wireguard_manager, network_manager):
        self.data_dir = data_dir
        self.config_dir = config_dir
        self.wireguard_manager = wireguard_manager
        self.network_manager = network_manager
        self.links_file = os.path.join(data_dir, 'cell_links.json')

    # ── Storage ───────────────────────────────────────────────────────────────

    def _load(self) -> List[Dict[str, Any]]:
        """Read all link records, migrating older records to the current schema.

        A missing, unreadable, or malformed file yields an empty list (logged as
        an error so it is not mistaken for "no connections"). If migration added
        fields but the write-back fails, the loaded links are still returned.
        """
        if not os.path.exists(self.links_file):
            return []
        try:
            with open(self.links_file) as f:
                links = json.load(f)
        except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode decode errors
            logger.error(f"Failed to read {self.links_file} ({e}); treating as no links")
            return []
        if not isinstance(links, list) or not all(isinstance(link, dict) for link in links):
            logger.error(f"{self.links_file} is not a list of link records; treating as no links")
            return []

        changed = False
        for link in links:
            changed = _migrate_link(link) or changed
        if changed:
            try:
                self._save(links)
            except OSError as e:
                # The in-memory records are valid; the migration is simply retried next load.
                logger.warning(f"Could not persist migrated {self.links_file}: {e}")
        return links

    def _save(self, links: List[Dict[str, Any]]):
        """Write ``links`` to the JSON store, replacing the old file atomically.

        Writing to a sibling temp file and renaming it over the target means a
        crash mid-write cannot leave a truncated file behind, which _load would
        otherwise read as an empty link list.
        """
        tmp_path = self.links_file + '.tmp'
        with open(tmp_path, 'w') as f:
            json.dump(links, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        try:
            os.replace(tmp_path, self.links_file)
        except OSError:
            # Renaming over the target can fail (e.g. EBUSY when the target is a
            # bind-mounted file); fall back to a plain in-place rewrite.
            with open(self.links_file, 'w') as f:
                json.dump(links, f, indent=2)
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    # ── Sync helpers ──────────────────────────────────────────────────────────

    def _local_identity(self) -> Dict[str, str]:
        """Return this cell's name and WG public key for outbound peer-sync calls."""
        from app import config_manager
        identity = config_manager.configs.get('_identity', {})
        cell_name = identity.get('cell_name', os.environ.get('CELL_NAME', 'mycell'))
        keys = self.wireguard_manager.get_keys()
        return {'cell_name': cell_name, 'public_key': keys.get('public_key', '')}

    def _local_wg_ip(self) -> Optional[str]:
        """Return the local cell-wireguard wg0 IP (e.g. '10.0.0.1'), or None.

        Used to set X-Forwarded-For so the remote peer-sync endpoint can
        authenticate us by VPN subnet even after MASQUERADE changes the apparent
        source to the Docker bridge IP. Best-effort: any failure yields None.
        """
        try:
            r = subprocess.run(
                ['docker', 'exec', 'cell-wireguard', 'ip', 'addr', 'show', 'wg0'],
                capture_output=True, text=True, timeout=5
            )
            for line in r.stdout.splitlines():
                line = line.strip()
                if line.startswith('inet '):
                    # "inet 10.0.0.1/24 scope global wg0" -> "10.0.0.1"
                    return line.split()[1].split('/')[0]
        except Exception as e:
            logger.debug(f"_local_wg_ip lookup failed: {e}")
        return None

    def _push_permissions_to_remote(self, link: Dict[str, Any],
                                    from_cell: str, from_public_key: str) -> Dict[str, Any]:
        """POST mirrored permissions to the remote cell's peer-sync endpoint.

        Returns {'ok': bool, 'error': str|None}. Never raises.

        The body inverts inbound/outbound: our inbound (what we share with them)
        becomes their outbound (what they receive from us), and vice-versa.

        Uses 'docker exec cell-wireguard curl' so the HTTP request originates from
        inside cell-wireguard's network namespace, which has routes to remote cell
        VPN subnets that cell-api (on the Docker bridge) lacks.
        """
        url = link.get('remote_api_url')
        if not url:
            return {'ok': False, 'error': 'no remote_api_url'}
        perms = link.get('permissions') or _default_perms()
        body = {
            'version': 1,
            'from_cell': from_cell,
            'from_public_key': from_public_key,
            'permissions': {
                'outbound': dict(perms.get('inbound', {})),
                'inbound': dict(perms.get('outbound', {})),
            },
            'exit_offered': bool(link.get('exit_offered', False)),
            'use_as_exit_relay': bool(link.get('exit_relay_active', False)),
            'sent_at': _utcnow_iso() + 'Z',
        }
        payload = json.dumps(body)
        endpoint = url.rstrip('/') + '/api/cells/peer-sync/permissions'

        # Determine local WG IP so the remote can authenticate us by source subnet.
        # MASQUERADE rewrites source to cell-wireguard's eth0 IP (172.20.x.x), which
        # is NOT in the cell's vpn_subnet. Passing the true WG IP in X-Forwarded-For
        # lets _authenticate_peer_cell() find the matching cell link.
        local_wg_ip = self._local_wg_ip()
        xff_header = f'X-Forwarded-For: {local_wg_ip}' if local_wg_ip else None

        # -s silences the progress meter; -S still prints the error message on
        # failure so last_push_error is meaningful instead of just "000".
        cmd = [
            'docker', 'exec', 'cell-wireguard',
            'curl', '-s', '-S', '-o', '/dev/null', '-w', '%{http_code}',
            '-X', 'POST',
            '-H', 'Content-Type: application/json',
        ]
        if xff_header:
            cmd += ['-H', xff_header]
        cmd += [
            '-d', payload,
            '--max-time', str(_PUSH_TIMEOUT),
            '--connect-timeout', '3',
            endpoint,
        ]
        try:
            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=_PUSH_TIMEOUT + 5
            )
            if result.returncode != 0:
                err = (result.stderr or result.stdout or 'curl error').strip()[:200]
                return {'ok': False, 'error': err}
            status = result.stdout.strip()
            if status.startswith('2'):
                return {'ok': True, 'error': None}
            return {'ok': False, 'error': f'HTTP {status}'}
        except subprocess.TimeoutExpired:
            return {'ok': False, 'error': 'timeout'}
        except Exception as e:
            return {'ok': False, 'error': str(e)[:200]}

    def _record_push_result(self, cell_name: str, result: Dict[str, Any]) -> None:
        """Persist last_push_* and pending_push after a push attempt."""
        links = self._load()
        for link in links:
            if link['cell_name'] == cell_name:
                if result.get('ok'):
                    link['last_push_status'] = 'ok'
                    link['last_push_at'] = _utcnow_iso()
                    link['last_push_error'] = None
                    link['pending_push'] = False
                else:
                    link['last_push_status'] = 'failed'
                    link['last_push_error'] = result.get('error')
                    link['pending_push'] = True
                break
        self._save(links)

    def _try_push(self, cell_name: str, link: Dict[str, Any]) -> None:
        """Mark pending, push, record result. Non-fatal."""
        # Mark dirty before pushing — a crash mid-push leaves it pending for replay
        links = self._load()
        for entry in links:
            if entry['cell_name'] == cell_name:
                entry['pending_push'] = True
                break
        self._save(links)

        try:
            identity = self._local_identity()
            result = self._push_permissions_to_remote(
                link, identity['cell_name'], identity['public_key']
            )
            self._record_push_result(cell_name, result)
            if not result['ok']:
                logger.warning(
                    f"Permission push to '{cell_name}' failed "
                    f"(will retry on startup): {result['error']}"
                )
        except Exception as e:
            logger.warning(f"Permission push to '{cell_name}' skipped (non-fatal): {e}")

    def apply_remote_permissions(self, from_public_key: str,
                                 permissions: Dict[str, Any],
                                 exit_offered: bool = False,
                                 use_as_exit_relay: bool = False) -> Dict[str, Any]:
        """Store permissions pushed by a remote cell (identified by WG public key).

        Validates service names, persists, and re-applies local iptables rules.
        Malformed (non-dict) permission sections are treated as all-disabled.
        Returns the updated link record.

        Raises:
            ValueError: no stored link has ``from_public_key``.
        """
        links = self._load()
        link = next((l for l in links if l['public_key'] == from_public_key), None)
        if not link:
            raise ValueError(f"No connection found for public_key '{from_public_key[:16]}...'")

        if not isinstance(permissions, dict):
            permissions = {}
        in_raw = permissions.get('inbound', {})
        out_raw = permissions.get('outbound', {})
        # The payload comes from a remote peer; never assume its shape.
        if not isinstance(in_raw, dict):
            in_raw = {}
        if not isinstance(out_raw, dict):
            out_raw = {}
        clean_inbound = {s: bool(in_raw.get(s, False)) for s in VALID_SERVICES}
        clean_outbound = {s: bool(out_raw.get(s, False)) for s in VALID_SERVICES}
        link['permissions'] = {'inbound': clean_inbound, 'outbound': clean_outbound}
        link['remote_exit_offered'] = bool(exit_offered)
        link['remote_exit_relay_active'] = bool(use_as_exit_relay)
        link['last_remote_update_at'] = _utcnow_iso()
        self._save(links)

        inbound_list = [s for s, v in clean_inbound.items() if v]
        try:
            import firewall_manager as _fm
            _fm.apply_cell_rules(link['cell_name'], link['vpn_subnet'], inbound_list,
                                 exit_relay=use_as_exit_relay)
        except Exception as e:
            logger.warning(
                f"apply_cell_rules after remote push for '{link['cell_name']}' "
                f"failed (non-fatal): {e}"
            )
        return link

    def replay_pending_pushes(self) -> Dict[str, int]:
        """Retry permission pushes for any link with pending_push=True.

        Called from _apply_startup_enforcement. Returns counts for logging.
        """
        summary = {'attempted': 0, 'ok': 0, 'failed': 0}
        try:
            identity = self._local_identity()
        except Exception as e:
            logger.warning(f"replay_pending_pushes: cannot resolve identity ({e})")
            return summary

        for link in self._load():
            if not link.get('pending_push'):
                continue
            summary['attempted'] += 1
            result = self._push_permissions_to_remote(
                link, identity['cell_name'], identity['public_key']
            )
            self._record_push_result(link['cell_name'], result)
            if result.get('ok'):
                summary['ok'] += 1
                logger.info(f"replay: synced permissions to '{link['cell_name']}'")
            else:
                summary['failed'] += 1
                logger.warning(
                    f"replay: push to '{link['cell_name']}' failed: {result.get('error')}"
                )

        if summary['attempted']:
            logger.info(
                f"replay_pending_pushes: {summary['attempted']} attempted, "
                f"{summary['ok']} ok, {summary['failed']} failed"
            )
        return summary

    # ── Public API ────────────────────────────────────────────────────────────

    def generate_invite(self, cell_name: str, domain: str) -> Dict[str, Any]:
        """Return an invite package describing this cell for another cell to import."""
        keys = self.wireguard_manager.get_keys()
        srv = self.wireguard_manager.get_server_config()
        server_vpn_ip = self.wireguard_manager._get_configured_address().split('/')[0]
        return {
            'cell_name': cell_name,
            'public_key': keys['public_key'],
            'endpoint': srv.get('endpoint'),
            'vpn_subnet': self.wireguard_manager._get_configured_network(),
            'dns_ip': server_vpn_ip,
            'domain': domain,
            'version': 1,
        }

    def list_connections(self) -> List[Dict[str, Any]]:
        """Return every stored link record."""
        return self._load()

    def add_connection(self, invite: Dict[str, Any],
                       inbound_services: Optional[List[str]] = None) -> Dict[str, Any]:
        """Import a remote cell's invite and establish the connection.

        Adds the WireGuard peer and DNS forward, persists a complete link record
        (unknown service names are dropped), applies firewall rules, and makes an
        initial best-effort permission push. Returns the new link record.

        Raises:
            ValueError: a link with the same cell_name already exists.
            RuntimeError: the WireGuard peer could not be added.
        """
        links = self._load()
        name = invite['cell_name']
        if any(l['cell_name'] == name for l in links):
            raise ValueError(f"Cell '{name}' is already connected")

        ok = self.wireguard_manager.add_cell_peer(
            name=name,
            public_key=invite['public_key'],
            endpoint=invite.get('endpoint', ''),
            vpn_subnet=invite['vpn_subnet'],
        )
        if not ok:
            raise RuntimeError(f"Failed to add WireGuard peer for cell '{name}'")

        dns_result = self.network_manager.add_cell_dns_forward(
            domain=invite['domain'],
            dns_ip=invite['dns_ip'],
        )
        if dns_result.get('warnings'):
            logger.warning('DNS forward warnings for %s: %s', name, dns_result['warnings'])

        inbound = [s for s in (inbound_services or []) if s in VALID_SERVICES]
        perms = _default_perms()
        for s in inbound:
            perms['inbound'][s] = True

        link = {
            'cell_name': name,
            'public_key': invite['public_key'],
            'endpoint': invite.get('endpoint'),
            'vpn_subnet': invite['vpn_subnet'],
            'dns_ip': invite['dns_ip'],
            'domain': invite['domain'],
            'connected_at': _utcnow_iso(),
            'permissions': perms,
            'remote_api_url': f"http://{invite['dns_ip']}:3000",
            'last_push_status': 'never',
            'last_push_at': None,
            'last_push_error': None,
            'pending_push': True,
            'last_remote_update_at': None,
            # Exit-routing fields, so new records match the migrated schema.
            'exit_offered': False,
            'remote_exit_offered': False,
            'exit_relay_active': False,
            'remote_exit_relay_active': False,
        }
        links.append(link)
        self._save(links)

        try:
            import firewall_manager as _fm
            _fm.apply_cell_rules(name, invite['vpn_subnet'], inbound)
        except Exception as e:
            logger.warning(f"apply_cell_rules for {name} failed (non-fatal): {e}")

        # Initial push so the remote immediately knows our permission state
        try:
            identity = self._local_identity()
            result = self._push_permissions_to_remote(
                link, identity['cell_name'], identity['public_key']
            )
            self._record_push_result(name, result)
            if not result['ok']:
                logger.warning(
                    f"Initial push to '{name}' failed "
                    f"(will retry on startup): {result['error']}"
                )
        except Exception as e:
            logger.warning(f"Initial push to '{name}' skipped (non-fatal): {e}")

        return link

    def remove_connection(self, cell_name: str):
        """Tear down a cell connection by name.

        Raises:
            ValueError: no link with ``cell_name`` exists.
        """
        links = self._load()
        link = next((l for l in links if l['cell_name'] == cell_name), None)
        if not link:
            raise ValueError(f"Cell '{cell_name}' not found")

        try:
            import firewall_manager as _fm
            _fm.clear_cell_rules(cell_name)
        except Exception as e:
            logger.warning(f"clear_cell_rules for {cell_name} failed (non-fatal): {e}")

        self.wireguard_manager.remove_peer(link['public_key'])
        self.network_manager.remove_cell_dns_forward(link['domain'])
        links = [l for l in links if l['cell_name'] != cell_name]
        self._save(links)

    def update_permissions(self, cell_name: str,
                           inbound: Dict[str, bool],
                           outbound: Dict[str, bool]) -> Dict[str, Any]:
        """Update service sharing permissions for a cell connection.

        Validates, persists, re-applies iptables, then pushes to remote.
        The peer's current exit-relay state is passed along when re-applying
        rules, the same way apply_remote_permissions does.
        Returns the updated link record.

        Raises:
            ValueError: no link with ``cell_name`` exists.
        """
        links = self._load()
        link = next((l for l in links if l['cell_name'] == cell_name), None)
        if not link:
            raise ValueError(f"Cell '{cell_name}' not found")

        clean_inbound = {s: bool(inbound.get(s, False)) for s in VALID_SERVICES}
        clean_outbound = {s: bool(outbound.get(s, False)) for s in VALID_SERVICES}
        link['permissions'] = {'inbound': clean_inbound, 'outbound': clean_outbound}
        self._save(links)

        inbound_list = [s for s, v in clean_inbound.items() if v]
        try:
            import firewall_manager as _fm
            _fm.apply_cell_rules(cell_name, link['vpn_subnet'], inbound_list,
                                 exit_relay=bool(link.get('remote_exit_relay_active', False)))
        except Exception as e:
            logger.warning(f"apply_cell_rules for {cell_name} failed (non-fatal): {e}")

        # Push mirrored state to the remote cell (non-fatal)
        self._try_push(cell_name, link)

        return link

    def get_permissions(self, cell_name: str) -> Dict[str, Any]:
        """Return the permissions dict for a connected cell.

        Raises:
            ValueError: no link with ``cell_name`` exists.
        """
        links = self._load()
        link = next((l for l in links if l['cell_name'] == cell_name), None)
        if not link:
            raise ValueError(f"Cell '{cell_name}' not found")
        return link.get('permissions', _default_perms())

    def set_exit_offered(self, cell_name: str, offered: bool) -> Dict[str, Any]:
        """Toggle whether THIS cell offers to route internet traffic for cell_name.

        The new value is persisted locally then pushed to the remote cell so it
        knows our offer changed. Returns the updated link record.

        Raises:
            ValueError: no link with ``cell_name`` exists.
        """
        links = self._load()
        link = next((l for l in links if l['cell_name'] == cell_name), None)
        if not link:
            raise ValueError(f"Cell '{cell_name}' not found")
        link['exit_offered'] = bool(offered)
        self._save(links)
        self._try_push(cell_name, link)
        return link

    def set_exit_relay_active(self, cell_name: str, active: bool) -> Dict[str, Any]:
        """Record that THIS cell is routing a peer's internet traffic via cell_name.

        Persists the flag locally and pushes updated state to the remote cell so
        it can enable/disable the FORWARD-to-eth0 rule on its side.
        Returns the updated link record.

        Raises:
            ValueError: no link with ``cell_name`` exists.
        """
        links = self._load()
        link = next((l for l in links if l['cell_name'] == cell_name), None)
        if not link:
            raise ValueError(f"Cell '{cell_name}' not found")
        link['exit_relay_active'] = bool(active)
        self._save(links)
        self._try_push(cell_name, link)
        return link

    def get_connection_status(self, cell_name: str) -> Dict[str, Any]:
        """Return link record enriched with live WireGuard handshake status.

        Any error from the status lookup reports the peer as offline.

        Raises:
            ValueError: no link with ``cell_name`` exists.
        """
        links = self._load()
        link = next((l for l in links if l['cell_name'] == cell_name), None)
        if not link:
            raise ValueError(f"Cell '{cell_name}' not found")
        try:
            st = self.wireguard_manager.get_peer_status(link['public_key'])
            return {**link, 'online': st.get('online', False),
                    'last_handshake': st.get('last_handshake')}
        except Exception:
            return {**link, 'online': False, 'last_handshake': None}