Files
pic/api/connectivity_manager.py
T
roof 6232ef23a9 feat: connectivity — registry-driven peer table, sshuttle/proxy egress, egress UI
The peer table was empty because it was not consulting the peer registry;
now peers are driven by PeerRegistry so the Connectivity page reflects actual
connected cells.

Exit-key handling is unified: all code paths now use the same key derivation
so a store-service exit bridge and a manual WireGuard peer both produce
consistent routing state.

Two new egress exit types are added (sshuttle via SSH tunnel and proxy via
redsocks SOCKS5), wiring through connectivity_manager, egress_manager, and
app.py routes. This lets a cell route its traffic through an SSH host or a
SOCKS5 proxy as an alternative to WireGuard exit nodes.

ServiceStoreManager and ServiceBus updated so the egress lifecycle (install /
uninstall) is cleanly signalled between components.

Connectivity.jsx gains the Service Egress section, letting operators assign
and reassign egress methods from the UI without touching config files.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 08:36:15 -04:00

976 lines
41 KiB
Python

#!/usr/bin/env python3
"""
Connectivity Manager for Personal Internet Cell — Phase 5 Extended Connectivity.
Provides per-peer egress routing through alternate exits (WireGuard external,
OpenVPN, Tor, sshuttle SSH tunnel, upstream proxy) via Linux policy routing
(fwmark + ip rule + dedicated routing tables) and dedicated iptables chains.
Architecture
------------
- A peer's `exit_via` field selects the egress path: "default", "wireguard_ext",
"openvpn", "tor", "sshuttle", or "proxy".
- Each non-default exit type is assigned a unique fwmark and a dedicated routing
table:
wireguard_ext mark 0x10 table 110 iface wg_ext0
openvpn mark 0x20 table 120 iface tun0
tor mark 0x30 table 130 (transparent proxy → 9040)
sshuttle mark 0x40 table 140 (transparent proxy → 12300)
proxy mark 0x50 table 150 (transparent proxy → 12345, redsocks)
- All rules live in dedicated PIC_CONNECTIVITY chains in the `mangle` and `nat`
tables so they can be flushed/rebuilt without touching firewall_manager rules.
- A kill-switch FORWARD DROP rule prevents leaks if the exit interface is down.
Container model
---------------
Each exit type runs in its own separate container; this manager only programs
policy routing rules in the WireGuard server container (cell-wireguard) where
peer traffic flows through.
Config files
------------
- WireGuard external: {config_dir}/connectivity/wireguard_ext/wg_ext0.conf
- OpenVPN: {config_dir}/connectivity/openvpn/<name>.ovpn
Both are validated to strip / reject hook directives that could execute
arbitrary commands on the host.
"""
import ipaddress
import logging
import os
import re
import subprocess
from typing import Any, Dict, List, Optional
from base_service_manager import BaseServiceManager
logger = logging.getLogger(__name__)
WIREGUARD_CONTAINER = 'cell-wireguard'
# Lines we strip from uploaded WireGuard configs — these can run arbitrary
# host commands when wg-quick brings the interface up/down.
_WG_FORBIDDEN_PREFIXES = ('PostUp', 'PostDown', 'PreUp', 'PreDown')
# Lines we strip from uploaded OpenVPN configs — these execute external
# scripts/binaries on connect/disconnect.
_OVPN_FORBIDDEN_DIRECTIVES = (
'up', 'down', 'script-security', 'plugin',
'route-up', 'route-pre-down',
)
_NAME_RE = re.compile(r'^[a-z0-9_-]{1,32}$')
# sshuttle / proxy configuration validation
_HOST_RE = re.compile(r'^[A-Za-z0-9]([A-Za-z0-9.-]{0,252}[A-Za-z0-9])?$')
_SSH_USER_RE = re.compile(r'^[a-z_][a-z0-9_-]{0,31}$')
_PROXY_USER_RE = re.compile(r'^[A-Za-z0-9._-]{1,64}$')
# Printable ASCII excluding double quote and backslash — safe inside the
# quoted strings of a redsocks.conf without any escaping ambiguity.
_PROXY_PASSWORD_RE = re.compile(r'^[\x20\x21\x23-\x5B\x5D-\x7E]{1,128}$')
_B64_RE = re.compile(r'^[A-Za-z0-9+/]+={0,2}$')
_KNOWN_HOSTS_HOSTS_RE = re.compile(r'^[A-Za-z0-9\[\]:.,*?_-]{1,512}$')
_SSH_KEYTYPES = (
'ssh-ed25519', 'ssh-rsa',
'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521',
)
def _contains_strict_hostkey_disable(value: str) -> bool:
"""Detect attempts to disable SSH host-key pinning (any spacing/case)."""
normalized = re.sub(r'\s+', '', value.lower())
return ('stricthostkeychecking=no' in normalized
or 'stricthostkeycheckingno' in normalized)
def _validate_host(host) -> Optional[str]:
"""Return a validated hostname/IP, or None when invalid."""
if not isinstance(host, str):
return None
host = host.strip()
if not host or '..' in host or not _HOST_RE.match(host):
return None
return host
def _validate_port(port) -> Optional[int]:
"""Return a validated TCP port (1-65535), or None when invalid."""
try:
port = int(port)
except (TypeError, ValueError):
return None
if not 1 <= port <= 65535:
return None
return port
class ConnectivityManager(BaseServiceManager):
"""Manages alternate egress paths (extended connectivity) for peers."""
EXIT_TYPES = ("default", "wireguard_ext", "openvpn", "tor",
"sshuttle", "proxy")
MARKS = {"wireguard_ext": 0x10, "openvpn": 0x20, "tor": 0x30,
"sshuttle": 0x40, "proxy": 0x50}
TABLES = {"wireguard_ext": 110, "openvpn": 120, "tor": 130,
"sshuttle": 140, "proxy": 150}
IFACES = {"wireguard_ext": "wg_ext0", "openvpn": "tun0"}
TOR_TRANS_PORT = 9040
TOR_DNS_PORT = 5353
SSHUTTLE_PORT = 12300
REDSOCKS_PORT = 12345
# Exits that work as pure iptables REDIRECTs to a local transparent-proxy
# port (no exit interface, no kill-switch interface).
REDIRECT_PORTS = {"tor": TOR_TRANS_PORT, "sshuttle": SSHUTTLE_PORT,
"proxy": REDSOCKS_PORT}
# Store-service ids / container names backing each exit type — used to
# report an exit as configured when it was installed via the Service Store
# rather than through a legacy config upload.
STORE_SERVICE_IDS = {
"wireguard_ext": "wireguard-ext",
"openvpn": "openvpn-client",
"tor": "tor",
"sshuttle": "sshuttle",
"proxy": "proxy",
}
EXIT_CONTAINERS = {
"wireguard_ext": "cell-wg-ext",
"openvpn": "cell-openvpn",
"tor": "cell-tor",
"sshuttle": "cell-sshuttle",
"proxy": "cell-redsocks",
}
# RFC1918 ranges excluded from the sshuttle tunnel by default so cell-local
# and LAN traffic is never tunneled.
RFC1918_SUBNETS = ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
CONNECTIVITY_CHAIN = 'PIC_CONNECTIVITY'
def __init__(self, config_manager=None, peer_registry=None,
vault_manager=None,
data_dir: str = '/app/data', config_dir: str = '/app/config'):
super().__init__('connectivity', data_dir, config_dir)
self.config_manager = config_manager
self.peer_registry = peer_registry
self.vault_manager = vault_manager
# Connectivity configs live under the per-service data dir so that
# ${PIC_DATA_DIR}/services/<id>/config bind mounts in store compose
# templates can read them (Docker daemon resolves paths on the HOST,
# so they must be reachable via data_dir, not config_dir).
services_dir = os.path.join(data_dir, 'services')
self.wireguard_ext_dir = os.path.join(services_dir, 'wireguard-ext', 'config')
self.openvpn_dir = os.path.join(services_dir, 'openvpn-client', 'config')
self.sshuttle_dir = os.path.join(services_dir, 'sshuttle', 'config')
self.proxy_dir = os.path.join(services_dir, 'proxy', 'config')
for d in (self.wireguard_ext_dir, self.openvpn_dir,
self.sshuttle_dir, self.proxy_dir):
self.safe_makedirs(d)
# One-shot migration from the legacy config_dir/connectivity/ location.
_legacy_base = os.path.join(config_dir, 'connectivity')
self._migrate_legacy_configs(_legacy_base)
# Subscribe to ServiceBus CONFIG_CHANGED events so routes are
# reapplied if the underlying network changes. Done lazily —
# service_bus is a singleton imported at app startup.
self._subscribe_to_events()
# ── Legacy migration ──────────────────────────────────────────────────
def _migrate_legacy_configs(self, legacy_base: str) -> None:
"""Copy files from the old config_dir/connectivity/ tree to the new data_dir locations.
The old layout stored WireGuard and OpenVPN configs under the API container's
config_dir, which Docker cannot bind-mount into store-service containers. Files
are copied (not moved) so the legacy location still works until the operator
removes it manually.
"""
import shutil
pairs = (
(os.path.join(legacy_base, 'wireguard_ext'), self.wireguard_ext_dir),
(os.path.join(legacy_base, 'openvpn'), self.openvpn_dir),
)
for src_dir, dst_dir in pairs:
if not os.path.isdir(src_dir):
continue
try:
for fname in os.listdir(src_dir):
src_file = os.path.join(src_dir, fname)
dst_file = os.path.join(dst_dir, fname)
if os.path.isfile(src_file) and not os.path.exists(dst_file):
shutil.copy2(src_file, dst_file)
os.chmod(dst_file, 0o600)
logger.info('connectivity: migrated %s%s', src_file, dst_file)
except OSError as e:
logger.warning('connectivity: migration from %s failed: %s', src_dir, e)
# ── Event wiring ──────────────────────────────────────────────────────
def _subscribe_to_events(self) -> None:
"""Subscribe to network change events so routes auto-reapply."""
try:
from managers import service_bus, EventType
service_bus.subscribe_to_event(
EventType.CONFIG_CHANGED, self._on_network_changed
)
except Exception as e:
# Non-fatal: subscription is best-effort, manual apply still works.
logger.debug(f"connectivity: event subscribe skipped: {e}")
def _on_network_changed(self, event) -> None:
"""ServiceBus handler: re-apply routes when network config changes."""
try:
source = getattr(event, 'source', '')
if source not in ('network', 'wireguard', 'connectivity'):
return
logger.info(f"connectivity: re-applying routes due to {source} change")
self.apply_routes()
except Exception as e:
logger.warning(f"connectivity: on_network_changed failed (non-fatal): {e}")
# ── BaseServiceManager required ───────────────────────────────────────
def get_status(self) -> Dict[str, Any]:
"""Return status summary including configured exits and peer count."""
try:
exits_status: Dict[str, Dict[str, Any]] = {}
for exit_type in self.EXIT_TYPES:
if exit_type == "default":
continue
exits_status[exit_type] = self._exit_status(exit_type)
peers_with_exit = 0
if self.peer_registry is not None:
try:
for peer in self.peer_registry.list_peers():
if peer.get('exit_via', 'default') != 'default':
peers_with_exit += 1
except Exception as e:
logger.warning(f"get_status: peer count failed: {e}")
return {
'service': 'connectivity',
'running': True,
'exits': exits_status,
'peers_with_exit': peers_with_exit,
}
except Exception as e:
return self.handle_error(e, 'get_status')
def test_connectivity(self) -> Dict[str, Any]:
"""Minimal connectivity self-test."""
return {'success': True}
def get_config(self) -> Dict[str, Any]:
"""Return current connectivity config from config_manager."""
try:
if self.config_manager is not None and hasattr(
self.config_manager, 'get_connectivity_config'
):
return self.config_manager.get_connectivity_config()
except Exception as e:
logger.warning(f"get_config: config_manager lookup failed: {e}")
return {'exits': {}, 'peer_exit_map': {}}
# ── Public API ────────────────────────────────────────────────────────
def list_exits(self) -> List[Dict[str, Any]]:
"""List configured exits with current status."""
result: List[Dict[str, Any]] = []
for exit_type in self.EXIT_TYPES:
if exit_type == "default":
continue
entry = {'type': exit_type}
entry.update(self._exit_status(exit_type))
result.append(entry)
return result
def get_peer_exits(self) -> Dict[str, str]:
"""Return {peer_name: exit_type} for all peers."""
out: Dict[str, str] = {}
if self.peer_registry is None:
return out
try:
for peer in self.peer_registry.list_peers():
name = peer.get('peer')
if name:
out[name] = peer.get('exit_via', 'default')
except Exception as e:
logger.warning(f"get_peer_exits: {e}")
return out
def set_peer_exit(self, peer_name: str, exit_type: str) -> Dict[str, Any]:
"""Assign a peer to an egress path and apply the rule changes."""
if exit_type not in self.EXIT_TYPES:
return {
'ok': False,
'error': f"invalid exit_type {exit_type!r}; "
f"must be one of {self.EXIT_TYPES}",
}
if not isinstance(peer_name, str) or not re.match(r'^[A-Za-z0-9_.-]{1,64}$', peer_name):
return {'ok': False, 'error': f'invalid peer_name {peer_name!r}'}
if self.peer_registry is None:
return {'ok': False, 'error': 'peer_registry not available'}
try:
ok = self.peer_registry.set_peer_exit_via(peer_name, exit_type)
except Exception as e:
logger.error(f"set_peer_exit: registry update failed: {e}")
return {'ok': False, 'error': str(e)}
if not ok:
return {'ok': False, 'error': f'peer {peer_name!r} not found'}
try:
self.apply_routes()
except Exception as e:
logger.warning(f"set_peer_exit: apply_routes failed (non-fatal): {e}")
return {'ok': True, 'peer': peer_name, 'exit_via': exit_type}
def upload_wireguard_ext(self, conf_text: str) -> Dict[str, Any]:
"""Validate and store an external WireGuard config."""
try:
cleaned = self._validate_wg_conf(conf_text)
except ValueError as e:
return {'ok': False, 'error': str(e)}
path = os.path.join(self.wireguard_ext_dir, 'wg_ext0.conf')
try:
self._write_secure(path, cleaned)
except Exception as e:
logger.error(f"upload_wireguard_ext: write failed: {e}")
return {'ok': False, 'error': str(e)}
logger.info(f"connectivity: stored wg_ext0.conf ({len(cleaned)} bytes)")
return {'ok': True}
def upload_openvpn(self, ovpn_text: str, name: str = 'default') -> Dict[str, Any]:
"""Validate and store an OpenVPN profile."""
if not isinstance(name, str) or not _NAME_RE.match(name):
return {
'ok': False,
'error': f'invalid name {name!r}; must match [a-z0-9_-]{{1,32}}',
}
try:
cleaned = self._validate_ovpn(ovpn_text)
except ValueError as e:
return {'ok': False, 'error': str(e)}
path = os.path.join(self.openvpn_dir, f'{name}.ovpn')
try:
self._write_secure(path, cleaned)
except Exception as e:
logger.error(f"upload_openvpn: write failed: {e}")
return {'ok': False, 'error': str(e)}
logger.info(f"connectivity: stored {name}.ovpn ({len(cleaned)} bytes)")
return {'ok': True}
def configure_sshuttle(self, cfg: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and store an sshuttle (SSH tunnel) exit configuration.
Requires a pinned host key (a single known_hosts line); rejects any
attempt to disable strict host-key checking. Secrets (private key /
password) are written 0o600 under data/services/sshuttle/config/ and
mirrored into the vault — they are never placed in cell_config.json.
"""
if not isinstance(cfg, dict):
return {'ok': False, 'error': 'config must be a JSON object'}
for value in cfg.values():
if isinstance(value, str) and _contains_strict_hostkey_disable(value):
return {
'ok': False,
'error': 'StrictHostKeyChecking=no is not allowed; '
'a pinned host key (known_hosts line) is required',
}
host = _validate_host(cfg.get('host'))
if not host:
return {'ok': False, 'error': 'invalid host: must be a hostname or IP'}
port = _validate_port(cfg.get('port', 22))
if port is None:
return {'ok': False, 'error': 'invalid port: must be 1-65535'}
user = cfg.get('user')
if not isinstance(user, str) or not _SSH_USER_RE.match(user):
return {
'ok': False,
'error': 'invalid user: must match ^[a-z_][a-z0-9_-]{0,31}$',
}
auth = cfg.get('auth', 'key')
if auth not in ('key', 'password'):
return {'ok': False, 'error': "invalid auth: must be 'key' or 'password'"}
known_hosts = cfg.get('known_hosts')
err = self._validate_known_hosts_line(known_hosts)
if err:
return {'ok': False, 'error': err}
known_hosts = known_hosts.strip()
private_key = ''
password = ''
if auth == 'key':
private_key = cfg.get('private_key', '')
if not isinstance(private_key, str) or 'PRIVATE KEY' not in private_key:
return {
'ok': False,
'error': 'private_key is required for key auth and must be '
'a PEM/OpenSSH private key',
}
else:
password = cfg.get('password', '')
if not isinstance(password, str) or not password or '\n' in password:
return {'ok': False, 'error': 'password is required for password auth'}
exclude_subnets = cfg.get('exclude_subnets')
if exclude_subnets is None:
exclude_subnets = self._default_exclude_subnets()
if not isinstance(exclude_subnets, list):
return {'ok': False, 'error': 'exclude_subnets must be a list of CIDRs'}
validated_excludes = []
for net in exclude_subnets:
try:
validated_excludes.append(str(ipaddress.ip_network(str(net), strict=False)))
except ValueError:
return {'ok': False, 'error': f'invalid exclude subnet: {net!r}'}
conf_lines = [
f'HOST={host}',
f'PORT={port}',
f'USER={user}',
f'AUTH={auth}',
f'LISTEN_PORT={self.SSHUTTLE_PORT}',
f'EXCLUDE={",".join(validated_excludes)}',
]
try:
self._write_secure(
os.path.join(self.sshuttle_dir, 'known_hosts'),
known_hosts + '\n',
)
if auth == 'key':
key_text = private_key.rstrip('\n') + '\n'
self._write_secure(os.path.join(self.sshuttle_dir, 'id_pic'), key_text)
else:
self._write_secure(
os.path.join(self.sshuttle_dir, 'password'), password + '\n')
self._write_secure(
os.path.join(self.sshuttle_dir, 'sshuttle.conf'),
'\n'.join(conf_lines) + '\n',
)
except Exception as e:
logger.error(f"configure_sshuttle: write failed: {e}")
return {'ok': False, 'error': 'failed to write sshuttle configuration'}
if self.vault_manager is not None:
try:
if auth == 'key':
self.vault_manager.store_secret('connectivity_sshuttle_key',
private_key)
else:
self.vault_manager.store_secret('connectivity_sshuttle_password',
password)
except Exception as e:
logger.warning(f"configure_sshuttle: vault store failed: {e}")
self._persist_exit_config('sshuttle', {
'host': host,
'port': port,
'user': user,
'auth': auth,
'exclude_subnets': validated_excludes,
})
logger.info(f"connectivity: configured sshuttle exit ({user}@{host}:{port})")
return {'ok': True}
def configure_proxy(self, cfg: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and store an upstream proxy (redsocks) exit configuration.
Generates redsocks.conf from strictly validated fields only — no value
that could break out of the quoted config strings is accepted. The
password lives in the 0o600 conf file, never in compose env.
"""
if not isinstance(cfg, dict):
return {'ok': False, 'error': 'config must be a JSON object'}
scheme = cfg.get('scheme')
if scheme not in ('http', 'socks5'):
return {'ok': False, 'error': "invalid scheme: must be 'http' or 'socks5'"}
host = _validate_host(cfg.get('host'))
if not host:
return {'ok': False, 'error': 'invalid host: must be a hostname or IP'}
port = _validate_port(cfg.get('port'))
if port is None:
return {'ok': False, 'error': 'invalid port: must be 1-65535'}
user = cfg.get('user') or ''
password = cfg.get('password') or ''
if user and not (isinstance(user, str) and _PROXY_USER_RE.match(user)):
return {
'ok': False,
'error': 'invalid user: must match ^[A-Za-z0-9._-]{1,64}$',
}
if password and not (isinstance(password, str)
and _PROXY_PASSWORD_RE.match(password)):
return {
'ok': False,
'error': 'invalid password: 1-128 printable ASCII characters '
'excluding double quotes and backslashes',
}
if password and not user:
return {'ok': False, 'error': 'password requires a user'}
conf = self._render_redsocks_conf(scheme, host, port, user, password)
try:
self._write_secure(os.path.join(self.proxy_dir, 'redsocks.conf'), conf)
except Exception as e:
logger.error(f"configure_proxy: write failed: {e}")
return {'ok': False, 'error': 'failed to write redsocks configuration'}
self._persist_exit_config('proxy', {
'scheme': scheme,
'host': host,
'port': port,
'user': user,
})
logger.info(f"connectivity: configured proxy exit ({scheme}://{host}:{port})")
return {'ok': True}
def _render_redsocks_conf(self, scheme: str, host: str, port: int,
user: str, password: str) -> str:
"""Build a redsocks.conf from already-validated fields."""
redsocks_type = 'socks5' if scheme == 'socks5' else 'http-connect'
lines = [
'base {',
' log_debug = off;',
' log_info = on;',
' log = stderr;',
' daemon = off;',
' redirector = iptables;',
'}',
'',
'redsocks {',
' local_ip = 0.0.0.0;',
f' local_port = {self.REDSOCKS_PORT};',
f' ip = {host};',
f' port = {port};',
f' type = {redsocks_type};',
]
if user:
lines.append(f' login = "{user}";')
if password:
lines.append(f' password = "{password}";')
lines.append('}')
return '\n'.join(lines) + '\n'
@staticmethod
def _validate_known_hosts_line(line) -> Optional[str]:
"""Validate a single known_hosts line; return an error string or None.
Expected format: host[,ip] keytype base64key [comment]
"""
if not isinstance(line, str) or not line.strip():
return ('known_hosts is required: a pinned host key line '
'(host[,ip] keytype base64key)')
line = line.strip()
if '\n' in line or '\r' in line:
return 'known_hosts must be a single line'
parts = line.split()
if len(parts) < 3:
return ('invalid known_hosts line: expected '
'host[,ip] keytype base64key')
hosts, keytype, key = parts[0], parts[1], parts[2]
if not _KNOWN_HOSTS_HOSTS_RE.match(hosts):
return f'invalid known_hosts host field: {hosts!r}'
if keytype not in _SSH_KEYTYPES:
return (f'invalid known_hosts key type {keytype!r}; '
f'must be one of {_SSH_KEYTYPES}')
if not _B64_RE.match(key):
return 'invalid known_hosts key: not valid base64'
return None
def _default_exclude_subnets(self) -> List[str]:
"""Cell subnet + RFC1918 ranges — internal traffic is never tunneled."""
excludes = list(self.RFC1918_SUBNETS)
try:
if self.config_manager is not None:
identity = self.config_manager.get_identity()
if isinstance(identity, dict):
ip_range = identity.get('ip_range', '')
if isinstance(ip_range, str) and '/' in ip_range \
and ip_range not in excludes:
excludes.insert(0, ip_range)
except Exception as e:
logger.debug(f"_default_exclude_subnets: {e}")
return excludes
def _persist_exit_config(self, exit_type: str, fields: Dict[str, Any]) -> None:
"""Persist non-secret exit fields under connectivity.exits in config."""
if self.config_manager is None:
return
try:
cfg = self.config_manager.get_connectivity_config()
exits = cfg.get('exits') if isinstance(cfg, dict) else None
exits = dict(exits) if isinstance(exits, dict) else {}
exits[exit_type] = fields
self.config_manager.set_connectivity_field('exits', exits)
except Exception as e:
logger.warning(f"_persist_exit_config({exit_type}): {e}")
# ── Routing application ───────────────────────────────────────────────
def apply_routes(self) -> Dict[str, Any]:
"""Idempotently rebuild all connectivity rules and policy routing."""
rules_applied = 0
try:
self._ensure_chains()
except Exception as e:
logger.warning(f"apply_routes: _ensure_chains failed: {e}")
# Flush our dedicated chains (without deleting them)
for table, chain in (('mangle', self.CONNECTIVITY_CHAIN),
('nat', self.CONNECTIVITY_CHAIN)):
try:
self._flush_chain(table, chain)
except Exception as e:
logger.warning(f"apply_routes: flush {table}/{chain} failed: {e}")
# Idempotent ip rule registration for each non-default exit
for exit_type in self.MARKS:
mark = self.MARKS[exit_type]
table = self.TABLES[exit_type]
try:
self._remove_ip_rule(mark, table)
self._add_ip_rule(mark, table)
rules_applied += 1
except Exception as e:
logger.warning(f"apply_routes: ip rule {exit_type} failed: {e}")
# Per-peer marking + nat redirect (Tor only)
if self.peer_registry is not None:
try:
peers = self.peer_registry.list_peers()
except Exception as e:
logger.warning(f"apply_routes: list_peers failed: {e}")
peers = []
for peer in peers:
exit_via = peer.get('exit_via', 'default')
if exit_via == 'default' or exit_via not in self.MARKS:
continue
src_ip = self._peer_source_ip(peer.get('peer', ''))
if not src_ip:
continue
mark = self.MARKS[exit_via]
try:
self._add_mark_rule(src_ip, mark)
rules_applied += 1
except Exception as e:
logger.warning(
f"apply_routes: mark rule for {src_ip}/{exit_via}: {e}"
)
# Tor / sshuttle / proxy: redirect TCP to the local
# transparent-proxy port for that exit.
if exit_via in self.REDIRECT_PORTS:
try:
self._add_redirect(src_ip, self.REDIRECT_PORTS[exit_via])
rules_applied += 1
except Exception as e:
logger.warning(
f"apply_routes: {exit_via} redirect for {src_ip}: {e}"
)
# Kill-switch: drop marked packets that would otherwise leak via the
# default route if the exit interface is down.
for exit_type, iface in self.IFACES.items():
mark = self.MARKS[exit_type]
try:
self._add_killswitch(mark, iface)
rules_applied += 1
except Exception as e:
logger.warning(f"apply_routes: killswitch {exit_type}: {e}")
return {'ok': True, 'rules_applied': rules_applied}
# ── iptables / ip rule helpers ────────────────────────────────────────
def _wg_iptables(self, args: List[str], timeout: int = 10) -> subprocess.CompletedProcess:
"""Run iptables inside the WireGuard container (where peer traffic forwards)."""
cmd = ['docker', 'exec', WIREGUARD_CONTAINER, 'iptables'] + args
return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
def _wg_ip(self, args: List[str], timeout: int = 10) -> subprocess.CompletedProcess:
"""Run `ip` inside the WireGuard container."""
cmd = ['docker', 'exec', WIREGUARD_CONTAINER, 'ip'] + args
return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
def _ensure_chains(self) -> None:
"""Create PIC_CONNECTIVITY chains in mangle and nat (idempotent)."""
for table, parent_chain in (
('mangle', 'PREROUTING'),
('nat', 'PREROUTING'),
):
# Create chain if it doesn't already exist
check = self._wg_iptables(
['-t', table, '-L', self.CONNECTIVITY_CHAIN, '-n']
)
if check.returncode != 0:
create = self._wg_iptables(
['-t', table, '-N', self.CONNECTIVITY_CHAIN]
)
if create.returncode != 0 and 'exists' not in (create.stderr or ''):
logger.warning(
f"_ensure_chains: cannot create {table}/{self.CONNECTIVITY_CHAIN}: "
f"{create.stderr.strip()}"
)
# Insert jump from parent chain at position 1, idempotent.
jump_args = ['-t', table, '-C', parent_chain, '-j', self.CONNECTIVITY_CHAIN]
exists = self._wg_iptables(jump_args)
if exists.returncode != 0:
self._wg_iptables(
['-t', table, '-I', parent_chain, '1',
'-j', self.CONNECTIVITY_CHAIN]
)
def _flush_chain(self, table: str, chain: str) -> None:
"""Flush a chain in-place (`iptables -F`) without deleting it."""
self._wg_iptables(['-t', table, '-F', chain])
def _add_ip_rule(self, mark: int, table: int) -> None:
"""Add `ip rule fwmark <mark> lookup <table>`."""
self._wg_ip(['rule', 'add', 'fwmark', hex(mark), 'lookup', str(table)])
def _remove_ip_rule(self, mark: int, table: int) -> None:
"""Remove all matching `ip rule fwmark <mark> lookup <table>` (idempotent)."""
# `ip rule del` returns nonzero when no matching rule exists; loop
# until it fails to drain duplicates.
for _ in range(8):
r = self._wg_ip(['rule', 'del', 'fwmark', hex(mark), 'lookup', str(table)])
if r.returncode != 0:
break
def _add_mark_rule(self, src_ip: str, mark: int) -> None:
"""Mark packets from src_ip with mark in the mangle PIC_CONNECTIVITY chain."""
self._wg_iptables([
'-t', 'mangle', '-A', self.CONNECTIVITY_CHAIN,
'-s', src_ip,
'-j', 'MARK', '--set-mark', hex(mark),
])
def _add_redirect(self, src_ip: str, port: int) -> None:
"""Redirect peer's TCP traffic to a local transparent-proxy port."""
self._wg_iptables([
'-t', 'nat', '-A', self.CONNECTIVITY_CHAIN,
'-s', src_ip, '-p', 'tcp',
'-j', 'REDIRECT', '--to-ports', str(port),
])
def _add_tor_redirect(self, src_ip: str) -> None:
"""Redirect peer's TCP traffic to local Tor TransPort."""
self._add_redirect(src_ip, self.TOR_TRANS_PORT)
def _add_killswitch(self, mark: int, iface: Optional[str]) -> None:
"""Drop marked packets that would egress via any interface other than iface.
For Tor (no exit iface), skip — Tor traffic is fully redirected at
nat/REDIRECT and never reaches FORWARD.
"""
if not iface:
return
# Use -C to test, -A to add — idempotent.
check_args = ['-C', 'FORWARD',
'-m', 'mark', '--mark', hex(mark),
'!', '-o', iface, '-j', 'DROP']
exists = self._wg_iptables(check_args)
if exists.returncode != 0:
self._wg_iptables(['-A', 'FORWARD',
'-m', 'mark', '--mark', hex(mark),
'!', '-o', iface, '-j', 'DROP'])
def _exit_status(self, exit_type: str) -> Dict[str, Any]:
"""Return per-exit status (config presence + interface up/down).
An exit counts as configured when a legacy uploaded config file
exists, OR the backing store service is installed, OR its container
is running — exits installed via the Service Store never create the
legacy upload files.
"""
info: Dict[str, Any] = {'configured': False, 'iface_up': False}
if exit_type == 'wireguard_ext':
path = os.path.join(self.wireguard_ext_dir, 'wg_ext0.conf')
info['configured'] = os.path.isfile(path)
elif exit_type == 'openvpn':
try:
info['configured'] = any(
f.endswith('.ovpn')
for f in os.listdir(self.openvpn_dir)
)
except OSError:
info['configured'] = False
elif exit_type == 'tor':
info['configured'] = True # Tor uses defaults; no per-cell config
elif exit_type == 'sshuttle':
info['configured'] = os.path.isfile(
os.path.join(self.sshuttle_dir, 'sshuttle.conf'))
elif exit_type == 'proxy':
info['configured'] = os.path.isfile(
os.path.join(self.proxy_dir, 'redsocks.conf'))
if not info['configured'] and (
self._store_service_installed(exit_type)
or self._exit_container_running(exit_type)
):
info['configured'] = True
iface = self.IFACES.get(exit_type)
if iface:
try:
r = self._wg_ip(['link', 'show', iface], timeout=5)
info['iface_up'] = r.returncode == 0 and 'UP' in (r.stdout or '')
except Exception:
info['iface_up'] = False
if info['iface_up']:
info['status'] = 'active'
elif info['configured']:
info['status'] = 'configured'
else:
info['status'] = 'not_configured'
return info
def _store_service_installed(self, exit_type: str) -> bool:
"""True when the store service backing this exit type is installed."""
svc_id = self.STORE_SERVICE_IDS.get(exit_type)
if not svc_id or self.config_manager is None:
return False
try:
installed = self.config_manager.get_installed_services()
except Exception as e:
logger.debug(f"_store_service_installed({exit_type}): {e}")
return False
return isinstance(installed, dict) and svc_id in installed
def _exit_container_running(self, exit_type: str) -> bool:
"""True when the exit's container is currently running."""
cname = self.EXIT_CONTAINERS.get(exit_type)
if not cname:
return False
try:
r = subprocess.run(
['docker', 'inspect', '-f', '{{.State.Running}}', cname],
capture_output=True, text=True, timeout=5,
)
return r.returncode == 0 and r.stdout.strip() == 'true'
except Exception:
return False
def _peer_source_ip(self, peer_name: str) -> Optional[str]:
"""Return a peer's WireGuard IP (no /CIDR suffix)."""
if not peer_name or self.peer_registry is None:
return None
try:
peer = self.peer_registry.get_peer(peer_name)
except Exception as e:
logger.warning(f"_peer_source_ip({peer_name}): {e}")
return None
if not peer:
return None
ip = peer.get('ip', '')
if not ip:
return None
return ip.split('/')[0]
# ── Config validation ─────────────────────────────────────────────────
def _validate_wg_conf(self, text: str) -> str:
"""Strip Pre/Post-Up/Down hooks and reject conflicting wg0 interface.
Raises ValueError if the config tries to define `Interface = wg0`
(which would clash with the existing peer-server interface).
"""
if not isinstance(text, str):
raise ValueError('wg conf must be a string')
cleaned: List[str] = []
for raw_line in text.splitlines():
stripped = raw_line.strip()
# Reject wg0 interface declaration that would conflict with the
# existing WireGuard server interface.
if stripped.lower().startswith('interface'):
# Look ahead in subsequent lines for `= wg0` would be hard;
# the [Interface] section header itself is fine. We only
# reject explicit Name/Interface = wg0 directives.
pass
# Match assignments like `PostUp = ...`
if '=' in stripped:
key = stripped.split('=', 1)[0].strip()
if key in _WG_FORBIDDEN_PREFIXES:
logger.info(f"_validate_wg_conf: dropped {key} hook")
continue
# Detect Name = wg0 or Interface = wg0 inside Interface section
if key.lower() in ('name', 'interface') and \
stripped.split('=', 1)[1].strip().lower() == 'wg0':
raise ValueError(
"config defines interface 'wg0' which conflicts "
"with the peer-server interface"
)
cleaned.append(raw_line)
return '\n'.join(cleaned).rstrip() + '\n'
def _validate_ovpn(self, text: str) -> str:
"""Strip directives that execute external scripts/binaries."""
if not isinstance(text, str):
raise ValueError('ovpn conf must be a string')
cleaned: List[str] = []
for raw_line in text.splitlines():
stripped = raw_line.strip()
# Match the directive name (first whitespace-delimited token).
if stripped and not stripped.startswith('#'):
first = stripped.split(None, 1)[0]
if first in _OVPN_FORBIDDEN_DIRECTIVES:
logger.info(f"_validate_ovpn: dropped {first} directive")
continue
cleaned.append(raw_line)
return '\n'.join(cleaned).rstrip() + '\n'
# ── Filesystem helpers ────────────────────────────────────────────────
@staticmethod
def _write_secure(path: str, text: str) -> None:
"""Atomic 0o600 write — secrets in these configs must not be world-readable."""
os.makedirs(os.path.dirname(path), exist_ok=True)
tmp = path + '.tmp'
fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
try:
with os.fdopen(fd, 'w') as f:
f.write(text)
except Exception:
try:
os.unlink(tmp)
except OSError:
pass
raise
os.chmod(tmp, 0o600)
os.replace(tmp, path)
os.chmod(path, 0o600)