#!/usr/bin/env python3 """ Connectivity Manager for Personal Internet Cell — Phase 5 Extended Connectivity. Provides per-peer egress routing through alternate exits (WireGuard external, OpenVPN, Tor, sshuttle SSH tunnel, upstream proxy) via Linux policy routing (fwmark + ip rule + dedicated routing tables) and dedicated iptables chains. Architecture ------------ - A peer's `exit_via` field selects the egress path: "default", "wireguard_ext", "openvpn", "tor", "sshuttle", or "proxy". - Each non-default exit type is assigned a unique fwmark and a dedicated routing table: wireguard_ext mark 0x10 table 110 iface wg_ext0 openvpn mark 0x20 table 120 iface tun0 tor mark 0x30 table 130 (transparent proxy → 9040) sshuttle mark 0x40 table 140 (transparent proxy → 12300) proxy mark 0x50 table 150 (transparent proxy → 12345, redsocks) - All rules live in dedicated PIC_CONNECTIVITY chains in the `mangle` and `nat` tables so they can be flushed/rebuilt without touching firewall_manager rules. - A kill-switch FORWARD DROP rule prevents leaks if the exit interface is down. Container model --------------- Each exit type runs in its own separate container; this manager only programs policy routing rules in the WireGuard server container (cell-wireguard) where peer traffic flows through. Config files ------------ - WireGuard external: {config_dir}/connectivity/wireguard_ext/wg_ext0.conf - OpenVPN: {config_dir}/connectivity/openvpn/.ovpn Both are validated to strip / reject hook directives that could execute arbitrary commands on the host. """ import ipaddress import logging import os import re import subprocess from typing import Any, Dict, List, Optional from base_service_manager import BaseServiceManager logger = logging.getLogger(__name__) WIREGUARD_CONTAINER = 'cell-wireguard' # Lines we strip from uploaded WireGuard configs — these can run arbitrary # host commands when wg-quick brings the interface up/down. _WG_FORBIDDEN_PREFIXES = ('PostUp', 'PostDown', 'PreUp', 'PreDown') # Lines we strip from uploaded OpenVPN configs — these execute external # scripts/binaries on connect/disconnect. _OVPN_FORBIDDEN_DIRECTIVES = ( 'up', 'down', 'script-security', 'plugin', 'route-up', 'route-pre-down', ) _NAME_RE = re.compile(r'^[a-z0-9_-]{1,32}$') # sshuttle / proxy configuration validation _HOST_RE = re.compile(r'^[A-Za-z0-9]([A-Za-z0-9.-]{0,252}[A-Za-z0-9])?$') _SSH_USER_RE = re.compile(r'^[a-z_][a-z0-9_-]{0,31}$') _PROXY_USER_RE = re.compile(r'^[A-Za-z0-9._-]{1,64}$') # Printable ASCII excluding double quote and backslash — safe inside the # quoted strings of a redsocks.conf without any escaping ambiguity. _PROXY_PASSWORD_RE = re.compile(r'^[\x20\x21\x23-\x5B\x5D-\x7E]{1,128}$') _B64_RE = re.compile(r'^[A-Za-z0-9+/]+={0,2}$') _KNOWN_HOSTS_HOSTS_RE = re.compile(r'^[A-Za-z0-9\[\]:.,*?_-]{1,512}$') _SSH_KEYTYPES = ( 'ssh-ed25519', 'ssh-rsa', 'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521', ) def _contains_strict_hostkey_disable(value: str) -> bool: """Detect attempts to disable SSH host-key pinning (any spacing/case).""" normalized = re.sub(r'\s+', '', value.lower()) return ('stricthostkeychecking=no' in normalized or 'stricthostkeycheckingno' in normalized) def _validate_host(host) -> Optional[str]: """Return a validated hostname/IP, or None when invalid.""" if not isinstance(host, str): return None host = host.strip() if not host or '..' in host or not _HOST_RE.match(host): return None return host def _validate_port(port) -> Optional[int]: """Return a validated TCP port (1-65535), or None when invalid.""" try: port = int(port) except (TypeError, ValueError): return None if not 1 <= port <= 65535: return None return port class ConnectivityManager(BaseServiceManager): """Manages alternate egress paths (extended connectivity) for peers.""" EXIT_TYPES = ("default", "wireguard_ext", "openvpn", "tor", "sshuttle", "proxy") MARKS = {"wireguard_ext": 0x10, "openvpn": 0x20, "tor": 0x30, "sshuttle": 0x40, "proxy": 0x50} TABLES = {"wireguard_ext": 110, "openvpn": 120, "tor": 130, "sshuttle": 140, "proxy": 150} IFACES = {"wireguard_ext": "wg_ext0", "openvpn": "tun0"} TOR_TRANS_PORT = 9040 TOR_DNS_PORT = 5353 SSHUTTLE_PORT = 12300 REDSOCKS_PORT = 12345 # Exits that work as pure iptables REDIRECTs to a local transparent-proxy # port (no exit interface, no kill-switch interface). REDIRECT_PORTS = {"tor": TOR_TRANS_PORT, "sshuttle": SSHUTTLE_PORT, "proxy": REDSOCKS_PORT} # Store-service ids / container names backing each exit type — used to # report an exit as configured when it was installed via the Service Store # rather than through a legacy config upload. STORE_SERVICE_IDS = { "wireguard_ext": "wireguard-ext", "openvpn": "openvpn-client", "tor": "tor", "sshuttle": "sshuttle", "proxy": "proxy", } EXIT_CONTAINERS = { "wireguard_ext": "cell-wg-ext", "openvpn": "cell-openvpn", "tor": "cell-tor", "sshuttle": "cell-sshuttle", "proxy": "cell-redsocks", } # RFC1918 ranges excluded from the sshuttle tunnel by default so cell-local # and LAN traffic is never tunneled. RFC1918_SUBNETS = ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16") CONNECTIVITY_CHAIN = 'PIC_CONNECTIVITY' def __init__(self, config_manager=None, peer_registry=None, vault_manager=None, data_dir: str = '/app/data', config_dir: str = '/app/config'): super().__init__('connectivity', data_dir, config_dir) self.config_manager = config_manager self.peer_registry = peer_registry self.vault_manager = vault_manager # Connectivity configs live under the per-service data dir so that # ${PIC_DATA_DIR}/services//config bind mounts in store compose # templates can read them (Docker daemon resolves paths on the HOST, # so they must be reachable via data_dir, not config_dir). services_dir = os.path.join(data_dir, 'services') self.wireguard_ext_dir = os.path.join(services_dir, 'wireguard-ext', 'config') self.openvpn_dir = os.path.join(services_dir, 'openvpn-client', 'config') self.sshuttle_dir = os.path.join(services_dir, 'sshuttle', 'config') self.proxy_dir = os.path.join(services_dir, 'proxy', 'config') for d in (self.wireguard_ext_dir, self.openvpn_dir, self.sshuttle_dir, self.proxy_dir): self.safe_makedirs(d) # One-shot migration from the legacy config_dir/connectivity/ location. _legacy_base = os.path.join(config_dir, 'connectivity') self._migrate_legacy_configs(_legacy_base) # Subscribe to ServiceBus CONFIG_CHANGED events so routes are # reapplied if the underlying network changes. Done lazily — # service_bus is a singleton imported at app startup. self._subscribe_to_events() # ── Legacy migration ────────────────────────────────────────────────── def _migrate_legacy_configs(self, legacy_base: str) -> None: """Copy files from the old config_dir/connectivity/ tree to the new data_dir locations. The old layout stored WireGuard and OpenVPN configs under the API container's config_dir, which Docker cannot bind-mount into store-service containers. Files are copied (not moved) so the legacy location still works until the operator removes it manually. """ import shutil pairs = ( (os.path.join(legacy_base, 'wireguard_ext'), self.wireguard_ext_dir), (os.path.join(legacy_base, 'openvpn'), self.openvpn_dir), ) for src_dir, dst_dir in pairs: if not os.path.isdir(src_dir): continue try: for fname in os.listdir(src_dir): src_file = os.path.join(src_dir, fname) dst_file = os.path.join(dst_dir, fname) if os.path.isfile(src_file) and not os.path.exists(dst_file): shutil.copy2(src_file, dst_file) os.chmod(dst_file, 0o600) logger.info('connectivity: migrated %s → %s', src_file, dst_file) except OSError as e: logger.warning('connectivity: migration from %s failed: %s', src_dir, e) # ── Event wiring ────────────────────────────────────────────────────── def _subscribe_to_events(self) -> None: """Subscribe to network change events so routes auto-reapply.""" try: from managers import service_bus, EventType service_bus.subscribe_to_event( EventType.CONFIG_CHANGED, self._on_network_changed ) except Exception as e: # Non-fatal: subscription is best-effort, manual apply still works. logger.debug(f"connectivity: event subscribe skipped: {e}") def _on_network_changed(self, event) -> None: """ServiceBus handler: re-apply routes when network config changes.""" try: source = getattr(event, 'source', '') if source not in ('network', 'wireguard', 'connectivity'): return logger.info(f"connectivity: re-applying routes due to {source} change") self.apply_routes() except Exception as e: logger.warning(f"connectivity: on_network_changed failed (non-fatal): {e}") # ── BaseServiceManager required ─────────────────────────────────────── def get_status(self) -> Dict[str, Any]: """Return status summary including configured exits and peer count.""" try: exits_status: Dict[str, Dict[str, Any]] = {} for exit_type in self.EXIT_TYPES: if exit_type == "default": continue exits_status[exit_type] = self._exit_status(exit_type) peers_with_exit = 0 if self.peer_registry is not None: try: for peer in self.peer_registry.list_peers(): if peer.get('exit_via', 'default') != 'default': peers_with_exit += 1 except Exception as e: logger.warning(f"get_status: peer count failed: {e}") return { 'service': 'connectivity', 'running': True, 'exits': exits_status, 'peers_with_exit': peers_with_exit, } except Exception as e: return self.handle_error(e, 'get_status') def test_connectivity(self) -> Dict[str, Any]: """Minimal connectivity self-test.""" return {'success': True} def get_config(self) -> Dict[str, Any]: """Return current connectivity config from config_manager.""" try: if self.config_manager is not None and hasattr( self.config_manager, 'get_connectivity_config' ): return self.config_manager.get_connectivity_config() except Exception as e: logger.warning(f"get_config: config_manager lookup failed: {e}") return {'exits': {}, 'peer_exit_map': {}} # ── Public API ──────────────────────────────────────────────────────── def list_exits(self) -> List[Dict[str, Any]]: """List configured exits with current status.""" result: List[Dict[str, Any]] = [] for exit_type in self.EXIT_TYPES: if exit_type == "default": continue entry = {'type': exit_type} entry.update(self._exit_status(exit_type)) result.append(entry) return result def get_peer_exits(self) -> Dict[str, str]: """Return {peer_name: exit_type} for all peers.""" out: Dict[str, str] = {} if self.peer_registry is None: return out try: for peer in self.peer_registry.list_peers(): name = peer.get('peer') if name: out[name] = peer.get('exit_via', 'default') except Exception as e: logger.warning(f"get_peer_exits: {e}") return out def set_peer_exit(self, peer_name: str, exit_type: str) -> Dict[str, Any]: """Assign a peer to an egress path and apply the rule changes.""" if exit_type not in self.EXIT_TYPES: return { 'ok': False, 'error': f"invalid exit_type {exit_type!r}; " f"must be one of {self.EXIT_TYPES}", } if not isinstance(peer_name, str) or not re.match(r'^[A-Za-z0-9_.-]{1,64}$', peer_name): return {'ok': False, 'error': f'invalid peer_name {peer_name!r}'} if self.peer_registry is None: return {'ok': False, 'error': 'peer_registry not available'} try: ok = self.peer_registry.set_peer_exit_via(peer_name, exit_type) except Exception as e: logger.error(f"set_peer_exit: registry update failed: {e}") return {'ok': False, 'error': str(e)} if not ok: return {'ok': False, 'error': f'peer {peer_name!r} not found'} try: self.apply_routes() except Exception as e: logger.warning(f"set_peer_exit: apply_routes failed (non-fatal): {e}") return {'ok': True, 'peer': peer_name, 'exit_via': exit_type} def upload_wireguard_ext(self, conf_text: str) -> Dict[str, Any]: """Validate and store an external WireGuard config.""" try: cleaned = self._validate_wg_conf(conf_text) except ValueError as e: return {'ok': False, 'error': str(e)} path = os.path.join(self.wireguard_ext_dir, 'wg_ext0.conf') try: self._write_secure(path, cleaned) except Exception as e: logger.error(f"upload_wireguard_ext: write failed: {e}") return {'ok': False, 'error': str(e)} logger.info(f"connectivity: stored wg_ext0.conf ({len(cleaned)} bytes)") return {'ok': True} def upload_openvpn(self, ovpn_text: str, name: str = 'default') -> Dict[str, Any]: """Validate and store an OpenVPN profile.""" if not isinstance(name, str) or not _NAME_RE.match(name): return { 'ok': False, 'error': f'invalid name {name!r}; must match [a-z0-9_-]{{1,32}}', } try: cleaned = self._validate_ovpn(ovpn_text) except ValueError as e: return {'ok': False, 'error': str(e)} path = os.path.join(self.openvpn_dir, f'{name}.ovpn') try: self._write_secure(path, cleaned) except Exception as e: logger.error(f"upload_openvpn: write failed: {e}") return {'ok': False, 'error': str(e)} logger.info(f"connectivity: stored {name}.ovpn ({len(cleaned)} bytes)") return {'ok': True} def configure_sshuttle(self, cfg: Dict[str, Any]) -> Dict[str, Any]: """Validate and store an sshuttle (SSH tunnel) exit configuration. Requires a pinned host key (a single known_hosts line); rejects any attempt to disable strict host-key checking. Secrets (private key / password) are written 0o600 under data/services/sshuttle/config/ and mirrored into the vault — they are never placed in cell_config.json. """ if not isinstance(cfg, dict): return {'ok': False, 'error': 'config must be a JSON object'} for value in cfg.values(): if isinstance(value, str) and _contains_strict_hostkey_disable(value): return { 'ok': False, 'error': 'StrictHostKeyChecking=no is not allowed; ' 'a pinned host key (known_hosts line) is required', } host = _validate_host(cfg.get('host')) if not host: return {'ok': False, 'error': 'invalid host: must be a hostname or IP'} port = _validate_port(cfg.get('port', 22)) if port is None: return {'ok': False, 'error': 'invalid port: must be 1-65535'} user = cfg.get('user') if not isinstance(user, str) or not _SSH_USER_RE.match(user): return { 'ok': False, 'error': 'invalid user: must match ^[a-z_][a-z0-9_-]{0,31}$', } auth = cfg.get('auth', 'key') if auth not in ('key', 'password'): return {'ok': False, 'error': "invalid auth: must be 'key' or 'password'"} known_hosts = cfg.get('known_hosts') err = self._validate_known_hosts_line(known_hosts) if err: return {'ok': False, 'error': err} known_hosts = known_hosts.strip() private_key = '' password = '' if auth == 'key': private_key = cfg.get('private_key', '') if not isinstance(private_key, str) or 'PRIVATE KEY' not in private_key: return { 'ok': False, 'error': 'private_key is required for key auth and must be ' 'a PEM/OpenSSH private key', } else: password = cfg.get('password', '') if not isinstance(password, str) or not password or '\n' in password: return {'ok': False, 'error': 'password is required for password auth'} exclude_subnets = cfg.get('exclude_subnets') if exclude_subnets is None: exclude_subnets = self._default_exclude_subnets() if not isinstance(exclude_subnets, list): return {'ok': False, 'error': 'exclude_subnets must be a list of CIDRs'} validated_excludes = [] for net in exclude_subnets: try: validated_excludes.append(str(ipaddress.ip_network(str(net), strict=False))) except ValueError: return {'ok': False, 'error': f'invalid exclude subnet: {net!r}'} conf_lines = [ f'HOST={host}', f'PORT={port}', f'USER={user}', f'AUTH={auth}', f'LISTEN_PORT={self.SSHUTTLE_PORT}', f'EXCLUDE={",".join(validated_excludes)}', ] try: self._write_secure( os.path.join(self.sshuttle_dir, 'known_hosts'), known_hosts + '\n', ) if auth == 'key': key_text = private_key.rstrip('\n') + '\n' self._write_secure(os.path.join(self.sshuttle_dir, 'id_pic'), key_text) else: self._write_secure( os.path.join(self.sshuttle_dir, 'password'), password + '\n') self._write_secure( os.path.join(self.sshuttle_dir, 'sshuttle.conf'), '\n'.join(conf_lines) + '\n', ) except Exception as e: logger.error(f"configure_sshuttle: write failed: {e}") return {'ok': False, 'error': 'failed to write sshuttle configuration'} if self.vault_manager is not None: try: if auth == 'key': self.vault_manager.store_secret('connectivity_sshuttle_key', private_key) else: self.vault_manager.store_secret('connectivity_sshuttle_password', password) except Exception as e: logger.warning(f"configure_sshuttle: vault store failed: {e}") self._persist_exit_config('sshuttle', { 'host': host, 'port': port, 'user': user, 'auth': auth, 'exclude_subnets': validated_excludes, }) logger.info(f"connectivity: configured sshuttle exit ({user}@{host}:{port})") return {'ok': True} def configure_proxy(self, cfg: Dict[str, Any]) -> Dict[str, Any]: """Validate and store an upstream proxy (redsocks) exit configuration. Generates redsocks.conf from strictly validated fields only — no value that could break out of the quoted config strings is accepted. The password lives in the 0o600 conf file, never in compose env. """ if not isinstance(cfg, dict): return {'ok': False, 'error': 'config must be a JSON object'} scheme = cfg.get('scheme') if scheme not in ('http', 'socks5'): return {'ok': False, 'error': "invalid scheme: must be 'http' or 'socks5'"} host = _validate_host(cfg.get('host')) if not host: return {'ok': False, 'error': 'invalid host: must be a hostname or IP'} port = _validate_port(cfg.get('port')) if port is None: return {'ok': False, 'error': 'invalid port: must be 1-65535'} user = cfg.get('user') or '' password = cfg.get('password') or '' if user and not (isinstance(user, str) and _PROXY_USER_RE.match(user)): return { 'ok': False, 'error': 'invalid user: must match ^[A-Za-z0-9._-]{1,64}$', } if password and not (isinstance(password, str) and _PROXY_PASSWORD_RE.match(password)): return { 'ok': False, 'error': 'invalid password: 1-128 printable ASCII characters ' 'excluding double quotes and backslashes', } if password and not user: return {'ok': False, 'error': 'password requires a user'} conf = self._render_redsocks_conf(scheme, host, port, user, password) try: self._write_secure(os.path.join(self.proxy_dir, 'redsocks.conf'), conf) except Exception as e: logger.error(f"configure_proxy: write failed: {e}") return {'ok': False, 'error': 'failed to write redsocks configuration'} self._persist_exit_config('proxy', { 'scheme': scheme, 'host': host, 'port': port, 'user': user, }) logger.info(f"connectivity: configured proxy exit ({scheme}://{host}:{port})") return {'ok': True} def _render_redsocks_conf(self, scheme: str, host: str, port: int, user: str, password: str) -> str: """Build a redsocks.conf from already-validated fields.""" redsocks_type = 'socks5' if scheme == 'socks5' else 'http-connect' lines = [ 'base {', ' log_debug = off;', ' log_info = on;', ' log = stderr;', ' daemon = off;', ' redirector = iptables;', '}', '', 'redsocks {', ' local_ip = 0.0.0.0;', f' local_port = {self.REDSOCKS_PORT};', f' ip = {host};', f' port = {port};', f' type = {redsocks_type};', ] if user: lines.append(f' login = "{user}";') if password: lines.append(f' password = "{password}";') lines.append('}') return '\n'.join(lines) + '\n' @staticmethod def _validate_known_hosts_line(line) -> Optional[str]: """Validate a single known_hosts line; return an error string or None. Expected format: host[,ip] keytype base64key [comment] """ if not isinstance(line, str) or not line.strip(): return ('known_hosts is required: a pinned host key line ' '(host[,ip] keytype base64key)') line = line.strip() if '\n' in line or '\r' in line: return 'known_hosts must be a single line' parts = line.split() if len(parts) < 3: return ('invalid known_hosts line: expected ' 'host[,ip] keytype base64key') hosts, keytype, key = parts[0], parts[1], parts[2] if not _KNOWN_HOSTS_HOSTS_RE.match(hosts): return f'invalid known_hosts host field: {hosts!r}' if keytype not in _SSH_KEYTYPES: return (f'invalid known_hosts key type {keytype!r}; ' f'must be one of {_SSH_KEYTYPES}') if not _B64_RE.match(key): return 'invalid known_hosts key: not valid base64' return None def _default_exclude_subnets(self) -> List[str]: """Cell subnet + RFC1918 ranges — internal traffic is never tunneled.""" excludes = list(self.RFC1918_SUBNETS) try: if self.config_manager is not None: identity = self.config_manager.get_identity() if isinstance(identity, dict): ip_range = identity.get('ip_range', '') if isinstance(ip_range, str) and '/' in ip_range \ and ip_range not in excludes: excludes.insert(0, ip_range) except Exception as e: logger.debug(f"_default_exclude_subnets: {e}") return excludes def _persist_exit_config(self, exit_type: str, fields: Dict[str, Any]) -> None: """Persist non-secret exit fields under connectivity.exits in config.""" if self.config_manager is None: return try: cfg = self.config_manager.get_connectivity_config() exits = cfg.get('exits') if isinstance(cfg, dict) else None exits = dict(exits) if isinstance(exits, dict) else {} exits[exit_type] = fields self.config_manager.set_connectivity_field('exits', exits) except Exception as e: logger.warning(f"_persist_exit_config({exit_type}): {e}") # ── Routing application ─────────────────────────────────────────────── def apply_routes(self) -> Dict[str, Any]: """Idempotently rebuild all connectivity rules and policy routing.""" rules_applied = 0 try: self._ensure_chains() except Exception as e: logger.warning(f"apply_routes: _ensure_chains failed: {e}") # Flush our dedicated chains (without deleting them) for table, chain in (('mangle', self.CONNECTIVITY_CHAIN), ('nat', self.CONNECTIVITY_CHAIN)): try: self._flush_chain(table, chain) except Exception as e: logger.warning(f"apply_routes: flush {table}/{chain} failed: {e}") # Idempotent ip rule registration for each non-default exit for exit_type in self.MARKS: mark = self.MARKS[exit_type] table = self.TABLES[exit_type] try: self._remove_ip_rule(mark, table) self._add_ip_rule(mark, table) rules_applied += 1 except Exception as e: logger.warning(f"apply_routes: ip rule {exit_type} failed: {e}") # Per-peer marking + nat redirect (Tor only) if self.peer_registry is not None: try: peers = self.peer_registry.list_peers() except Exception as e: logger.warning(f"apply_routes: list_peers failed: {e}") peers = [] for peer in peers: exit_via = peer.get('exit_via', 'default') if exit_via == 'default' or exit_via not in self.MARKS: continue src_ip = self._peer_source_ip(peer.get('peer', '')) if not src_ip: continue mark = self.MARKS[exit_via] try: self._add_mark_rule(src_ip, mark) rules_applied += 1 except Exception as e: logger.warning( f"apply_routes: mark rule for {src_ip}/{exit_via}: {e}" ) # Tor / sshuttle / proxy: redirect TCP to the local # transparent-proxy port for that exit. if exit_via in self.REDIRECT_PORTS: try: self._add_redirect(src_ip, self.REDIRECT_PORTS[exit_via]) rules_applied += 1 except Exception as e: logger.warning( f"apply_routes: {exit_via} redirect for {src_ip}: {e}" ) # Kill-switch: drop marked packets that would otherwise leak via the # default route if the exit interface is down. for exit_type, iface in self.IFACES.items(): mark = self.MARKS[exit_type] try: self._add_killswitch(mark, iface) rules_applied += 1 except Exception as e: logger.warning(f"apply_routes: killswitch {exit_type}: {e}") return {'ok': True, 'rules_applied': rules_applied} # ── iptables / ip rule helpers ──────────────────────────────────────── def _wg_iptables(self, args: List[str], timeout: int = 10) -> subprocess.CompletedProcess: """Run iptables inside the WireGuard container (where peer traffic forwards).""" cmd = ['docker', 'exec', WIREGUARD_CONTAINER, 'iptables'] + args return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) def _wg_ip(self, args: List[str], timeout: int = 10) -> subprocess.CompletedProcess: """Run `ip` inside the WireGuard container.""" cmd = ['docker', 'exec', WIREGUARD_CONTAINER, 'ip'] + args return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) def _ensure_chains(self) -> None: """Create PIC_CONNECTIVITY chains in mangle and nat (idempotent).""" for table, parent_chain in ( ('mangle', 'PREROUTING'), ('nat', 'PREROUTING'), ): # Create chain if it doesn't already exist check = self._wg_iptables( ['-t', table, '-L', self.CONNECTIVITY_CHAIN, '-n'] ) if check.returncode != 0: create = self._wg_iptables( ['-t', table, '-N', self.CONNECTIVITY_CHAIN] ) if create.returncode != 0 and 'exists' not in (create.stderr or ''): logger.warning( f"_ensure_chains: cannot create {table}/{self.CONNECTIVITY_CHAIN}: " f"{create.stderr.strip()}" ) # Insert jump from parent chain at position 1, idempotent. jump_args = ['-t', table, '-C', parent_chain, '-j', self.CONNECTIVITY_CHAIN] exists = self._wg_iptables(jump_args) if exists.returncode != 0: self._wg_iptables( ['-t', table, '-I', parent_chain, '1', '-j', self.CONNECTIVITY_CHAIN] ) def _flush_chain(self, table: str, chain: str) -> None: """Flush a chain in-place (`iptables -F`) without deleting it.""" self._wg_iptables(['-t', table, '-F', chain]) def _add_ip_rule(self, mark: int, table: int) -> None: """Add `ip rule fwmark lookup `.""" self._wg_ip(['rule', 'add', 'fwmark', hex(mark), 'lookup', str(table)]) def _remove_ip_rule(self, mark: int, table: int) -> None: """Remove all matching `ip rule fwmark lookup
` (idempotent).""" # `ip rule del` returns nonzero when no matching rule exists; loop # until it fails to drain duplicates. for _ in range(8): r = self._wg_ip(['rule', 'del', 'fwmark', hex(mark), 'lookup', str(table)]) if r.returncode != 0: break def _add_mark_rule(self, src_ip: str, mark: int) -> None: """Mark packets from src_ip with mark in the mangle PIC_CONNECTIVITY chain.""" self._wg_iptables([ '-t', 'mangle', '-A', self.CONNECTIVITY_CHAIN, '-s', src_ip, '-j', 'MARK', '--set-mark', hex(mark), ]) def _add_redirect(self, src_ip: str, port: int) -> None: """Redirect peer's TCP traffic to a local transparent-proxy port.""" self._wg_iptables([ '-t', 'nat', '-A', self.CONNECTIVITY_CHAIN, '-s', src_ip, '-p', 'tcp', '-j', 'REDIRECT', '--to-ports', str(port), ]) def _add_tor_redirect(self, src_ip: str) -> None: """Redirect peer's TCP traffic to local Tor TransPort.""" self._add_redirect(src_ip, self.TOR_TRANS_PORT) def _add_killswitch(self, mark: int, iface: Optional[str]) -> None: """Drop marked packets that would egress via any interface other than iface. For Tor (no exit iface), skip — Tor traffic is fully redirected at nat/REDIRECT and never reaches FORWARD. """ if not iface: return # Use -C to test, -A to add — idempotent. check_args = ['-C', 'FORWARD', '-m', 'mark', '--mark', hex(mark), '!', '-o', iface, '-j', 'DROP'] exists = self._wg_iptables(check_args) if exists.returncode != 0: self._wg_iptables(['-A', 'FORWARD', '-m', 'mark', '--mark', hex(mark), '!', '-o', iface, '-j', 'DROP']) def _exit_status(self, exit_type: str) -> Dict[str, Any]: """Return per-exit status (config presence + interface up/down). An exit counts as configured when a legacy uploaded config file exists, OR the backing store service is installed, OR its container is running — exits installed via the Service Store never create the legacy upload files. """ info: Dict[str, Any] = {'configured': False, 'iface_up': False} if exit_type == 'wireguard_ext': path = os.path.join(self.wireguard_ext_dir, 'wg_ext0.conf') info['configured'] = os.path.isfile(path) elif exit_type == 'openvpn': try: info['configured'] = any( f.endswith('.ovpn') for f in os.listdir(self.openvpn_dir) ) except OSError: info['configured'] = False elif exit_type == 'tor': info['configured'] = True # Tor uses defaults; no per-cell config elif exit_type == 'sshuttle': info['configured'] = os.path.isfile( os.path.join(self.sshuttle_dir, 'sshuttle.conf')) elif exit_type == 'proxy': info['configured'] = os.path.isfile( os.path.join(self.proxy_dir, 'redsocks.conf')) if not info['configured'] and ( self._store_service_installed(exit_type) or self._exit_container_running(exit_type) ): info['configured'] = True iface = self.IFACES.get(exit_type) if iface: try: r = self._wg_ip(['link', 'show', iface], timeout=5) info['iface_up'] = r.returncode == 0 and 'UP' in (r.stdout or '') except Exception: info['iface_up'] = False if info['iface_up']: info['status'] = 'active' elif info['configured']: info['status'] = 'configured' else: info['status'] = 'not_configured' return info def _store_service_installed(self, exit_type: str) -> bool: """True when the store service backing this exit type is installed.""" svc_id = self.STORE_SERVICE_IDS.get(exit_type) if not svc_id or self.config_manager is None: return False try: installed = self.config_manager.get_installed_services() except Exception as e: logger.debug(f"_store_service_installed({exit_type}): {e}") return False return isinstance(installed, dict) and svc_id in installed def _exit_container_running(self, exit_type: str) -> bool: """True when the exit's container is currently running.""" cname = self.EXIT_CONTAINERS.get(exit_type) if not cname: return False try: r = subprocess.run( ['docker', 'inspect', '-f', '{{.State.Running}}', cname], capture_output=True, text=True, timeout=5, ) return r.returncode == 0 and r.stdout.strip() == 'true' except Exception: return False def _peer_source_ip(self, peer_name: str) -> Optional[str]: """Return a peer's WireGuard IP (no /CIDR suffix).""" if not peer_name or self.peer_registry is None: return None try: peer = self.peer_registry.get_peer(peer_name) except Exception as e: logger.warning(f"_peer_source_ip({peer_name}): {e}") return None if not peer: return None ip = peer.get('ip', '') if not ip: return None return ip.split('/')[0] # ── Config validation ───────────────────────────────────────────────── def _validate_wg_conf(self, text: str) -> str: """Strip Pre/Post-Up/Down hooks and reject conflicting wg0 interface. Raises ValueError if the config tries to define `Interface = wg0` (which would clash with the existing peer-server interface). """ if not isinstance(text, str): raise ValueError('wg conf must be a string') cleaned: List[str] = [] for raw_line in text.splitlines(): stripped = raw_line.strip() # Reject wg0 interface declaration that would conflict with the # existing WireGuard server interface. if stripped.lower().startswith('interface'): # Look ahead in subsequent lines for `= wg0` would be hard; # the [Interface] section header itself is fine. We only # reject explicit Name/Interface = wg0 directives. pass # Match assignments like `PostUp = ...` if '=' in stripped: key = stripped.split('=', 1)[0].strip() if key in _WG_FORBIDDEN_PREFIXES: logger.info(f"_validate_wg_conf: dropped {key} hook") continue # Detect Name = wg0 or Interface = wg0 inside Interface section if key.lower() in ('name', 'interface') and \ stripped.split('=', 1)[1].strip().lower() == 'wg0': raise ValueError( "config defines interface 'wg0' which conflicts " "with the peer-server interface" ) cleaned.append(raw_line) return '\n'.join(cleaned).rstrip() + '\n' def _validate_ovpn(self, text: str) -> str: """Strip directives that execute external scripts/binaries.""" if not isinstance(text, str): raise ValueError('ovpn conf must be a string') cleaned: List[str] = [] for raw_line in text.splitlines(): stripped = raw_line.strip() # Match the directive name (first whitespace-delimited token). if stripped and not stripped.startswith('#'): first = stripped.split(None, 1)[0] if first in _OVPN_FORBIDDEN_DIRECTIVES: logger.info(f"_validate_ovpn: dropped {first} directive") continue cleaned.append(raw_line) return '\n'.join(cleaned).rstrip() + '\n' # ── Filesystem helpers ──────────────────────────────────────────────── @staticmethod def _write_secure(path: str, text: str) -> None: """Atomic 0o600 write — secrets in these configs must not be world-readable.""" os.makedirs(os.path.dirname(path), exist_ok=True) tmp = path + '.tmp' fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) try: with os.fdopen(fd, 'w') as f: f.write(text) except Exception: try: os.unlink(tmp) except OSError: pass raise os.chmod(tmp, 0o600) os.replace(tmp, path) os.chmod(path, 0o600)