diff --git a/api/app.py b/api/app.py index 32e4554..1aa8908 100644 --- a/api/app.py +++ b/api/app.py @@ -409,6 +409,10 @@ ROUTE_ACTION_MAP = { ('POST', 'connectivity_configure_sshuttle'): ('connection.exit_sshuttle', 'connection', None), ('POST', 'connectivity_configure_proxy'): ('connection.exit_proxy', 'connection', None), ('PUT', 'connectivity_set_peer_exit'): ('connection.peer_exit_set', 'peer', 'peer_name'), + ('POST', 'connectivity_create_connection'): ('connection.create', 'connection', None), + ('PUT', 'connectivity_update_connection'): ('connection.update', 'connection', 'conn_id'), + ('DELETE', 'connectivity_delete_connection'): ('connection.delete', 'connection', 'conn_id'), + ('PUT', 'connectivity_set_peer_failopen'): ('peer.failopen', 'peer', 'peer_name'), # egress ('PUT', 'egress_set_service_exit'): ('egress.service_exit_set', 'service', 'service_id'), # cells @@ -867,6 +871,7 @@ def perform_health_check(): def health_monitor_loop(): _cert_check_cycle = 0 + _conn_health_cycle = 0 while health_monitor_running: with app.app_context(): health_result = perform_health_check() @@ -898,6 +903,15 @@ def health_monitor_loop(): caddy_manager.refresh_cert_status() except Exception as _cert_err: logger.warning("Cert status refresh failed (non-fatal): %s", _cert_err) + # Refresh connection health every 2 cycles (\u2248 every 2 min) so the + # connections list and per-peer fallback decisions stay current. + _conn_health_cycle += 1 + if _conn_health_cycle >= 2: + _conn_health_cycle = 0 + try: + connectivity_manager.refresh_health() + except Exception as _ch_err: + logger.warning("Connection health refresh failed (non-fatal): %s", _ch_err) time.sleep(60) # Check every 60 seconds # Start health monitor thread @@ -1172,6 +1186,112 @@ def connectivity_get_peer_exits(): return jsonify({'error': str(e)}), 500 +# Connectivity v2 — generic connection CRUD (going-forward API; admin-only via +# enforce_auth which restricts all non-peer /api/* routes to the admin role). + +@app.route('/api/connectivity/connections', methods=['GET']) +def connectivity_list_connections(): + """List all connection instances (with status; never any secret value).""" + try: + return jsonify({'connections': connectivity_manager.list_connections()}) + except Exception as e: + logger.error(f"connectivity_list_connections: {e}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/connectivity/connections', methods=['POST']) +def connectivity_create_connection(): + """Create a connection instance. Secrets are stored in the vault, never echoed.""" + try: + data = request.get_json(silent=True) or {} + conn_type = data.get('type') + name = data.get('name') + config = data.get('config') or {} + conn_secrets = data.get('secrets') or {} + if not isinstance(conn_type, str) or not conn_type: + return jsonify({'ok': False, 'error': 'type is required'}), 400 + if not isinstance(name, str) or not name.strip(): + return jsonify({'ok': False, 'error': 'name is required'}), 400 + result = connectivity_manager.create_connection( + conn_type, name, config=config, secrets=conn_secrets) + if result.get('ok'): + return jsonify(result), 201 + return jsonify(result), 400 + except Exception as e: + logger.error(f"connectivity_create_connection: {e}") + return jsonify({'error': 'internal error'}), 500 + + +@app.route('/api/connectivity/connections/', methods=['PUT']) +def connectivity_update_connection(conn_id: str): + """Update a connection's name, config and/or secrets. Secrets never echoed.""" + try: + data = request.get_json(silent=True) or {} + result = connectivity_manager.update_connection( + conn_id, + name=data.get('name'), + config=data.get('config'), + secrets=data.get('secrets'), + ) + if result.get('ok'): + return jsonify(result) + status = 404 if 'not found' in result.get('error', '') else 400 + return jsonify(result), status + except Exception as e: + logger.error(f"connectivity_update_connection({conn_id}): {e}") + return jsonify({'error': 'internal error'}), 500 + + +@app.route('/api/connectivity/connections/', methods=['DELETE']) +def connectivity_delete_connection(conn_id: str): + """Delete a connection. Blocked with 409 when a peer/egress references it.""" + try: + result = connectivity_manager.delete_connection(conn_id) + if result.get('ok'): + return jsonify(result) + error = result.get('error', '') + if 'not found' in error: + return jsonify(result), 404 + if 'in use by' in error: + return jsonify(result), 409 + return jsonify(result), 400 + except Exception as e: + logger.error(f"connectivity_delete_connection({conn_id}): {e}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/connectivity/connections//health', methods=['GET']) +def connectivity_connection_health(conn_id: str): + """On-demand probe of one connection's health (admin).""" + try: + conn = connectivity_manager.get_connection(conn_id) + if conn is None: + return jsonify({'error': f'connection {conn_id!r} not found'}), 404 + health, detail = connectivity_manager.probe_health(conn) + return jsonify({'id': conn_id, 'health': health, 'detail': detail}) + except Exception as e: + logger.error(f"connectivity_connection_health({conn_id}): {e}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/connectivity/peers//failopen', methods=['PUT']) +def connectivity_set_peer_failopen(peer_name: str): + """Set or clear a peer's fail-open override. Body: {"failopen": bool|null}.""" + try: + data = request.get_json(silent=True) or {} + failopen = data.get('failopen') + if failopen is not None and not isinstance(failopen, bool): + return jsonify({'ok': False, 'error': 'failopen must be a boolean or null'}), 400 + result = connectivity_manager.set_peer_failopen(peer_name, failopen) + if result.get('ok'): + return jsonify(result) + status = 404 if 'not found' in result.get('error', '') else 400 + return jsonify(result), status + except Exception as e: + logger.error(f"connectivity_set_peer_failopen({peer_name}): {e}") + return jsonify({'error': str(e)}), 500 + + @app.route('/api/caddy/cert-status', methods=['GET']) def caddy_cert_status(): """Return TLS certificate status (expiry, days remaining, domain, mode). diff --git a/api/connectivity_manager.py b/api/connectivity_manager.py index 1fb278d..08c2cba 100644 --- a/api/connectivity_manager.py +++ b/api/connectivity_manager.py @@ -43,6 +43,7 @@ import logging import os import re import secrets +import socket import threading import time from typing import Any, Dict, List, Optional, Tuple @@ -154,6 +155,13 @@ class ConnectivityManager(BaseServiceManager): REDIRECT_PORT_BASE = 9100 REDIRECT_PORT_MAX = 9199 + # WireGuard handshake older than this (seconds) means the tunnel is down. + WG_HANDSHAKE_MAX_AGE = 180 + # Cached health is reused for this long so on-demand GETs don't re-probe. + HEALTH_TTL = 30 + # TCP connect timeout for socket-based probes (sshuttle/proxy). + PROBE_TCP_TIMEOUT = 3 + IFACE_PREFIXES = {"wireguard_ext": "wgext_", "openvpn": "ovpn_"} CONNECTION_NAME_RE = re.compile(r'^[A-Za-z0-9][A-Za-z0-9 _.-]{0,63}$') @@ -1374,7 +1382,16 @@ class ConnectivityManager(BaseServiceManager): f"apply_routes: ip rule {conn.get('id')} failed: {e}") # Per-peer marking + nat redirect, resolved through each peer's - # connection instance. + # connection instance. A peer whose connection is unhealthy AND whose + # effective fail-open is True is skipped entirely: no mark, so its + # traffic falls through to the default route (direct internet) instead + # of being blocked by the kill-switch. Fail-closed peers keep their mark + # so the kill-switch blocks them while the tunnel is down. + # + # marked_conn_ids tracks connections that actually have at least one + # marked peer, so their kill-switch stays installed; a connection whose + # only peers are all fail-open-and-down gets no kill-switch. + marked_conn_ids: set = set() if self.peer_registry is not None: try: peers = self.peer_registry.list_peers() @@ -1387,18 +1404,27 @@ class ConnectivityManager(BaseServiceManager): conn = self._resolve_peer_connection(peer, by_id) if conn is None: continue + if self._peer_fails_open(peer, conn): + logger.info( + f"apply_routes: peer {peer.get('peer')!r} fails open over " + f"down connection {conn.get('id')!r}; skipping mark") + continue src_ip = self._peer_source_ip(peer.get('peer', '')) if not src_ip: continue rules_applied += self._apply_connection_for_src(src_ip, conn) + marked_conn_ids.add(conn.get('id')) # Kill-switch: drop marked packets that would otherwise leak via the - # default route if an iface-based exit interface is down. + # default route if an iface-based exit interface is down. Installed only + # for connections that still have at least one marked peer. for conn in connections: iface = conn.get('iface') mark = conn.get('mark') if not iface or not isinstance(mark, int): continue + if conn.get('id') not in marked_conn_ids: + continue try: self._add_killswitch(mark, iface) rules_applied += 1 @@ -1408,6 +1434,20 @@ class ConnectivityManager(BaseServiceManager): return {'ok': True, 'rules_applied': rules_applied} + def _peer_fails_open(self, peer: Dict[str, Any], + conn: Dict[str, Any]) -> bool: + """True when this peer should fall back to the default route. + + A peer falls back only when its connection is unhealthy (health is + explicitly 'down') AND its effective fail-open is True. A 'working' or + 'unknown' connection always routes normally so a stale/never-probed + status never silently drops the tunnel. + """ + health = (conn.get('status') or {}).get('health') + if health != 'down': + return False + return self.effective_failopen(peer, conn) + def _routing_connections(self) -> List[Dict[str, Any]]: """Return the connection instances that drive routing (enabled only).""" if self.config_manager is None: @@ -1631,6 +1671,267 @@ class ConnectivityManager(BaseServiceManager): except Exception: return False + # ── Health probing ──────────────────────────────────────────────────── + # + # probe_health(connection) returns 'working' | 'down' | 'unknown' per type. + # All real subprocess/socket calls are factored behind small helpers + # (_exec_in_container, _tcp_reachable) so tests patch those, never raw + # subprocess/socket. refresh_health() persists results via + # config_manager.set_connection_status and caches them for HEALTH_TTL. + + # Per-type fallback default: tor fails open (direct internet on outage), + # all interface/tunnel/proxy types fail closed (traffic blocked on outage). + FAILOPEN_DEFAULTS = { + "wireguard_ext": False, + "openvpn": False, + "sshuttle": False, + "proxy": False, + "tor": True, + } + + def probe_health(self, connection: Dict[str, Any]) -> Tuple[str, Optional[str]]: + """Probe a connection's liveness. Returns (health, detail). + + health is 'working' | 'down' | 'unknown'. detail is a short human + string (or None). Never raises — probe errors map to ('unknown', msg). + """ + conn_type = connection.get('type') + try: + if conn_type == 'wireguard_ext': + return self._probe_wireguard_ext(connection) + if conn_type == 'openvpn': + return self._probe_openvpn(connection) + if conn_type == 'tor': + return self._probe_tor(connection) + if conn_type == 'sshuttle': + return self._probe_sshuttle(connection) + if conn_type == 'proxy': + return self._probe_proxy(connection) + except Exception as e: + logger.warning(f"probe_health({connection.get('id')}): {e}") + return 'unknown', str(e) + return 'unknown', f'no probe for type {conn_type!r}' + + def _probe_wireguard_ext( + self, conn: Dict[str, Any], + ) -> Tuple[str, Optional[str]]: + """A WireGuard exit is working when its latest handshake is recent.""" + iface = conn.get('iface') + if not iface: + return 'unknown', 'no interface assigned' + container = self.EXIT_CONTAINERS.get('wireguard_ext') + r = self._exec_in_container( + container, ['wg', 'show', iface, 'latest-handshakes']) + if r is None or r.returncode != 0: + return 'down', 'wg show failed or interface absent' + newest = 0 + for line in (r.stdout or '').splitlines(): + parts = line.split() + if len(parts) >= 2: + try: + newest = max(newest, int(parts[-1])) + except ValueError: + continue + if newest == 0: + return 'down', 'no handshake yet' + age = int(time.time()) - newest + if age <= self.WG_HANDSHAKE_MAX_AGE: + return 'working', f'handshake {age}s ago' + return 'down', f'handshake stale ({age}s ago)' + + def _probe_openvpn(self, conn: Dict[str, Any]) -> Tuple[str, Optional[str]]: + """An OpenVPN exit is working when its tun iface exists and is UP.""" + iface = conn.get('iface') + container = self.EXIT_CONTAINERS.get('openvpn') + # The tun device lives in the openvpn container's net namespace. + r = self._exec_in_container(container, ['ip', 'link', 'show', iface]) \ + if iface else None + if r is None or r.returncode != 0: + # Fall back to the legacy fixed tun0 iface check in the WG container. + r2 = self._wg_ip(['link', 'show', self.IFACES.get('openvpn', 'tun0')]) + if r2.returncode == 0 and 'UP' in (r2.stdout or ''): + return 'working', 'tun up' + return 'down', 'tun interface absent or down' + if 'UP' in (r.stdout or ''): + return 'working', 'tun up' + return 'down', 'tun interface down' + + def _probe_tor(self, conn: Dict[str, Any]) -> Tuple[str, Optional[str]]: + """Tor is working when its container is running and bootstrapped.""" + container = self.EXIT_CONTAINERS.get('tor') + if not self._container_running(container): + return 'down', 'tor container not running' + r = self._exec_in_container( + container, ['sh', '-c', + 'grep -m1 "Bootstrapped 100" /var/log/tor/notices.log ' + '2>/dev/null || true']) + if r is not None and 'Bootstrapped 100' in (r.stdout or ''): + return 'working', 'bootstrapped' + # Container is up but bootstrap state is unknown — treat as working + # (tor fails open by default; a running container is the cheap signal). + return 'working', 'running' + + def _probe_sshuttle(self, conn: Dict[str, Any]) -> Tuple[str, Optional[str]]: + """sshuttle is working when the SSH host AND local listener are reachable.""" + cfg = conn.get('config', {}) + host = cfg.get('host') + port = cfg.get('port', 22) + if not host: + return 'unknown', 'no host configured' + if not self._tcp_reachable(host, int(port)): + return 'down', f'ssh host {host}:{port} unreachable' + listen_port = conn.get('redirect_port') + container = self.EXIT_CONTAINERS.get('sshuttle') + if isinstance(listen_port, int) and not self._listener_reachable( + container, listen_port): + return 'down', f'sshuttle listener :{listen_port} down' + return 'working', 'ssh host + listener reachable' + + def _probe_proxy(self, conn: Dict[str, Any]) -> Tuple[str, Optional[str]]: + """A proxy exit is working when the upstream proxy host:port accepts TCP.""" + cfg = conn.get('config', {}) + host = cfg.get('host') + port = cfg.get('port') + if not host or not port: + return 'unknown', 'no upstream host/port configured' + if self._tcp_reachable(host, int(port)): + return 'working', f'{host}:{port} reachable' + return 'down', f'upstream {host}:{port} unreachable' + + def _listener_reachable(self, container: Optional[str], port: int) -> bool: + """True when a local TCP listener on `port` is up inside the exit container.""" + r = self._exec_in_container( + container, ['sh', '-c', + f'nc -z -w2 127.0.0.1 {port} 2>/dev/null && echo ok || true']) + if r is not None and 'ok' in (r.stdout or ''): + return True + # Fall back to a host-side probe (container may lack nc). + return self._tcp_reachable('127.0.0.1', port) + + def _container_running(self, container: Optional[str]) -> bool: + """True when the named container is running (cheap docker inspect).""" + if not container: + return False + try: + r = subprocess.run( + ['docker', 'inspect', '-f', '{{.State.Running}}', container], + capture_output=True, text=True, timeout=5) + return r.returncode == 0 and r.stdout.strip() == 'true' + except Exception: + return False + + def _exec_in_container( + self, container: Optional[str], args: List[str], timeout: int = 8, + ) -> Optional[subprocess.CompletedProcess]: + """Run a command inside a container; None on failure. Mock target for tests.""" + if not container: + return None + try: + return subprocess.run( + ['docker', 'exec', container] + args, + capture_output=True, text=True, timeout=timeout) + except Exception as e: + logger.debug(f"_exec_in_container({container}): {e}") + return None + + def _tcp_reachable(self, host: str, port: int) -> bool: + """True when a TCP connection to host:port succeeds. Mock target for tests.""" + try: + with socket.create_connection( + (host, port), timeout=self.PROBE_TCP_TIMEOUT): + return True + except Exception: + return False + + def refresh_health( + self, connection_id: Optional[str] = None, force: bool = False, + ) -> Dict[str, Any]: + """Probe one or all connections and persist health into their status. + + With a TTL cache: a connection whose last_check is younger than + HEALTH_TTL is skipped unless force=True. Returns {id: health}. + """ + if self.config_manager is None: + return {} + try: + conns = self.config_manager.list_connections() + except Exception as e: + logger.warning(f"refresh_health: list_connections failed: {e}") + return {} + if connection_id is not None: + conns = [c for c in conns if c.get('id') == connection_id] + + results: Dict[str, Any] = {} + now = int(time.time()) + for conn in conns: + cid = conn.get('id') + if not cid or not conn.get('enabled', True): + continue + status = dict(conn.get('status', {})) + if not force and self._health_is_fresh(status, now): + results[cid] = status.get('health', 'unknown') + continue + health, detail = self.probe_health(conn) + status['health'] = health + status['detail'] = detail + status['last_check'] = self._now_iso() + try: + self.config_manager.set_connection_status(cid, status) + except Exception as e: + logger.warning(f"refresh_health: persist {cid} failed: {e}") + results[cid] = health + return results + + def _health_is_fresh(self, status: Dict[str, Any], now: int) -> bool: + """True when status.last_check is within HEALTH_TTL of `now`. + + last_check is a UTC ISO string (from _now_iso); calendar.timegm parses + it back as UTC so the comparison is timezone-consistent with `now` + (int(time.time()), also epoch/UTC). + """ + import calendar + last = status.get('last_check') + if not last: + return False + try: + ts = calendar.timegm(time.strptime(last, '%Y-%m-%dT%H:%M:%SZ')) + return (now - int(ts)) < self.HEALTH_TTL + except (ValueError, OverflowError): + return False + + def effective_failopen(self, peer: Dict[str, Any], + conn: Optional[Dict[str, Any]]) -> bool: + """Resolve a peer's effective fail-open for its connection. + + peer.exit_failopen overrides when set (bool); otherwise the per-type + default applies. An unknown/missing connection type fails closed. + """ + override = peer.get('exit_failopen') + if isinstance(override, bool): + return override + conn_type = conn.get('type') if conn else None + return self.FAILOPEN_DEFAULTS.get(conn_type, False) + + def set_peer_failopen(self, peer_name: str, + failopen: Optional[bool]) -> Dict[str, Any]: + """Set or clear a peer's fail-open override (None = use type default).""" + if failopen is not None and not isinstance(failopen, bool): + return {'ok': False, 'error': 'failopen must be a boolean or null'} + if self.peer_registry is None: + return {'ok': False, 'error': 'peer_registry not available'} + if not self._peer_exists(peer_name): + return {'ok': False, 'error': f'peer {peer_name!r} not found'} + try: + self.peer_registry.update_peer(peer_name, {'exit_failopen': failopen}) + except Exception as e: + logger.error(f"set_peer_failopen({peer_name}): {e}") + return {'ok': False, 'error': str(e)} + try: + self.apply_routes() + except Exception as e: + logger.warning(f"set_peer_failopen: apply_routes failed (non-fatal): {e}") + return {'ok': True, 'peer': peer_name, 'exit_failopen': failopen} + def _peer_source_ip(self, peer_name: str) -> Optional[str]: """Return a peer's WireGuard IP (no /CIDR suffix).""" if not peer_name or self.peer_registry is None: diff --git a/tests/test_audit_hook_routes.py b/tests/test_audit_hook_routes.py index 3f41b0d..7586ce9 100644 --- a/tests/test_audit_hook_routes.py +++ b/tests/test_audit_hook_routes.py @@ -110,6 +110,39 @@ def test_unmapped_mutating_endpoint_gets_generic_action(auth_mgr, audit_mgr): assert match[0]['target_type'] == 'unknown' +# ── connectivity v2 connection routes are audited ───────────────────────────── + +def test_connection_create_audited(auth_mgr, audit_mgr): + with _client(auth_mgr, audit_mgr, login_as='admin') as c: + with patch('app.connectivity_manager') as cm: + cm.create_connection.return_value = {'ok': True, 'connection': {'id': 'c'}} + c.post('/api/connectivity/connections', + json={'type': 'tor', 'name': 'T'}) + res = audit_mgr.query({'action': 'connection.create'}) + assert res['total'] >= 1 + assert res['entries'][0]['target_type'] == 'connection' + + +def test_connection_delete_audited_with_id(auth_mgr, audit_mgr): + with _client(auth_mgr, audit_mgr, login_as='admin') as c: + with patch('app.connectivity_manager') as cm: + cm.delete_connection.return_value = {'ok': True} + c.delete('/api/connectivity/connections/conn_abc') + res = audit_mgr.query({'action': 'connection.delete'}) + assert res['total'] >= 1 + assert res['entries'][0]['target_id'] == 'conn_abc' + + +def test_peer_failopen_audited(auth_mgr, audit_mgr): + with _client(auth_mgr, audit_mgr, login_as='admin') as c: + with patch('app.connectivity_manager') as cm: + cm.set_peer_failopen.return_value = {'ok': True, 'peer': 'bob'} + c.put('/api/connectivity/peers/bob/failopen', json={'failopen': True}) + res = audit_mgr.query({'action': 'peer.failopen'}) + assert res['total'] >= 1 + assert res['entries'][0]['target_id'] == 'bob' + + # ── auth routes: never write password ───────────────────────────────────────── def test_change_password_audited_without_value(auth_mgr, audit_mgr): diff --git a/tests/test_connectivity_api.py b/tests/test_connectivity_api.py new file mode 100644 index 0000000..b883628 --- /dev/null +++ b/tests/test_connectivity_api.py @@ -0,0 +1,331 @@ +""" +Phase 4 tests for the generic connection CRUD REST API and the per-peer +fail-open endpoint. Logic lives in ConnectivityManager (mocked here); these +tests assert the thin route layer: status codes, error mapping (404/409/400), +that secrets are never echoed, and that admin-only enforcement applies. + +The `client` fixture sets TESTING=True (bypassing auth/CSRF) for happy-path +status-code checks; admin-only enforcement is verified separately against a +real seeded AuthManager with TESTING off. +""" + +import os +import sys +import json +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent / 'api')) + +import app as app_module +from app import app +from auth_manager import AuthManager + + +@pytest.fixture +def client(): + app.config['TESTING'] = True + with app.test_client() as c: + yield c + + +# --------------------------------------------------------------------------- +# GET /api/connectivity/connections +# --------------------------------------------------------------------------- + +class TestListConnections: + def test_list_returns_connections_with_status(self, client): + cm = MagicMock() + cm.list_connections.return_value = [ + {'id': 'conn_a', 'type': 'proxy', 'name': 'P', + 'secret_refs': ['conn_a_password'], + 'status': {'state': 'configured', 'health': 'working'}, + 'config': {'host': 'p', 'port': 3128}}, + ] + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.get('/api/connectivity/connections') + assert resp.status_code == 200 + body = resp.get_json() + assert body['connections'][0]['id'] == 'conn_a' + assert body['connections'][0]['status']['health'] == 'working' + + def test_list_never_returns_secret_values(self, client): + cm = MagicMock() + # _public_record strips secret values; the manager is what enforces it. + cm.list_connections.return_value = [ + {'id': 'conn_a', 'type': 'sshuttle', 'name': 'S', + 'secret_refs': ['conn_a_private_key'], + 'config': {'host': 'h', 'user': 'u', 'auth': 'key'}, + 'status': {}}, + ] + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.get('/api/connectivity/connections') + raw = resp.get_data(as_text=True) + assert 'PRIVATE KEY' not in raw + assert 'private_key' not in json.loads(raw)['connections'][0]['config'] + + def test_list_500_on_exception(self, client): + cm = MagicMock() + cm.list_connections.side_effect = RuntimeError('boom') + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.get('/api/connectivity/connections') + assert resp.status_code == 500 + + +# --------------------------------------------------------------------------- +# POST /api/connectivity/connections +# --------------------------------------------------------------------------- + +class TestCreateConnection: + def test_create_201_with_record(self, client): + cm = MagicMock() + cm.create_connection.return_value = { + 'ok': True, 'connection': {'id': 'conn_x', 'type': 'proxy', + 'secret_refs': []}} + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.post('/api/connectivity/connections', + json={'type': 'proxy', 'name': 'My Proxy', + 'config': {'scheme': 'http', 'host': 'p', + 'port': 3128}}) + assert resp.status_code == 201 + assert resp.get_json()['connection']['id'] == 'conn_x' + + def test_create_passes_secrets_to_manager(self, client): + cm = MagicMock() + cm.create_connection.return_value = {'ok': True, 'connection': {'id': 'c'}} + with patch.object(app_module, 'connectivity_manager', cm): + client.post('/api/connectivity/connections', + json={'type': 'sshuttle', 'name': 'S', + 'config': {'host': 'h'}, + 'secrets': {'private_key': 'SECRET'}}) + _, kwargs = cm.create_connection.call_args + assert kwargs['secrets'] == {'private_key': 'SECRET'} + + def test_create_does_not_echo_secret_value(self, client): + cm = MagicMock() + cm.create_connection.return_value = { + 'ok': True, 'connection': {'id': 'c', 'secret_refs': ['c_private_key'], + 'config': {}}} + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.post('/api/connectivity/connections', + json={'type': 'sshuttle', 'name': 'S', + 'secrets': {'private_key': 'TOPSECRET'}}) + assert 'TOPSECRET' not in resp.get_data(as_text=True) + + def test_create_missing_type_400(self, client): + resp = client.post('/api/connectivity/connections', json={'name': 'x'}) + assert resp.status_code == 400 + + def test_create_missing_name_400(self, client): + resp = client.post('/api/connectivity/connections', json={'type': 'tor'}) + assert resp.status_code == 400 + + def test_create_validation_error_400(self, client): + cm = MagicMock() + cm.create_connection.return_value = {'ok': False, 'error': 'invalid host'} + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.post('/api/connectivity/connections', + json={'type': 'proxy', 'name': 'P', 'config': {}}) + assert resp.status_code == 400 + assert 'invalid host' in resp.get_json()['error'] + + +# --------------------------------------------------------------------------- +# PUT /api/connectivity/connections/ +# --------------------------------------------------------------------------- + +class TestUpdateConnection: + def test_update_200(self, client): + cm = MagicMock() + cm.update_connection.return_value = { + 'ok': True, 'connection': {'id': 'conn_a', 'name': 'New'}} + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.put('/api/connectivity/connections/conn_a', + json={'name': 'New'}) + assert resp.status_code == 200 + + def test_update_not_found_404(self, client): + cm = MagicMock() + cm.update_connection.return_value = { + 'ok': False, 'error': "connection 'conn_z' not found"} + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.put('/api/connectivity/connections/conn_z', + json={'name': 'x'}) + assert resp.status_code == 404 + + def test_update_validation_400(self, client): + cm = MagicMock() + cm.update_connection.return_value = {'ok': False, 'error': 'invalid name'} + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.put('/api/connectivity/connections/conn_a', + json={'name': '!!!'}) + assert resp.status_code == 400 + + +# --------------------------------------------------------------------------- +# DELETE /api/connectivity/connections/ +# --------------------------------------------------------------------------- + +class TestDeleteConnection: + def test_delete_200(self, client): + cm = MagicMock() + cm.delete_connection.return_value = {'ok': True} + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.delete('/api/connectivity/connections/conn_a') + assert resp.status_code == 200 + + def test_delete_referenced_409(self, client): + cm = MagicMock() + cm.delete_connection.return_value = { + 'ok': False, 'error': "connection is in use by peer 'alice'; detach it first"} + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.delete('/api/connectivity/connections/conn_a') + assert resp.status_code == 409 + assert 'in use by' in resp.get_json()['error'] + + def test_delete_not_found_404(self, client): + cm = MagicMock() + cm.delete_connection.return_value = { + 'ok': False, 'error': "connection 'conn_z' not found"} + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.delete('/api/connectivity/connections/conn_z') + assert resp.status_code == 404 + + +# --------------------------------------------------------------------------- +# GET /api/connectivity/connections//health +# --------------------------------------------------------------------------- + +class TestConnectionHealthEndpoint: + def test_health_returns_probe_result(self, client): + cm = MagicMock() + cm.get_connection.return_value = {'id': 'conn_a', 'type': 'proxy'} + cm.probe_health.return_value = ('working', 'reachable') + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.get('/api/connectivity/connections/conn_a/health') + assert resp.status_code == 200 + body = resp.get_json() + assert body['health'] == 'working' + assert body['detail'] == 'reachable' + + def test_health_unknown_connection_404(self, client): + cm = MagicMock() + cm.get_connection.return_value = None + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.get('/api/connectivity/connections/conn_z/health') + assert resp.status_code == 404 + + +# --------------------------------------------------------------------------- +# PUT /api/connectivity/peers//failopen +# --------------------------------------------------------------------------- + +class TestSetPeerFailopen: + def test_set_true_200(self, client): + cm = MagicMock() + cm.set_peer_failopen.return_value = { + 'ok': True, 'peer': 'alice', 'exit_failopen': True} + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.put('/api/connectivity/peers/alice/failopen', + json={'failopen': True}) + assert resp.status_code == 200 + cm.set_peer_failopen.assert_called_once_with('alice', True) + + def test_clear_with_null_200(self, client): + cm = MagicMock() + cm.set_peer_failopen.return_value = { + 'ok': True, 'peer': 'alice', 'exit_failopen': None} + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.put('/api/connectivity/peers/alice/failopen', + json={'failopen': None}) + assert resp.status_code == 200 + cm.set_peer_failopen.assert_called_once_with('alice', None) + + def test_non_bool_400(self, client): + resp = client.put('/api/connectivity/peers/alice/failopen', + json={'failopen': 'yes'}) + assert resp.status_code == 400 + + def test_unknown_peer_404(self, client): + cm = MagicMock() + cm.set_peer_failopen.return_value = { + 'ok': False, 'error': "peer 'ghost' not found"} + with patch.object(app_module, 'connectivity_manager', cm): + resp = client.put('/api/connectivity/peers/ghost/failopen', + json={'failopen': True}) + assert resp.status_code == 404 + + +# --------------------------------------------------------------------------- +# admin-only enforcement (mutating connection routes) +# --------------------------------------------------------------------------- + +def _seed_auth(tmp_path): + data_dir = str(tmp_path / 'data') + config_dir = str(tmp_path / 'config') + os.makedirs(data_dir, exist_ok=True) + os.makedirs(config_dir, exist_ok=True) + mgr = AuthManager(data_dir=data_dir, config_dir=config_dir) + mgr.create_user('admin', 'AdminPass123!', 'admin') + mgr.create_user('alice', 'AlicePass123!', 'peer') + return mgr + + +class TestAdminOnly: + def _login(self, c, user, pw): + return c.post('/api/auth/login', + data=json.dumps({'username': user, 'password': pw}), + content_type='application/json') + + def test_peer_role_forbidden_on_create(self, tmp_path): + auth = _seed_auth(tmp_path) + app.config['TESTING'] = False + app.config['SECRET_KEY'] = 'test-secret' + try: + import auth_routes + with patch.object(app_module, 'auth_manager', auth), \ + patch.object(auth_routes, 'auth_manager', auth, create=True), \ + patch.object(app_module.setup_manager, 'is_setup_complete', + return_value=True): + with app.test_client() as c: + assert self._login(c, 'alice', 'AlicePass123!').status_code == 200 + # CSRF token from session for the mutating request. + with c.session_transaction() as sess: + token = sess.get('csrf_token') + resp = c.post('/api/connectivity/connections', + json={'type': 'tor', 'name': 'T'}, + headers={'X-CSRF-Token': token or ''}) + assert resp.status_code == 403 + finally: + app.config['TESTING'] = True + + def test_admin_role_allowed_on_create(self, tmp_path): + auth = _seed_auth(tmp_path) + cm = MagicMock() + cm.create_connection.return_value = {'ok': True, 'connection': {'id': 'c'}} + app.config['TESTING'] = False + app.config['SECRET_KEY'] = 'test-secret' + try: + import auth_routes + with patch.object(app_module, 'auth_manager', auth), \ + patch.object(auth_routes, 'auth_manager', auth, create=True), \ + patch.object(app_module, 'connectivity_manager', cm), \ + patch.object(app_module.setup_manager, 'is_setup_complete', + return_value=True): + with app.test_client() as c: + assert self._login(c, 'admin', 'AdminPass123!').status_code == 200 + with c.session_transaction() as sess: + token = sess.get('csrf_token') + resp = c.post('/api/connectivity/connections', + json={'type': 'tor', 'name': 'T'}, + headers={'X-CSRF-Token': token or ''}) + assert resp.status_code == 201 + finally: + app.config['TESTING'] = True + + +if __name__ == '__main__': + import pytest as _pytest + _pytest.main([__file__, '-q']) diff --git a/tests/test_connectivity_health.py b/tests/test_connectivity_health.py new file mode 100644 index 0000000..0d7e585 --- /dev/null +++ b/tests/test_connectivity_health.py @@ -0,0 +1,453 @@ +""" +Phase 3 tests for ConnectivityManager — per-connection health probing, +refresh_health persistence + TTL cache, per-peer configurable fallback +(exit_failopen + type defaults), and apply_routes fail-open/fail-closed +behaviour while a connection is DOWN. + +All real subprocess/socket access is mocked via the small helper methods +(_exec_in_container, _tcp_reachable, _container_running, _listener_reachable, +_wg_ip) so no live infrastructure is touched. +""" + +import os +import sys +import shutil +import tempfile +import time +import unittest +from unittest.mock import MagicMock, patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api')) + +import connectivity_manager as cm_module +from connectivity_manager import ConnectivityManager + + +def _make_manager(config_manager=None, peer_registry=None, tmp_dir=None): + if tmp_dir is None: + tmp_dir = tempfile.mkdtemp() + if config_manager is None: + config_manager = MagicMock() + config_manager.get_identity.return_value = { + 'cell_name': 'test', 'ip_range': '172.20.0.0/16'} + config_manager.list_connections.return_value = [] + if peer_registry is None: + peer_registry = MagicMock() + peer_registry.list_peers.return_value = [] + with patch.object(ConnectivityManager, '_subscribe_to_events', + lambda self: None): + mgr = ConnectivityManager( + config_manager=config_manager, + peer_registry=peer_registry, + data_dir=tmp_dir, + config_dir=tmp_dir, + ) + return mgr + + +def _cp(returncode=0, stdout='', stderr=''): + return MagicMock(returncode=returncode, stdout=stdout, stderr=stderr) + + +# --------------------------------------------------------------------------- +# probe_health per type +# --------------------------------------------------------------------------- + +class TestProbeHealth(unittest.TestCase): + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.mgr = _make_manager(tmp_dir=self.tmp) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + # wireguard_ext --------------------------------------------------------- + + def test_wireguard_recent_handshake_working(self): + conn = {'id': 'c1', 'type': 'wireguard_ext', 'iface': 'wgext_a'} + recent = str(int(time.time()) - 10) + with patch.object(self.mgr, '_exec_in_container', + return_value=_cp(stdout=f'PUBKEY\t{recent}\n')): + health, detail = self.mgr.probe_health(conn) + self.assertEqual(health, 'working') + + def test_wireguard_stale_handshake_down(self): + conn = {'id': 'c1', 'type': 'wireguard_ext', 'iface': 'wgext_a'} + stale = str(int(time.time()) - 9999) + with patch.object(self.mgr, '_exec_in_container', + return_value=_cp(stdout=f'PUBKEY\t{stale}\n')): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'down') + + def test_wireguard_no_handshake_down(self): + conn = {'id': 'c1', 'type': 'wireguard_ext', 'iface': 'wgext_a'} + with patch.object(self.mgr, '_exec_in_container', + return_value=_cp(stdout='PUBKEY\t0\n')): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'down') + + def test_wireguard_exec_fails_down(self): + conn = {'id': 'c1', 'type': 'wireguard_ext', 'iface': 'wgext_a'} + with patch.object(self.mgr, '_exec_in_container', return_value=None): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'down') + + def test_wireguard_no_iface_unknown(self): + conn = {'id': 'c1', 'type': 'wireguard_ext', 'iface': None} + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'unknown') + + # openvpn --------------------------------------------------------------- + + def test_openvpn_tun_up_working(self): + conn = {'id': 'c2', 'type': 'openvpn', 'iface': 'ovpn_x'} + with patch.object(self.mgr, '_exec_in_container', + return_value=_cp(stdout='5: ovpn_x: ')): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'working') + + def test_openvpn_tun_down(self): + conn = {'id': 'c2', 'type': 'openvpn', 'iface': 'ovpn_x'} + with patch.object(self.mgr, '_exec_in_container', + return_value=_cp(stdout='5: ovpn_x: ')): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'down') + + def test_openvpn_falls_back_to_wg_container_tun(self): + conn = {'id': 'c2', 'type': 'openvpn', 'iface': 'ovpn_x'} + with patch.object(self.mgr, '_exec_in_container', return_value=None), \ + patch.object(self.mgr, '_wg_ip', + return_value=_cp(stdout='tun0: ')): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'working') + + # tor ------------------------------------------------------------------- + + def test_tor_bootstrapped_working(self): + conn = {'id': 'c3', 'type': 'tor'} + with patch.object(self.mgr, '_container_running', return_value=True), \ + patch.object(self.mgr, '_exec_in_container', + return_value=_cp(stdout='Bootstrapped 100% (done)')): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'working') + + def test_tor_container_down(self): + conn = {'id': 'c3', 'type': 'tor'} + with patch.object(self.mgr, '_container_running', return_value=False): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'down') + + def test_tor_running_but_no_bootstrap_log_still_working(self): + conn = {'id': 'c3', 'type': 'tor'} + with patch.object(self.mgr, '_container_running', return_value=True), \ + patch.object(self.mgr, '_exec_in_container', + return_value=_cp(stdout='')): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'working') + + # sshuttle -------------------------------------------------------------- + + def test_sshuttle_host_and_listener_working(self): + conn = {'id': 'c4', 'type': 'sshuttle', 'redirect_port': 9100, + 'config': {'host': 'ssh.example.com', 'port': 22}} + with patch.object(self.mgr, '_tcp_reachable', return_value=True), \ + patch.object(self.mgr, '_listener_reachable', return_value=True): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'working') + + def test_sshuttle_host_unreachable_down(self): + conn = {'id': 'c4', 'type': 'sshuttle', 'redirect_port': 9100, + 'config': {'host': 'ssh.example.com', 'port': 22}} + with patch.object(self.mgr, '_tcp_reachable', return_value=False): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'down') + + def test_sshuttle_listener_down(self): + conn = {'id': 'c4', 'type': 'sshuttle', 'redirect_port': 9100, + 'config': {'host': 'ssh.example.com', 'port': 22}} + with patch.object(self.mgr, '_tcp_reachable', return_value=True), \ + patch.object(self.mgr, '_listener_reachable', return_value=False): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'down') + + def test_sshuttle_no_host_unknown(self): + conn = {'id': 'c4', 'type': 'sshuttle', 'config': {}} + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'unknown') + + # proxy ----------------------------------------------------------------- + + def test_proxy_reachable_working(self): + conn = {'id': 'c5', 'type': 'proxy', + 'config': {'host': 'proxy.example.com', 'port': 3128}} + with patch.object(self.mgr, '_tcp_reachable', return_value=True): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'working') + + def test_proxy_unreachable_down(self): + conn = {'id': 'c5', 'type': 'proxy', + 'config': {'host': 'proxy.example.com', 'port': 3128}} + with patch.object(self.mgr, '_tcp_reachable', return_value=False): + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'down') + + def test_proxy_missing_config_unknown(self): + conn = {'id': 'c5', 'type': 'proxy', 'config': {}} + health, _ = self.mgr.probe_health(conn) + self.assertEqual(health, 'unknown') + + def test_unknown_type_returns_unknown(self): + health, _ = self.mgr.probe_health({'id': 'x', 'type': 'bogus'}) + self.assertEqual(health, 'unknown') + + def test_probe_never_raises(self): + conn = {'id': 'c5', 'type': 'proxy', + 'config': {'host': 'h', 'port': 1}} + with patch.object(self.mgr, '_tcp_reachable', + side_effect=RuntimeError('boom')): + health, detail = self.mgr.probe_health(conn) + self.assertEqual(health, 'unknown') + self.assertIn('boom', detail) + + +# --------------------------------------------------------------------------- +# refresh_health — persistence + TTL cache +# --------------------------------------------------------------------------- + +class TestRefreshHealth(unittest.TestCase): + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.conns = [ + {'id': 'c1', 'type': 'proxy', 'enabled': True, + 'config': {'host': 'p', 'port': 3128}, 'status': {}}, + ] + self.cm = MagicMock() + self.cm.list_connections.return_value = self.conns + self.mgr = _make_manager(config_manager=self.cm, tmp_dir=self.tmp) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_refresh_persists_status(self): + with patch.object(self.mgr, '_tcp_reachable', return_value=True): + result = self.mgr.refresh_health() + self.assertEqual(result['c1'], 'working') + self.cm.set_connection_status.assert_called_once() + cid, status = self.cm.set_connection_status.call_args.args + self.assertEqual(cid, 'c1') + self.assertEqual(status['health'], 'working') + self.assertIsNotNone(status['last_check']) + + def test_refresh_single_connection_only(self): + self.conns.append({'id': 'c2', 'type': 'proxy', 'enabled': True, + 'config': {'host': 'p2', 'port': 3128}, 'status': {}}) + with patch.object(self.mgr, '_tcp_reachable', return_value=True): + result = self.mgr.refresh_health(connection_id='c2') + self.assertEqual(list(result.keys()), ['c2']) + + def test_ttl_skips_fresh_connections(self): + fresh = self.mgr._now_iso() + self.conns[0]['status'] = {'health': 'working', 'last_check': fresh} + with patch.object(self.mgr, '_tcp_reachable') as tcp: + result = self.mgr.refresh_health() + tcp.assert_not_called() + self.assertEqual(result['c1'], 'working') + self.cm.set_connection_status.assert_not_called() + + def test_force_reprobes_even_when_fresh(self): + fresh = self.mgr._now_iso() + self.conns[0]['status'] = {'health': 'working', 'last_check': fresh} + with patch.object(self.mgr, '_tcp_reachable', return_value=False): + self.mgr.refresh_health(force=True) + self.cm.set_connection_status.assert_called_once() + + def test_disabled_connection_skipped(self): + self.conns[0]['enabled'] = False + with patch.object(self.mgr, '_tcp_reachable', return_value=True) as tcp: + result = self.mgr.refresh_health() + tcp.assert_not_called() + self.assertEqual(result, {}) + + +# --------------------------------------------------------------------------- +# per-peer fail-open resolution +# --------------------------------------------------------------------------- + +class TestFailopenResolution(unittest.TestCase): + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.pr = MagicMock() + self.mgr = _make_manager(peer_registry=self.pr, tmp_dir=self.tmp) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_type_default_tor_fails_open(self): + peer = {'peer': 'a'} + self.assertTrue(self.mgr.effective_failopen(peer, {'type': 'tor'})) + + def test_type_default_wireguard_fails_closed(self): + peer = {'peer': 'a'} + self.assertFalse( + self.mgr.effective_failopen(peer, {'type': 'wireguard_ext'})) + + def test_override_true_beats_type_default(self): + peer = {'peer': 'a', 'exit_failopen': True} + self.assertTrue( + self.mgr.effective_failopen(peer, {'type': 'wireguard_ext'})) + + def test_override_false_beats_tor_default(self): + peer = {'peer': 'a', 'exit_failopen': False} + self.assertFalse(self.mgr.effective_failopen(peer, {'type': 'tor'})) + + def test_none_override_uses_type_default(self): + peer = {'peer': 'a', 'exit_failopen': None} + self.assertTrue(self.mgr.effective_failopen(peer, {'type': 'tor'})) + + def test_set_peer_failopen_updates_peer(self): + self.pr.get_peer.return_value = {'peer': 'a'} + with patch.object(self.mgr, 'apply_routes', return_value={'ok': True}): + result = self.mgr.set_peer_failopen('a', True) + self.assertTrue(result['ok']) + self.pr.update_peer.assert_called_once_with('a', {'exit_failopen': True}) + + def test_set_peer_failopen_clear_with_null(self): + self.pr.get_peer.return_value = {'peer': 'a'} + with patch.object(self.mgr, 'apply_routes', return_value={'ok': True}): + result = self.mgr.set_peer_failopen('a', None) + self.assertTrue(result['ok']) + self.pr.update_peer.assert_called_once_with('a', {'exit_failopen': None}) + + def test_set_peer_failopen_unknown_peer(self): + self.pr.get_peer.return_value = None + result = self.mgr.set_peer_failopen('ghost', True) + self.assertFalse(result['ok']) + self.assertIn('not found', result['error']) + + def test_set_peer_failopen_rejects_non_bool(self): + self.pr.get_peer.return_value = {'peer': 'a'} + result = self.mgr.set_peer_failopen('a', 'yes') + self.assertFalse(result['ok']) + + +# --------------------------------------------------------------------------- +# apply_routes — fail-open / fail-closed under DOWN connections +# --------------------------------------------------------------------------- + +class TestApplyRoutesFallback(unittest.TestCase): + + def setUp(self): + self.tmp = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def _mgr(self, conns, peers, peer_ips): + cm = MagicMock() + cm.get_identity.return_value = {'cell_name': 't', 'ip_range': '172.20.0.0/16'} + cm.list_connections.return_value = conns + pr = MagicMock() + pr.list_peers.return_value = peers + pr.get_peer.side_effect = lambda n: peer_ips.get(n) + return _make_manager(config_manager=cm, peer_registry=pr, tmp_dir=self.tmp) + + def test_down_failopen_peer_skips_mark_and_killswitch(self): + conns = [{'id': 'c1', 'type': 'wireguard_ext', 'enabled': True, + 'mark': 0x1000, 'table': 1000, 'iface': 'wgext_a', + 'redirect_port': None, 'status': {'health': 'down'}}] + peers = [{'peer': 'a', 'exit_via': 'c1', 'exit_failopen': True}] + ips = {'a': {'peer': 'a', 'ip': '172.20.0.50/32'}} + mgr = self._mgr(conns, peers, ips) + + with patch.object(mgr, '_add_mark_rule') as mark, \ + patch.object(mgr, '_add_killswitch') as ks, \ + patch.object(cm_module, 'subprocess') as sp: + sp.run.return_value = _cp() + mgr.apply_routes() + + mark.assert_not_called() + ks.assert_not_called() + + def test_down_failclosed_peer_keeps_mark_and_killswitch(self): + conns = [{'id': 'c1', 'type': 'wireguard_ext', 'enabled': True, + 'mark': 0x1000, 'table': 1000, 'iface': 'wgext_a', + 'redirect_port': None, 'status': {'health': 'down'}}] + # No override → wireguard_ext default is fail-closed. + peers = [{'peer': 'a', 'exit_via': 'c1'}] + ips = {'a': {'peer': 'a', 'ip': '172.20.0.50/32'}} + mgr = self._mgr(conns, peers, ips) + + with patch.object(mgr, '_add_mark_rule') as mark, \ + patch.object(mgr, '_add_killswitch') as ks, \ + patch.object(cm_module, 'subprocess') as sp: + sp.run.return_value = _cp() + mgr.apply_routes() + + mark.assert_called_once_with('172.20.0.50', 0x1000) + ks.assert_called_once_with(0x1000, 'wgext_a') + + def test_working_failopen_peer_routes_normally(self): + conns = [{'id': 'c1', 'type': 'wireguard_ext', 'enabled': True, + 'mark': 0x1000, 'table': 1000, 'iface': 'wgext_a', + 'redirect_port': None, 'status': {'health': 'working'}}] + peers = [{'peer': 'a', 'exit_via': 'c1', 'exit_failopen': True}] + ips = {'a': {'peer': 'a', 'ip': '172.20.0.50/32'}} + mgr = self._mgr(conns, peers, ips) + + with patch.object(mgr, '_add_mark_rule') as mark, \ + patch.object(mgr, '_add_killswitch') as ks, \ + patch.object(cm_module, 'subprocess') as sp: + sp.run.return_value = _cp() + mgr.apply_routes() + + mark.assert_called_once_with('172.20.0.50', 0x1000) + ks.assert_called_once_with(0x1000, 'wgext_a') + + def test_unknown_health_routes_normally(self): + """A never-probed connection (health unknown) must not silently drop.""" + conns = [{'id': 'c1', 'type': 'wireguard_ext', 'enabled': True, + 'mark': 0x1000, 'table': 1000, 'iface': 'wgext_a', + 'redirect_port': None, 'status': {'health': 'unknown'}}] + peers = [{'peer': 'a', 'exit_via': 'c1', 'exit_failopen': True}] + ips = {'a': {'peer': 'a', 'ip': '172.20.0.50/32'}} + mgr = self._mgr(conns, peers, ips) + + with patch.object(mgr, '_add_mark_rule') as mark, \ + patch.object(cm_module, 'subprocess') as sp: + sp.run.return_value = _cp() + mgr.apply_routes() + + mark.assert_called_once_with('172.20.0.50', 0x1000) + + def test_mixed_peers_failclosed_keeps_killswitch(self): + """When one peer fails open and another fails closed on the same DOWN + connection, the fail-closed peer keeps its mark and the killswitch + stays so its traffic is blocked while the tunnel is down.""" + conns = [{'id': 'c1', 'type': 'wireguard_ext', 'enabled': True, + 'mark': 0x1000, 'table': 1000, 'iface': 'wgext_a', + 'redirect_port': None, 'status': {'health': 'down'}}] + peers = [ + {'peer': 'a', 'exit_via': 'c1', 'exit_failopen': True}, + {'peer': 'b', 'exit_via': 'c1', 'exit_failopen': False}, + ] + ips = {'a': {'peer': 'a', 'ip': '172.20.0.50/32'}, + 'b': {'peer': 'b', 'ip': '172.20.0.51/32'}} + mgr = self._mgr(conns, peers, ips) + + marked = [] + with patch.object(mgr, '_add_mark_rule', + side_effect=lambda ip, m: marked.append(ip)), \ + patch.object(mgr, '_add_killswitch') as ks, \ + patch.object(cm_module, 'subprocess') as sp: + sp.run.return_value = _cp() + mgr.apply_routes() + + self.assertEqual(marked, ['172.20.0.51']) + ks.assert_called_once_with(0x1000, 'wgext_a') + + +if __name__ == '__main__': + unittest.main() diff --git a/webui/src/services/api.js b/webui/src/services/api.js index 40565d9..67c0ea5 100644 --- a/webui/src/services/api.js +++ b/webui/src/services/api.js @@ -381,6 +381,15 @@ export const connectivityAPI = { applyRoutes: () => api.post('/api/connectivity/exits/apply'), getPeerExits: () => api.get('/api/connectivity/peers'), setPeerExit: (peer_name, connection_id) => api.put(`/api/connectivity/peers/${peer_name}/exit`, { connection_id }), + // Connectivity v2 — generic connection CRUD + health + per-peer fallback + listConnections: () => api.get('/api/connectivity/connections'), + createConnection: (type, name, config = {}, secrets = {}) => + api.post('/api/connectivity/connections', { type, name, config, secrets }), + updateConnection: (id, fields) => api.put(`/api/connectivity/connections/${id}`, fields), + deleteConnection: (id) => api.delete(`/api/connectivity/connections/${id}`), + probeConnectionHealth: (id) => api.get(`/api/connectivity/connections/${id}/health`), + setPeerFailopen: (peer_name, failopen) => + api.put(`/api/connectivity/peers/${peer_name}/failopen`, { failopen }), }; // Container Management API