diff --git a/api/config_manager.py b/api/config_manager.py index c931fe6..f07fc64 100644 --- a/api/config_manager.py +++ b/api/config_manager.py @@ -14,6 +14,7 @@ import fnmatch import yaml import shutil import hashlib +import threading from datetime import datetime from typing import Dict, List, Optional, Any from pathlib import Path @@ -61,6 +62,13 @@ class ConfigManager: pass self.service_schemas = self._load_service_schemas() self.configs = self._load_all_configs() + # Guards concurrent reads/writes of the connectivity v2 section. + self._connectivity_lock = threading.RLock() + # Optional callback invoked to migrate the legacy connectivity section + # to v2 on first access. Wired by ConnectivityManager (which owns the + # resource-allocation logic). Until set, get_connectivity() returns the + # raw (possibly legacy) section without migrating. + self._connectivity_migrator = None # Ensure _identity key always exists if '_identity' not in self.configs: self.configs['_identity'] = {} @@ -1037,6 +1045,130 @@ class ConfigManager: logger.error(f"set_connectivity_field({field}): {e}") return False + # ── Connectivity v2 — named connection instances ────────────────────── + # + # The legacy schema stored at most one exit per type under + # `connectivity.exits` plus a `peer_exit_map`. v2 replaces this with a list + # of named connection instances under `connectivity.connections`, each with + # its own allocated routing resources (mark/table/iface/redirect_port) and + # vault secret references. The legacy keys are kept readable so the one-time + # migration can consume them; the new code path uses `connections`. + + def register_connectivity_migrator(self, migrator) -> None: + """Register the v1→v2 migration callback (owned by ConnectivityManager). + + `migrator(legacy_section) -> list[connection_record]` builds the v2 + connection records (allocating resources, repointing secrets) from the + legacy section. Called at most once, lazily, on first get_connectivity(). + """ + self._connectivity_migrator = migrator + + def get_connectivity(self) -> Dict[str, Any]: + """Return the connectivity v2 dict, running v1→v2 migration if needed. + + Idempotent: once `version` is 2 the stored section is returned as-is. + When `version` < 2 and a migrator is registered, the legacy exits are + converted to connection instances exactly once and the result persisted. + """ + with self._connectivity_lock: + cfg = self.configs.get('connectivity') + if not isinstance(cfg, dict): + cfg = {} + if cfg.get('version') == 2 and isinstance(cfg.get('connections'), list): + return self._copy_connectivity(cfg) + + connections: List[Dict[str, Any]] = [] + if self._connectivity_migrator is not None: + try: + built = self._connectivity_migrator(dict(cfg)) + if isinstance(built, list): + connections = built + except Exception as e: + logger.error(f"connectivity v1→v2 migration failed: {e}") + raise + + new_cfg = dict(cfg) + new_cfg['version'] = 2 + new_cfg['connections'] = connections + self.configs['connectivity'] = new_cfg + self._save_all_configs() + return self._copy_connectivity(new_cfg) + + @staticmethod + def _copy_connectivity(cfg: Dict[str, Any]) -> Dict[str, Any]: + """Deep-ish copy of the connectivity section so callers can't mutate state.""" + out = dict(cfg) + out['connections'] = [dict(c) for c in cfg.get('connections', [])] + return out + + def list_connections(self) -> List[Dict[str, Any]]: + """Return a copy of all v2 connection records.""" + with self._connectivity_lock: + return self.get_connectivity().get('connections', []) + + def get_connection(self, conn_id: str) -> Optional[Dict[str, Any]]: + """Return a copy of one connection record by id, or None.""" + with self._connectivity_lock: + for conn in self.get_connectivity().get('connections', []): + if conn.get('id') == conn_id: + return dict(conn) + return None + + def add_connection(self, record: Dict[str, Any]) -> bool: + """Append a connection record and persist atomically.""" + with self._connectivity_lock: + cfg = self.get_connectivity() + conns = cfg.get('connections', []) + conns.append(dict(record)) + self.configs['connectivity'] = { + **self.configs.get('connectivity', {}), + 'version': 2, + 'connections': conns, + } + self._save_all_configs() + return True + + def update_connection(self, conn_id: str, fields: Dict[str, Any]) -> bool: + """Merge `fields` into the connection record with id `conn_id`.""" + with self._connectivity_lock: + cfg = self.get_connectivity() + conns = cfg.get('connections', []) + found = False + for conn in conns: + if conn.get('id') == conn_id: + conn.update(fields) + found = True + break + if not found: + return False + self.configs['connectivity'] = { + **self.configs.get('connectivity', {}), + 'version': 2, + 'connections': conns, + } + self._save_all_configs() + return True + + def delete_connection(self, conn_id: str) -> bool: + """Remove the connection record with id `conn_id`.""" + with self._connectivity_lock: + cfg = self.get_connectivity() + conns = cfg.get('connections', []) + remaining = [c for c in conns if c.get('id') != conn_id] + if len(remaining) == len(conns): + return False + self.configs['connectivity'] = { + **self.configs.get('connectivity', {}), + 'version': 2, + 'connections': remaining, + } + self._save_all_configs() + return True + + def set_connection_status(self, conn_id: str, status: Dict[str, Any]) -> bool: + """Replace the `status` sub-dict of one connection record.""" + return self.update_connection(conn_id, {'status': dict(status)}) + def get_all_configs(self) -> Dict[str, Dict]: """Get all service configurations""" return self.configs.copy() diff --git a/api/connectivity_manager.py b/api/connectivity_manager.py index f871bc0..3632d4d 100644 --- a/api/connectivity_manager.py +++ b/api/connectivity_manager.py @@ -42,8 +42,12 @@ import ipaddress import logging import os import re +import secrets +import threading +import time +from typing import Any, Dict, List, Optional, Tuple + import subprocess -from typing import Any, Dict, List, Optional from base_service_manager import BaseServiceManager @@ -127,6 +131,41 @@ class ConnectivityManager(BaseServiceManager): REDIRECT_PORTS = {"tor": TOR_TRANS_PORT, "sshuttle": SSHUTTLE_PORT, "proxy": REDSOCKS_PORT} + # ── Connectivity v2 — instance resource allocation ──────────────────── + # Connection instance types (the legacy "default" pseudo-exit is excluded — + # a peer/service routed via "default" simply has no connection). + CONNECTION_TYPES = ("wireguard_ext", "openvpn", "tor", "sshuttle", "proxy") + # Types whose egress is a real interface (kill-switch capable). They get an + # iface name and no redirect port. + IFACE_TYPES = ("wireguard_ext", "openvpn") + # Types implemented as a local transparent-proxy REDIRECT. They get a + # redirect port and no iface. + REDIRECT_TYPES = ("tor", "sshuttle", "proxy") + # Only a single Tor instance is supported (one Tor container per cell). + SINGLE_INSTANCE_TYPES = ("tor",) + + # fwmark block 0x1000–0x1FFF, stride 0x10. + MARK_BASE = 0x1000 + MARK_STRIDE = 0x10 + MARK_MAX = 0x1FFF + # routing tables 1000+. + TABLE_BASE = 1000 + # transparent-proxy redirect port pool for instances. + REDIRECT_PORT_BASE = 9100 + REDIRECT_PORT_MAX = 9199 + + IFACE_PREFIXES = {"wireguard_ext": "wgext_", "openvpn": "ovpn_"} + + CONNECTION_NAME_RE = re.compile(r'^[A-Za-z0-9][A-Za-z0-9 _.-]{0,63}$') + + DEFAULT_CONNECTION_NAMES = { + "wireguard_ext": "WireGuard External", + "openvpn": "OpenVPN", + "tor": "Tor", + "sshuttle": "SSH Tunnel", + "proxy": "Proxy", + } + # Store-service ids / container names backing each exit type — used to # report an exit as configured when it was installed via the Service Store # rather than through a legacy config upload. @@ -159,6 +198,17 @@ class ConnectivityManager(BaseServiceManager): self.peer_registry = peer_registry self.vault_manager = vault_manager + # Serializes connection CRUD + resource allocation across threads. + self._conn_lock = threading.RLock() + + # Wire the v1→v2 migration so it runs lazily on first get_connectivity(). + if self.config_manager is not None and hasattr( + self.config_manager, 'register_connectivity_migrator' + ): + self.config_manager.register_connectivity_migrator( + self._migrate_connectivity_v1_to_v2 + ) + # Connectivity configs live under the per-service data dir so that # ${PIC_DATA_DIR}/services//config bind mounts in store compose # templates can read them (Docker daemon resolves paths on the HOST, @@ -632,6 +682,624 @@ class ConnectivityManager(BaseServiceManager): except Exception as e: logger.warning(f"_persist_exit_config({exit_type}): {e}") + # ── Connectivity v2 — per-instance config validation ────────────────── + # + # These validators operate on a single connection instance's config dict, + # returning (clean_config, clean_secrets, error). They share the field + # rules used by the legacy single-slot configure_* methods so behaviour is + # identical; the legacy methods remain the v1 write path until phase 2. + + def _validate_connection_config( + self, conn_type: str, config: Dict[str, Any], + secrets_in: Optional[Dict[str, Any]], + ) -> Tuple[Dict[str, Any], Dict[str, str], Optional[str]]: + """Validate one instance's config + secrets for `conn_type`. + + Returns (clean_config, clean_secrets, error). On error the first two + values are empty. Secrets are returned separately so callers store them + in the vault and never in cell_config. + """ + if not isinstance(config, dict): + return {}, {}, 'config must be a JSON object' + secrets_in = secrets_in or {} + if not isinstance(secrets_in, dict): + return {}, {}, 'secrets must be a JSON object' + + if conn_type == 'sshuttle': + return self._validate_sshuttle_instance(config, secrets_in) + if conn_type == 'proxy': + return self._validate_proxy_instance(config, secrets_in) + if conn_type == 'wireguard_ext': + return self._validate_wg_instance(config, secrets_in) + if conn_type == 'openvpn': + return self._validate_ovpn_instance(config, secrets_in) + if conn_type == 'tor': + # Tor has no per-instance config or secret. + return {}, {}, None + return {}, {}, f'unsupported connection type {conn_type!r}' + + def _validate_sshuttle_instance( + self, cfg: Dict[str, Any], secrets_in: Dict[str, Any], + ) -> Tuple[Dict[str, Any], Dict[str, str], Optional[str]]: + for value in cfg.values(): + if isinstance(value, str) and _contains_strict_hostkey_disable(value): + return {}, {}, ('StrictHostKeyChecking=no is not allowed; a ' + 'pinned host key (known_hosts line) is required') + + host = _validate_host(cfg.get('host')) + if not host: + return {}, {}, 'invalid host: must be a hostname or IP' + port = _validate_port(cfg.get('port', 22)) + if port is None: + return {}, {}, 'invalid port: must be 1-65535' + user = cfg.get('user') + if not isinstance(user, str) or not _SSH_USER_RE.match(user): + return {}, {}, 'invalid user: must match ^[a-z_][a-z0-9_-]{0,31}$' + auth = cfg.get('auth', 'key') + if auth not in ('key', 'password'): + return {}, {}, "invalid auth: must be 'key' or 'password'" + + known_hosts = cfg.get('known_hosts') + err = self._validate_known_hosts_line(known_hosts) + if err: + return {}, {}, err + known_hosts = known_hosts.strip() + + clean_secrets: Dict[str, str] = {'known_hosts': known_hosts} + if auth == 'key': + private_key = secrets_in.get('private_key', cfg.get('private_key', '')) + if not isinstance(private_key, str) or 'PRIVATE KEY' not in private_key: + return {}, {}, ('private_key is required for key auth and must be ' + 'a PEM/OpenSSH private key') + clean_secrets['private_key'] = private_key + else: + password = secrets_in.get('password', cfg.get('password', '')) + if not isinstance(password, str) or not password or '\n' in password: + return {}, {}, 'password is required for password auth' + clean_secrets['password'] = password + + exclude_subnets = cfg.get('exclude_subnets') + if exclude_subnets is None: + exclude_subnets = self._default_exclude_subnets() + if not isinstance(exclude_subnets, list): + return {}, {}, 'exclude_subnets must be a list of CIDRs' + validated_excludes = [] + for net in exclude_subnets: + try: + validated_excludes.append(str(ipaddress.ip_network(str(net), strict=False))) + except ValueError: + return {}, {}, f'invalid exclude subnet: {net!r}' + + clean_config = { + 'host': host, 'port': port, 'user': user, 'auth': auth, + 'exclude_subnets': validated_excludes, + } + return clean_config, clean_secrets, None + + def _validate_proxy_instance( + self, cfg: Dict[str, Any], secrets_in: Dict[str, Any], + ) -> Tuple[Dict[str, Any], Dict[str, str], Optional[str]]: + scheme = cfg.get('scheme') + if scheme not in ('http', 'socks5'): + return {}, {}, "invalid scheme: must be 'http' or 'socks5'" + host = _validate_host(cfg.get('host')) + if not host: + return {}, {}, 'invalid host: must be a hostname or IP' + port = _validate_port(cfg.get('port')) + if port is None: + return {}, {}, 'invalid port: must be 1-65535' + user = cfg.get('user') or '' + password = secrets_in.get('password', cfg.get('password') or '') + if user and not (isinstance(user, str) and _PROXY_USER_RE.match(user)): + return {}, {}, 'invalid user: must match ^[A-Za-z0-9._-]{1,64}$' + if password and not (isinstance(password, str) + and _PROXY_PASSWORD_RE.match(password)): + return {}, {}, ('invalid password: 1-128 printable ASCII characters ' + 'excluding double quotes and backslashes') + if password and not user: + return {}, {}, 'password requires a user' + + clean_config = {'scheme': scheme, 'host': host, 'port': port, 'user': user} + clean_secrets: Dict[str, str] = {} + if password: + clean_secrets['password'] = password + return clean_config, clean_secrets, None + + def _validate_wg_instance( + self, cfg: Dict[str, Any], secrets_in: Dict[str, Any], + ) -> Tuple[Dict[str, Any], Dict[str, str], Optional[str]]: + conf_text = secrets_in.get('conf', cfg.get('conf', '')) + if not isinstance(conf_text, str) or not conf_text.strip(): + return {}, {}, 'conf is required: a WireGuard config' + try: + cleaned = self._validate_wg_conf(conf_text) + except ValueError as e: + return {}, {}, str(e) + return {}, {'conf': cleaned}, None + + def _validate_ovpn_instance( + self, cfg: Dict[str, Any], secrets_in: Dict[str, Any], + ) -> Tuple[Dict[str, Any], Dict[str, str], Optional[str]]: + conf_text = secrets_in.get('conf', cfg.get('conf', '')) + if not isinstance(conf_text, str) or not conf_text.strip(): + return {}, {}, 'conf is required: an OpenVPN profile' + try: + cleaned = self._validate_ovpn(conf_text) + except ValueError as e: + return {}, {}, str(e) + return {}, {'conf': cleaned}, None + + # ── Connectivity v2 — resource allocator ────────────────────────────── + + def _used_resources(self) -> Tuple[set, set, set, set]: + """Return (marks, tables, ifaces, ports) currently used by connections.""" + marks, tables, ifaces, ports = set(), set(), set(), set() + if self.config_manager is None: + return marks, tables, ifaces, ports + try: + conns = self.config_manager.list_connections() + except Exception as e: + logger.warning(f"_used_resources: list_connections failed: {e}") + conns = [] + for c in conns: + if isinstance(c.get('mark'), int): + marks.add(c['mark']) + if isinstance(c.get('table'), int): + tables.add(c['table']) + if c.get('iface'): + ifaces.add(c['iface']) + if isinstance(c.get('redirect_port'), int): + ports.add(c['redirect_port']) + return marks, tables, ifaces, ports + + def _allocate_resources( + self, conn_type: str, conn_id: str, + ) -> Tuple[int, int, Optional[str], Optional[int]]: + """Allocate (mark, table, iface, redirect_port) for a new connection. + + Lowest-free-overall within each pool (delete frees + cleans rules, so + reuse is safe). iface is set only for IFACE_TYPES, redirect_port only + for REDIRECT_TYPES. + """ + marks, tables, ifaces, ports = self._used_resources() + + mark = self.MARK_BASE + while mark in marks: + mark += self.MARK_STRIDE + if mark > self.MARK_MAX: + raise ValueError('no free fwmark available in 0x1000–0x1FFF') + + table = self.TABLE_BASE + while table in tables: + table += 1 + + iface: Optional[str] = None + if conn_type in self.IFACE_TYPES: + hexid = conn_id.split('_')[-1][:8] + iface = f"{self.IFACE_PREFIXES[conn_type]}{hexid}" + + redirect_port: Optional[int] = None + if conn_type in self.REDIRECT_TYPES: + port = self.REDIRECT_PORT_BASE + while port in ports and port <= self.REDIRECT_PORT_MAX: + port += 1 + if port > self.REDIRECT_PORT_MAX: + raise ValueError('no free redirect port available in 9100–9199') + redirect_port = port + + return mark, table, iface, redirect_port + + @staticmethod + def _new_conn_id() -> str: + return f"conn_{secrets.token_hex(4)}" + + # ── Connectivity v2 — connection CRUD ───────────────────────────────── + + def _compute_state(self, conn_type: str, config: Dict[str, Any], + secret_refs: List[str]) -> str: + """'configured' when all required fields/secrets present, else 'added'.""" + required = self._required_for_type(conn_type, config) + for field in required.get('config', ()): + if not config.get(field): + return 'added' + for ref_suffix in required.get('secrets', ()): + if not any(r.endswith(f'_{ref_suffix}') for r in secret_refs): + return 'added' + return 'configured' + + def _required_for_type(self, conn_type: str, + config: Dict[str, Any]) -> Dict[str, Tuple[str, ...]]: + """Required non-secret fields and secret suffixes per type.""" + if conn_type == 'sshuttle': + auth_secret = 'private_key' if config.get('auth') != 'password' else 'password' + return {'config': ('host', 'user', 'auth'), + 'secrets': ('known_hosts', auth_secret)} + if conn_type == 'proxy': + return {'config': ('scheme', 'host', 'port'), 'secrets': ()} + if conn_type in ('wireguard_ext', 'openvpn'): + return {'config': (), 'secrets': ('conf',)} + if conn_type == 'tor': + return {'config': (), 'secrets': ()} + return {'config': (), 'secrets': ()} + + def create_connection(self, conn_type: str, name: str, + config: Optional[Dict[str, Any]] = None, + secrets: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """Create a named connection instance of `conn_type`. + + Validates the type, name (non-empty + unique per cell), and the + per-type field rules; enforces a single Tor; allocates routing + resources; stores secrets in the vault under conn__ and + records only the refs. Returns the created record (no secret values). + """ + if conn_type not in self.CONNECTION_TYPES: + return {'ok': False, 'error': + f'invalid type {conn_type!r}; must be one of {self.CONNECTION_TYPES}'} + if not isinstance(name, str) or not self.CONNECTION_NAME_RE.match(name.strip()): + return {'ok': False, 'error': 'invalid name: 1-64 chars, ' + 'letters/digits/space/._- and must start alphanumeric'} + name = name.strip() + config = config or {} + + clean_config, clean_secrets, err = self._validate_connection_config( + conn_type, config, secrets) + if err: + return {'ok': False, 'error': err} + + with self._conn_lock: + existing = [] + if self.config_manager is not None: + try: + existing = self.config_manager.list_connections() + except Exception as e: + logger.error(f"create_connection: list failed: {e}") + return {'ok': False, 'error': 'failed to read connections'} + + if conn_type in self.SINGLE_INSTANCE_TYPES: + if any(c.get('type') == conn_type for c in existing): + return {'ok': False, 'error': + f'only a single {conn_type} connection is supported'} + + if any(c.get('name', '').strip().lower() == name.lower() + for c in existing): + return {'ok': False, 'error': f'a connection named {name!r} already exists'} + + conn_id = self._new_conn_id() + try: + mark, table, iface, redirect_port = self._allocate_resources( + conn_type, conn_id) + except ValueError as e: + return {'ok': False, 'error': str(e)} + + secret_refs: List[str] = [] + stored_refs: List[str] = [] + if clean_secrets: + if self.vault_manager is None: + return {'ok': False, 'error': 'vault unavailable; cannot store secrets'} + for field, value in clean_secrets.items(): + ref = f"{conn_id}_{field}" + try: + self.vault_manager.store_secret(ref, value) + except Exception as e: + logger.error(f"create_connection: vault store {ref}: {e}") + for done in stored_refs: + try: + self.vault_manager.delete_secret(done) + except Exception: + pass + return {'ok': False, 'error': 'failed to store secret in vault'} + stored_refs.append(ref) + secret_refs.append(ref) + + now = self._now_iso() + state = self._compute_state(conn_type, clean_config, secret_refs) + record = { + 'id': conn_id, + 'type': conn_type, + 'name': name, + 'enabled': True, + 'mark': mark, + 'table': table, + 'iface': iface, + 'redirect_port': redirect_port, + 'config': clean_config, + 'secret_refs': secret_refs, + 'cell_name': None, + 'status': { + 'state': state, + 'health': 'unknown', + 'last_check': None, + 'detail': None, + }, + 'created_at': now, + 'updated_at': now, + } + try: + self.config_manager.add_connection(record) + except Exception as e: + logger.error(f"create_connection: persist failed: {e}") + for ref in stored_refs: + try: + self.vault_manager.delete_secret(ref) + except Exception: + pass + return {'ok': False, 'error': 'failed to persist connection'} + + logger.info(f"connectivity: created connection {conn_id} " + f"({conn_type}/{name}) mark={hex(mark)} table={table}") + return {'ok': True, 'connection': self._public_record(record)} + + def update_connection(self, conn_id: str, name: Optional[str] = None, + config: Optional[Dict[str, Any]] = None, + secrets: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """Update an existing connection's name, config and/or secrets.""" + with self._conn_lock: + if self.config_manager is None: + return {'ok': False, 'error': 'config unavailable'} + record = self.config_manager.get_connection(conn_id) + if record is None: + return {'ok': False, 'error': f'connection {conn_id!r} not found'} + conn_type = record.get('type') + + fields: Dict[str, Any] = {} + + if name is not None: + if not isinstance(name, str) or not self.CONNECTION_NAME_RE.match(name.strip()): + return {'ok': False, 'error': 'invalid name'} + name = name.strip() + for c in self.config_manager.list_connections(): + if c.get('id') != conn_id and \ + c.get('name', '').strip().lower() == name.lower(): + return {'ok': False, 'error': + f'a connection named {name!r} already exists'} + fields['name'] = name + + secret_refs = list(record.get('secret_refs', [])) + new_config = dict(record.get('config', {})) + if config is not None or secrets is not None: + merged = dict(record.get('config', {})) + if isinstance(config, dict): + merged.update(config) + clean_config, clean_secrets, err = self._validate_connection_config( + conn_type, merged, secrets) + if err: + return {'ok': False, 'error': err} + new_config = clean_config + fields['config'] = clean_config + if clean_secrets: + if self.vault_manager is None: + return {'ok': False, 'error': 'vault unavailable'} + for field, value in clean_secrets.items(): + ref = f"{conn_id}_{field}" + try: + self.vault_manager.store_secret(ref, value) + except Exception as e: + logger.error(f"update_connection: vault store {ref}: {e}") + return {'ok': False, 'error': 'failed to store secret'} + if ref not in secret_refs: + secret_refs.append(ref) + fields['secret_refs'] = secret_refs + + if fields: + fields['updated_at'] = self._now_iso() + fields['status'] = { + **record.get('status', {}), + 'state': self._compute_state(conn_type, new_config, secret_refs), + } + self.config_manager.update_connection(conn_id, fields) + + updated = self.config_manager.get_connection(conn_id) + return {'ok': True, 'connection': self._public_record(updated)} + + def delete_connection(self, conn_id: str) -> Dict[str, Any]: + """Delete a connection: free resources + vault secrets. Blocked if referenced.""" + with self._conn_lock: + if self.config_manager is None: + return {'ok': False, 'error': 'config unavailable'} + record = self.config_manager.get_connection(conn_id) + if record is None: + return {'ok': False, 'error': f'connection {conn_id!r} not found'} + + ref = self._connection_reference(conn_id) + if ref: + return {'ok': False, 'error': + f'connection is in use by {ref}; detach it first'} + + for secret_ref in record.get('secret_refs', []): + if self.vault_manager is not None: + try: + self.vault_manager.delete_secret(secret_ref) + except Exception as e: + logger.warning(f"delete_connection: vault delete {secret_ref}: {e}") + + self.config_manager.delete_connection(conn_id) + logger.info(f"connectivity: deleted connection {conn_id}") + return {'ok': True} + + def _connection_reference(self, conn_id: str) -> Optional[str]: + """Return a human description if a peer/egress references this connection. + + Phase 2 wires peers/egress to connection ids; until then nothing + references a connection, so this returns None. Kept as the single + choke-point so phase 2 only has to fill in the lookups here. + """ + if self.peer_registry is not None: + try: + for peer in self.peer_registry.list_peers(): + if peer.get('connection_id') == conn_id: + return f"peer {peer.get('peer')!r}" + except Exception as e: + logger.debug(f"_connection_reference (peers): {e}") + return None + + def list_connections(self) -> List[Dict[str, Any]]: + """Return all connection records (public form, computed status.state).""" + if self.config_manager is None: + return [] + try: + conns = self.config_manager.list_connections() + except Exception as e: + logger.warning(f"list_connections: {e}") + return [] + out: List[Dict[str, Any]] = [] + for record in conns: + state = self._compute_state( + record.get('type'), record.get('config', {}), + record.get('secret_refs', [])) + status = dict(record.get('status', {})) + status['state'] = state + rec = dict(record) + rec['status'] = status + out.append(self._public_record(rec)) + return out + + def get_connection(self, conn_id: str) -> Optional[Dict[str, Any]]: + """Return one connection record (public form), or None.""" + if self.config_manager is None: + return None + record = self.config_manager.get_connection(conn_id) + if record is None: + return None + status = dict(record.get('status', {})) + status['state'] = self._compute_state( + record.get('type'), record.get('config', {}), + record.get('secret_refs', [])) + rec = dict(record) + rec['status'] = status + return self._public_record(rec) + + @staticmethod + def _public_record(record: Dict[str, Any]) -> Dict[str, Any]: + """Strip any secret values; only secret_refs are exposed.""" + rec = dict(record) + rec.pop('private_key', None) + rec.pop('password', None) + rec.pop('conf', None) + config = dict(rec.get('config', {})) + for k in ('private_key', 'password', 'conf'): + config.pop(k, None) + rec['config'] = config + return rec + + @staticmethod + def _now_iso() -> str: + return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + + # ── Connectivity v2 — v1→v2 migration ───────────────────────────────── + + def _migrate_connectivity_v1_to_v2( + self, legacy: Dict[str, Any], + ) -> List[Dict[str, Any]]: + """Build v2 connection records from the legacy connectivity section. + + Called once by config_manager.get_connectivity() when version<2. For + each legacy exit type detected as configured (via _exit_status), creates + exactly one connection instance with allocated resources, copying the + legacy `config` and repointing any existing per-type vault secret to the + new conn__ name. Returns [] when nothing was configured. + + Existing secrets are RE-STORED under the new ref name and the old name + deleted, so the only reference that survives is the new one — no secret + is ever lost (re-store happens before old delete). + """ + records: List[Dict[str, Any]] = [] + exits = legacy.get('exits') if isinstance(legacy, dict) else {} + exits = exits if isinstance(exits, dict) else {} + + # Legacy per-type vault secret names → instance secret field. + legacy_secret_names = { + 'sshuttle': [ + ('connectivity_sshuttle_key', 'private_key'), + ('connectivity_sshuttle_password', 'password'), + ], + } + + used_marks, used_tables, used_ifaces, used_ports = set(), set(), set(), set() + + for conn_type in self.CONNECTION_TYPES: + status = self._exit_status(conn_type) + if not status.get('configured'): + continue + + conn_id = self._new_conn_id() + + mark = self.MARK_BASE + while mark in used_marks: + mark += self.MARK_STRIDE + used_marks.add(mark) + + table = self.TABLE_BASE + while table in used_tables: + table += 1 + used_tables.add(table) + + iface = None + if conn_type in self.IFACE_TYPES: + iface = f"{self.IFACE_PREFIXES[conn_type]}{conn_id.split('_')[-1][:8]}" + used_ifaces.add(iface) + + redirect_port = None + if conn_type in self.REDIRECT_TYPES: + port = self.REDIRECT_PORT_BASE + while port in used_ports: + port += 1 + redirect_port = port + used_ports.add(port) + + legacy_config = exits.get(conn_type) + config = dict(legacy_config) if isinstance(legacy_config, dict) else {} + # Never let a stray secret hide in the copied non-secret config. + for k in ('private_key', 'password', 'conf'): + config.pop(k, None) + + secret_refs: List[str] = [] + for old_name, field in legacy_secret_names.get(conn_type, []): + if self.vault_manager is None: + continue + try: + value = self.vault_manager.get_secret(old_name) + except Exception as e: + logger.warning(f"migration: read {old_name} failed: {e}") + value = None + if not value: + continue + new_ref = f"{conn_id}_{field}" + try: + self.vault_manager.store_secret(new_ref, value) + secret_refs.append(new_ref) + self.vault_manager.delete_secret(old_name) + except Exception as e: + logger.error(f"migration: repoint {old_name}→{new_ref} failed: {e}") + + now = self._now_iso() + state = self._compute_state(conn_type, config, secret_refs) + records.append({ + 'id': conn_id, + 'type': conn_type, + 'name': self.DEFAULT_CONNECTION_NAMES.get(conn_type, conn_type), + 'enabled': True, + 'mark': mark, + 'table': table, + 'iface': iface, + 'redirect_port': redirect_port, + 'config': config, + 'secret_refs': secret_refs, + 'cell_name': None, + 'status': { + 'state': state, + 'health': 'unknown', + 'last_check': None, + 'detail': None, + }, + 'created_at': now, + 'updated_at': now, + }) + logger.info(f"connectivity: migrated legacy {conn_type} exit → {conn_id}") + + return records + # ── Routing application ─────────────────────────────────────────────── def apply_routes(self) -> Dict[str, Any]: diff --git a/tests/test_connectivity_connections.py b/tests/test_connectivity_connections.py new file mode 100644 index 0000000..4647d6c --- /dev/null +++ b/tests/test_connectivity_connections.py @@ -0,0 +1,473 @@ +""" +Tests for the connectivity v2 data model — named connection instances. + +Covers the resource allocator (no mark/table/port collisions, correct iface +naming, iface vs redirect-port split), the v1→v2 migration (legacy exits → +one connection each, secret repointing, idempotency, empty case), connection +CRUD (validation, single-tor, duplicate name, secret refs not values, delete +frees resources + vault secret, delete blocked when referenced), and computed +status.state (added vs configured). + +ConfigManager runs against a tmp dir; the vault is an in-memory fake. No real +Docker/iptables/subprocess is invoked (apply_routes is never called here). +""" + +import os +import sys +import shutil +import tempfile +import unittest +from unittest.mock import MagicMock, patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api')) + +from config_manager import ConfigManager +from connectivity_manager import ConnectivityManager + +VALID_KEY = ( + '-----BEGIN OPENSSH PRIVATE KEY-----\n' + 'b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAAB\n' + '-----END OPENSSH PRIVATE KEY-----\n' +) +VALID_KNOWN_HOSTS = ( + 'ssh.example.com,203.0.113.5 ssh-ed25519 ' + 'AAAAC3NzaC1lZDI1NTE5AAAAIB5d0o0Yw1xP1Yw1xP1Yw1xP1Yw1xP1Yw1xP1Yw1xP1Y' +) + + +class FakeVault: + """In-memory stand-in for VaultManager's secret store.""" + + def __init__(self): + self.store = {} + + def store_secret(self, name, value): + self.store[name] = value + return True + + def get_secret(self, name): + return self.store.get(name) + + def delete_secret(self, name): + if name in self.store: + del self.store[name] + return True + return False + + def list_secrets(self): + return list(self.store.keys()) + + +def _sshuttle_cfg(**overrides): + cfg = { + 'host': 'ssh.example.com', + 'port': 22, + 'user': 'tunnel', + 'auth': 'key', + 'known_hosts': VALID_KNOWN_HOSTS, + 'exclude_subnets': ['10.0.0.0/8'], + } + cfg.update(overrides) + return cfg + + +def _proxy_cfg(**overrides): + cfg = {'scheme': 'http', 'host': 'proxy.example.com', 'port': 3128} + cfg.update(overrides) + return cfg + + +class _Base(unittest.TestCase): + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.cfg_file = os.path.join(self.tmp, 'cell_config.json') + self.data_dir = os.path.join(self.tmp, 'data') + os.makedirs(self.data_dir, exist_ok=True) + self.cm = ConfigManager(self.cfg_file, self.data_dir) + self.vault = FakeVault() + self.peer_registry = MagicMock() + self.peer_registry.list_peers.return_value = [] + with patch.object(ConnectivityManager, '_subscribe_to_events', + lambda self: None): + self.mgr = ConnectivityManager( + config_manager=self.cm, + peer_registry=self.peer_registry, + vault_manager=self.vault, + data_dir=self.data_dir, + config_dir=self.tmp, + ) + # Start CRUD tests from a clean migrated v2 (no legacy exits). Migration + # is exercised separately in TestMigration; here we want an empty slate + # so the always-configured Tor default doesn't get auto-migrated in. + self.cm.configs['connectivity'] = {'version': 2, 'connections': []} + self.cm._save_all_configs() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + +# --------------------------------------------------------------------------- +# Resource allocator +# --------------------------------------------------------------------------- + +class TestAllocator(_Base): + + def test_iface_types_get_iface_not_port(self): + for ctype in ('wireguard_ext', 'openvpn'): + cid = 'conn_deadbeef' + mark, table, iface, port = self.mgr._allocate_resources(ctype, cid) + self.assertIsNotNone(iface) + self.assertIsNone(port) + self.assertTrue(iface.startswith( + self.mgr.IFACE_PREFIXES[ctype])) + self.assertLessEqual(len(iface), 15) + + def test_redirect_types_get_port_not_iface(self): + for ctype in ('tor', 'sshuttle', 'proxy'): + cid = 'conn_abcd1234' + mark, table, iface, port = self.mgr._allocate_resources(ctype, cid) + self.assertIsNone(iface) + self.assertIsNotNone(port) + self.assertTrue( + self.mgr.REDIRECT_PORT_BASE <= port <= self.mgr.REDIRECT_PORT_MAX) + + def test_iface_uses_8hex_id(self): + _m, _t, iface, _p = self.mgr._allocate_resources( + 'wireguard_ext', 'conn_0123456789') + self.assertEqual(iface, 'wgext_01234567') + + def test_no_collisions_across_many_connections(self): + created = [] + # Mix of iface + redirect types; tor is single-instance so exclude it. + plan = (['wireguard_ext', 'openvpn', 'sshuttle', 'proxy'] * 4) + for i, ctype in enumerate(plan): + if ctype == 'sshuttle': + res = self.mgr.create_connection( + ctype, f'ssh-{i}', _sshuttle_cfg(), + secrets={'private_key': VALID_KEY}) + elif ctype == 'proxy': + res = self.mgr.create_connection(ctype, f'px-{i}', _proxy_cfg()) + elif ctype == 'wireguard_ext': + res = self.mgr.create_connection( + ctype, f'wg-{i}', {}, + secrets={'conf': '[Interface]\nPrivateKey = x\n'}) + else: + res = self.mgr.create_connection( + ctype, f'ov-{i}', {}, + secrets={'conf': 'client\nremote vpn.example.com 1194\n'}) + self.assertTrue(res['ok'], res) + created.append(res['connection']) + + marks = [c['mark'] for c in created] + tables = [c['table'] for c in created] + ifaces = [c['iface'] for c in created if c['iface']] + ports = [c['redirect_port'] for c in created if c['redirect_port']] + self.assertEqual(len(marks), len(set(marks))) + self.assertEqual(len(tables), len(set(tables))) + self.assertEqual(len(ifaces), len(set(ifaces))) + self.assertEqual(len(ports), len(set(ports))) + for m in marks: + self.assertEqual(m % self.mgr.MARK_STRIDE, 0) + self.assertTrue(self.mgr.MARK_BASE <= m <= self.mgr.MARK_MAX) + for t in tables: + self.assertGreaterEqual(t, self.mgr.TABLE_BASE) + + +# --------------------------------------------------------------------------- +# Connection CRUD +# --------------------------------------------------------------------------- + +class TestCreateConnection(_Base): + + def test_create_proxy_allocates_and_persists(self): + res = self.mgr.create_connection('proxy', 'Work proxy', _proxy_cfg()) + self.assertTrue(res['ok'], res) + rec = res['connection'] + self.assertTrue(rec['id'].startswith('conn_')) + self.assertEqual(rec['type'], 'proxy') + self.assertIsNotNone(rec['redirect_port']) + self.assertIsNone(rec['iface']) + # Persisted to config_manager. + self.assertEqual(len(self.cm.list_connections()), 1) + + def test_secret_stored_as_ref_not_value(self): + res = self.mgr.create_connection( + 'sshuttle', 'tun', _sshuttle_cfg(), + secrets={'private_key': VALID_KEY}) + self.assertTrue(res['ok'], res) + rec = res['connection'] + cid = rec['id'] + # Refs recorded, values absent from the record + config. + self.assertIn(f'{cid}_private_key', rec['secret_refs']) + self.assertIn(f'{cid}_known_hosts', rec['secret_refs']) + self.assertNotIn('private_key', rec['config']) + stored = self.cm.get_connection(cid) + self.assertNotIn('private_key', stored['config']) + # Secret value lives only in the vault. + self.assertEqual(self.vault.get_secret(f'{cid}_private_key'), VALID_KEY) + + def test_invalid_type_rejected(self): + res = self.mgr.create_connection('bogus', 'x', {}) + self.assertFalse(res['ok']) + + def test_invalid_name_rejected(self): + res = self.mgr.create_connection('proxy', '', _proxy_cfg()) + self.assertFalse(res['ok']) + + def test_invalid_config_rejected(self): + res = self.mgr.create_connection('proxy', 'p', _proxy_cfg(scheme='ftp')) + self.assertFalse(res['ok']) + self.assertEqual(len(self.cm.list_connections()), 0) + + def test_single_tor_enforced(self): + first = self.mgr.create_connection('tor', 'Tor 1', {}) + self.assertTrue(first['ok'], first) + second = self.mgr.create_connection('tor', 'Tor 2', {}) + self.assertFalse(second['ok']) + self.assertIn('single', second['error'].lower()) + + def test_duplicate_name_rejected(self): + a = self.mgr.create_connection('proxy', 'Dup', _proxy_cfg()) + self.assertTrue(a['ok']) + b = self.mgr.create_connection('proxy', 'dup', _proxy_cfg(port=8080)) + self.assertFalse(b['ok']) + self.assertIn('already exists', b['error']) + + def test_state_configured_when_complete(self): + res = self.mgr.create_connection('proxy', 'full', _proxy_cfg()) + self.assertEqual(res['connection']['status']['state'], 'configured') + self.assertEqual(res['connection']['status']['health'], 'unknown') + + def test_state_added_when_missing_secret(self): + # wireguard_ext with no conf secret → required secret missing → added. + # (validator requires conf, so create one that omits it must fail; to + # exercise the 'added' path we build a record directly via migration of + # an installed-but-unconfigured exit — see TestStatusState.) + res = self.mgr.create_connection( + 'wireguard_ext', 'wgfull', {}, + secrets={'conf': '[Interface]\nPrivateKey = x\n'}) + self.assertTrue(res['ok'], res) + self.assertEqual(res['connection']['status']['state'], 'configured') + + +class TestDeleteConnection(_Base): + + def test_delete_frees_resources_and_secret(self): + res = self.mgr.create_connection( + 'sshuttle', 'tun', _sshuttle_cfg(), + secrets={'private_key': VALID_KEY}) + cid = res['connection']['id'] + self.assertIn(f'{cid}_private_key', self.vault.list_secrets()) + + out = self.mgr.delete_connection(cid) + self.assertTrue(out['ok'], out) + self.assertEqual(len(self.cm.list_connections()), 0) + self.assertNotIn(f'{cid}_private_key', self.vault.list_secrets()) + self.assertNotIn(f'{cid}_known_hosts', self.vault.list_secrets()) + + def test_freed_resources_are_reusable(self): + a = self.mgr.create_connection('proxy', 'A', _proxy_cfg()) + mark_a = a['connection']['mark'] + self.mgr.delete_connection(a['connection']['id']) + b = self.mgr.create_connection('proxy', 'B', _proxy_cfg()) + self.assertEqual(b['connection']['mark'], mark_a) + + def test_delete_blocked_when_referenced(self): + res = self.mgr.create_connection('proxy', 'ref', _proxy_cfg()) + cid = res['connection']['id'] + self.peer_registry.list_peers.return_value = [ + {'peer': 'alice', 'connection_id': cid}] + out = self.mgr.delete_connection(cid) + self.assertFalse(out['ok']) + self.assertIn('in use', out['error']) + # Still present, secret intact. + self.assertEqual(len(self.cm.list_connections()), 1) + + def test_delete_unknown_returns_error(self): + out = self.mgr.delete_connection('conn_nope') + self.assertFalse(out['ok']) + + +class TestUpdateConnection(_Base): + + def test_update_name(self): + res = self.mgr.create_connection('proxy', 'old', _proxy_cfg()) + cid = res['connection']['id'] + out = self.mgr.update_connection(cid, name='new') + self.assertTrue(out['ok'], out) + self.assertEqual(self.cm.get_connection(cid)['name'], 'new') + + def test_update_duplicate_name_rejected(self): + a = self.mgr.create_connection('proxy', 'A', _proxy_cfg()) + b = self.mgr.create_connection('proxy', 'B', _proxy_cfg(port=8080)) + out = self.mgr.update_connection(b['connection']['id'], name='A') + self.assertFalse(out['ok']) + + def test_update_secret_repoints_vault(self): + res = self.mgr.create_connection( + 'sshuttle', 'tun', _sshuttle_cfg(), + secrets={'private_key': VALID_KEY}) + cid = res['connection']['id'] + new_key = VALID_KEY.replace('AAAAB', 'BBBBB') + out = self.mgr.update_connection( + cid, config=_sshuttle_cfg(), secrets={'private_key': new_key}) + self.assertTrue(out['ok'], out) + self.assertEqual(self.vault.get_secret(f'{cid}_private_key'), new_key) + + +# --------------------------------------------------------------------------- +# list/get enrichment +# --------------------------------------------------------------------------- + +class TestListGet(_Base): + + def test_list_and_get_strip_secret_values(self): + res = self.mgr.create_connection( + 'sshuttle', 'tun', _sshuttle_cfg(), + secrets={'private_key': VALID_KEY}) + cid = res['connection']['id'] + listed = self.mgr.list_connections() + self.assertEqual(len(listed), 1) + self.assertNotIn('private_key', listed[0]['config']) + self.assertEqual(listed[0]['status']['health'], 'unknown') + got = self.mgr.get_connection(cid) + self.assertEqual(got['id'], cid) + self.assertNotIn('private_key', got['config']) + + def test_get_unknown_returns_none(self): + self.assertIsNone(self.mgr.get_connection('conn_missing')) + + +# --------------------------------------------------------------------------- +# Computed status.state +# --------------------------------------------------------------------------- + +class TestStatusState(_Base): + + def test_compute_state_proxy(self): + self.assertEqual( + self.mgr._compute_state('proxy', _proxy_cfg(), []), 'configured') + self.assertEqual( + self.mgr._compute_state('proxy', {'scheme': 'http'}, []), 'added') + + def test_compute_state_sshuttle_password_auth(self): + cfg = _sshuttle_cfg(auth='password') + cid = 'conn_x' + self.assertEqual( + self.mgr._compute_state(cfg['auth'] and 'sshuttle', cfg, + [f'{cid}_known_hosts', f'{cid}_password']), + 'configured') + self.assertEqual( + self.mgr._compute_state('sshuttle', cfg, [f'{cid}_known_hosts']), + 'added') + + def test_compute_state_tor_always_configured(self): + self.assertEqual(self.mgr._compute_state('tor', {}, []), 'configured') + + +# --------------------------------------------------------------------------- +# v1 → v2 migration +# --------------------------------------------------------------------------- + +class TestMigration(_Base): + + def test_empty_legacy_becomes_version2_empty(self): + # No exits configured: _exit_status reports nothing configured. + self.cm.configs['connectivity'] = {'exits': {}, 'peer_exit_map': {}} + with patch.object(self.mgr, '_exit_status', + return_value={'configured': False}): + v2 = self.cm.get_connectivity() + self.assertEqual(v2['version'], 2) + self.assertEqual(v2['connections'], []) + + def test_each_configured_exit_becomes_one_connection(self): + configured = {'sshuttle', 'proxy', 'tor'} + + def fake_status(exit_type): + return {'configured': exit_type in configured} + + # Seed legacy config + secrets. + self.cm.configs['connectivity'] = { + 'exits': { + 'sshuttle': {'host': 'h', 'port': 22, 'user': 'u', + 'auth': 'key', 'exclude_subnets': []}, + 'proxy': {'scheme': 'http', 'host': 'p', 'port': 3128, 'user': ''}, + }, + 'peer_exit_map': {}, + } + self.vault.store_secret('connectivity_sshuttle_key', VALID_KEY) + + with patch.object(self.mgr, '_exit_status', side_effect=fake_status): + v2 = self.cm.get_connectivity() + + self.assertEqual(v2['version'], 2) + types = sorted(c['type'] for c in v2['connections']) + self.assertEqual(types, ['proxy', 'sshuttle', 'tor']) + # Exactly one per type. + self.assertEqual(len(v2['connections']), 3) + + def test_secret_repointed_not_lost(self): + self.cm.configs['connectivity'] = { + 'exits': {'sshuttle': {'host': 'h', 'port': 22, 'user': 'u', + 'auth': 'key', 'exclude_subnets': []}}, + 'peer_exit_map': {}, + } + self.vault.store_secret('connectivity_sshuttle_key', VALID_KEY) + + def fake_status(exit_type): + return {'configured': exit_type == 'sshuttle'} + + with patch.object(self.mgr, '_exit_status', side_effect=fake_status): + v2 = self.cm.get_connectivity() + + conn = next(c for c in v2['connections'] if c['type'] == 'sshuttle') + cid = conn['id'] + new_ref = f'{cid}_private_key' + # Old name gone, new name holds the value — nothing lost. + self.assertIsNone(self.vault.get_secret('connectivity_sshuttle_key')) + self.assertEqual(self.vault.get_secret(new_ref), VALID_KEY) + self.assertIn(new_ref, conn['secret_refs']) + + def test_migration_idempotent(self): + def fake_status(exit_type): + return {'configured': exit_type == 'proxy'} + + self.cm.configs['connectivity'] = { + 'exits': {'proxy': {'scheme': 'http', 'host': 'p', 'port': 3128}}, + 'peer_exit_map': {}, + } + with patch.object(self.mgr, '_exit_status', side_effect=fake_status): + first = self.cm.get_connectivity() + ids_first = [c['id'] for c in first['connections']] + # Second access must not re-run migration / duplicate. + second = self.cm.get_connectivity() + ids_second = [c['id'] for c in second['connections']] + self.assertEqual(ids_first, ids_second) + self.assertEqual(len(second['connections']), 1) + + def test_migration_allocates_distinct_resources(self): + configured = {'wireguard_ext', 'openvpn', 'sshuttle', 'proxy', 'tor'} + self.cm.configs['connectivity'] = {'exits': {}, 'peer_exit_map': {}} + + def fake_status(exit_type): + return {'configured': exit_type in configured} + + with patch.object(self.mgr, '_exit_status', side_effect=fake_status): + v2 = self.cm.get_connectivity() + + conns = v2['connections'] + marks = [c['mark'] for c in conns] + tables = [c['table'] for c in conns] + self.assertEqual(len(marks), len(set(marks))) + self.assertEqual(len(tables), len(set(tables))) + for c in conns: + if c['type'] in ('wireguard_ext', 'openvpn'): + self.assertIsNotNone(c['iface']) + self.assertIsNone(c['redirect_port']) + else: + self.assertIsNone(c['iface']) + self.assertIsNotNone(c['redirect_port']) + + +if __name__ == '__main__': + unittest.main()