Files
pic/api/wireguard_manager.py
T
roof 4a9c4cc58b fix: add kernel routes for cell peers after wg set
wg set updates WireGuard peer state but does not add kernel routes —
unlike wg-quick. Without ip route add, traffic to a remote cell's
vpn_subnet is routed via the default gateway (internet) instead of wg0,
causing all cross-cell pushes to time out with HTTP 000.

- add_cell_peer() now calls _ensure_cell_route(vpn_subnet) after
  writing the peer config and running _syncconf
- _ensure_cell_route() runs docker exec cell-wireguard ip route add
  (idempotent, non-fatal); no-op inside test dirs
- sync_cell_routes() parses wg0.conf at startup to re-add any routes
  lost across container restarts; called from _apply_startup_enforcement
- 5 new unit tests covering both normal and test-dir no-op paths

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 14:47:22 -04:00

898 lines
38 KiB
Python

#!/usr/bin/env python3
"""
WireGuard Manager for Personal Internet Cell
"""
import os
import re
import json
import base64
import socket
import subprocess
import logging
import time
from datetime import datetime
from typing import Dict, List, Optional, Any
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from base_service_manager import BaseServiceManager
try:
import requests as _requests
except ImportError:
_requests = None
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Fallback server interface address (host/prefix) used when wg0.conf has no Address.
SERVER_ADDRESS = '10.0.0.1/24'
# Fallback VPN network; also the default AllowedIPs argument of add_peer().
SERVER_NETWORK = '10.0.0.0/24'
# Fallback WireGuard listen port used when wg0.conf has no usable ListenPort.
DEFAULT_PORT = 51820
def _resolve_peer_dns() -> str:
    """Return the IP that 'cell-dns' resolves to, or '172.20.0.3' if lookup fails."""
    candidates = ('cell-dns',)
    for host in candidates:
        try:
            resolved = socket.gethostbyname(host)
        except OSError:
            # Name not resolvable from here; try the next candidate (if any).
            continue
        return resolved
    return '172.20.0.3'
class WireGuardManager(BaseServiceManager):
"""Manages WireGuard VPN configuration and peers"""
def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
    """Register as the 'wireguard' service, create its directories, ensure server keys.

    Args:
        data_dir: Root under which ``wireguard/keys`` and ``wireguard/peers`` live.
        config_dir: Root under which the ``wireguard`` config directory lives.
    """
    super().__init__('wireguard', data_dir, config_dir)
    wg_data_root = os.path.join(data_dir, 'wireguard')
    self.wireguard_dir = os.path.join(config_dir, 'wireguard')
    self.keys_dir = os.path.join(wg_data_root, 'keys')
    self.peers_dir = os.path.join(wg_data_root, 'peers')
    required_dirs = (
        self.wireguard_dir,
        self.keys_dir,
        os.path.join(self.keys_dir, 'peers'),
        self.peers_dir,
    )
    for directory in required_dirs:
        self.safe_makedirs(directory)
    self._ensure_server_keys()
# ── Key management ────────────────────────────────────────────────────────
@staticmethod
def _generate_keypair():
    """Create a fresh X25519 keypair and return ``(private_raw, public_raw)`` bytes."""
    private_key = X25519PrivateKey.generate()
    public_key = private_key.public_key()
    return private_key.private_bytes_raw(), public_key.public_bytes_raw()
def _ensure_server_keys(self):
    """Make sure both server key files exist under ``self.keys_dir``.

    * Neither file present: generate a new keypair and write both files.
    * Private key present, public key missing: re-derive the public key from
      the stored private key, so get_keys() does not fail on the missing file.
    * Both present: do nothing.

    The private key file is created with mode 0o600 so other local users
    cannot read it. Failures are logged and otherwise ignored (best-effort),
    matching the previous non-raising contract.
    """
    priv_file = os.path.join(self.keys_dir, 'private.key')
    pub_file = os.path.join(self.keys_dir, 'public.key')
    have_priv = os.path.exists(priv_file)
    if have_priv and os.path.exists(pub_file):
        return
    try:
        if have_priv:
            with open(priv_file, 'rb') as f:
                priv_bytes = f.read()
            pub_bytes = (
                X25519PrivateKey.from_private_bytes(priv_bytes)
                .public_key()
                .public_bytes_raw()
            )
        else:
            priv_bytes, pub_bytes = self._generate_keypair()
            # Create the file with owner-only permissions from the start,
            # instead of relying on the process umask.
            fd = os.open(priv_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'wb') as f:
                f.write(priv_bytes)
        with open(pub_file, 'wb') as f:
            f.write(pub_bytes)
    except (OSError, ValueError) as e:
        # OSError covers PermissionError; ValueError covers a corrupt stored key.
        logger.warning(f'_ensure_server_keys failed (non-fatal): {e}')
def get_keys(self) -> Dict[str, str]:
    """Return the server keypair as base64 text, creating it first if absent.

    Returns:
        ``{'private_key': ..., 'public_key': ...}``; both values are empty
        strings when the private key file still does not exist after
        _ensure_server_keys() has been given a chance to create it.
    """
    priv_path = os.path.join(self.keys_dir, 'private.key')
    pub_path = os.path.join(self.keys_dir, 'public.key')
    if not os.path.exists(priv_path):
        self._ensure_server_keys()
        if not os.path.exists(priv_path):
            return {'private_key': '', 'public_key': ''}
    encoded = {}
    for label, path in (('private_key', priv_path), ('public_key', pub_path)):
        with open(path, 'rb') as fh:
            encoded[label] = base64.b64encode(fh.read()).decode()
    return encoded
def generate_peer_keys(self, peer_name: str) -> Dict[str, str]:
    """Generate a keypair for a peer, store it under keys_dir/peers/, return it.

    Args:
        peer_name: 1-64 characters from ``A-Z a-z 0-9 _ . -``; used in the
            key file names.

    Returns:
        ``{'private_key', 'public_key', 'peer_name'}`` with base64 keys.

    Raises:
        ValueError: if ``peer_name`` is not a string or contains anything
            outside the allowed alphabet (including a trailing newline).
    """
    # fullmatch, not match with '$': '$' also matches just before a trailing
    # newline, which would let 'name\n' through into the file name.
    if not isinstance(peer_name, str) or not re.fullmatch(r'[A-Za-z0-9_.-]{1,64}', peer_name):
        raise ValueError(f"Invalid peer_name: {peer_name!r}")
    priv_bytes, pub_bytes = self._generate_keypair()
    priv_b64 = base64.b64encode(priv_bytes).decode()
    pub_b64 = base64.b64encode(pub_bytes).decode()
    peer_keys_dir = os.path.join(self.keys_dir, 'peers')
    priv_path = os.path.join(peer_keys_dir, f'{peer_name}_private.key')
    # New private key files are created owner-only (0o600).
    fd = os.open(priv_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        f.write(priv_b64)
    with open(os.path.join(peer_keys_dir, f'{peer_name}_public.key'), 'w') as f:
        f.write(pub_b64)
    return {'private_key': priv_b64, 'public_key': pub_b64, 'peer_name': peer_name}
# ── Config generation ─────────────────────────────────────────────────────
def get_config(self, interface: str = 'wg0', port: int = DEFAULT_PORT):
    """Wrap generate_config() output in a ``{'config': <text>}`` dict for API callers."""
    rendered = self.generate_config(interface, port)
    return {'config': rendered}
def generate_config(self, interface: str = 'wg0', port: int = DEFAULT_PORT) -> str:
    """Build the server's WireGuard ``[Interface]`` section as text.

    Address and ListenPort come from the existing wg0.conf when that file
    exists; otherwise SERVER_ADDRESS and ``port`` are used. When an external
    IP is known, a hairpin DNAT rule is added to PostUp/PostDown.

    Args:
        interface: Accepted for API compatibility; not used in the output
            (rules use wg-quick's ``%i`` placeholder instead).
        port: ListenPort to emit when no config file exists yet.
    """
    import ipaddress
    keys = self.get_keys()
    ext_ip = self.get_external_ip() or ''
    if os.path.exists(self._config_file()):
        address = self._get_configured_address()
    else:
        address = SERVER_ADDRESS
    server_ip = str(ipaddress.ip_interface(address).ip)
    hairpin = ''
    hairpin_down = ''
    if ext_ip:
        hairpin = f'iptables -t nat -A PREROUTING -i %i -d {ext_ip} -j DNAT --to-destination {server_ip}; '
        hairpin_down = f'iptables -t nat -D PREROUTING -i %i -d {ext_ip} -j DNAT --to-destination {server_ip}; '
    if os.path.exists(self._config_file()):
        cfg_port = self._get_configured_port()
    else:
        cfg_port = port
    post_up = (
        'PostUp = iptables -A FORWARD -i %i -j ACCEPT; '
        'iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE; '
        f'{hairpin}'
        'sysctl -q net.ipv4.conf.all.rp_filter=0 || true\n'
    )
    post_down = (
        'PostDown = iptables -D FORWARD -i %i -j ACCEPT; '
        'iptables -t nat -D POSTROUTING -o eth0 -j MASQUERADE; '
        f'{hairpin_down}'
        'sysctl -q net.ipv4.conf.all.rp_filter=1 || true\n'
    )
    pieces = [
        '[Interface]\n',
        f'PrivateKey = {keys["private_key"]}\n',
        f'Address = {address}\n',
        f'ListenPort = {cfg_port}\n',
        post_up,
        post_down,
    ]
    return ''.join(pieces)
def _config_file(self) -> str:
    """Return the path of wg0.conf, preferring the ``wg_confs/`` subdirectory.

    linuxserver/wireguard keeps its configs in ``wg_confs/``; if that
    directory does not exist the file is expected directly in wireguard_dir.
    """
    base_dir = self.wireguard_dir
    nested_dir = os.path.join(base_dir, 'wg_confs')
    conf_dir = nested_dir if os.path.isdir(nested_dir) else base_dir
    return os.path.join(conf_dir, 'wg0.conf')
def _read_config(self) -> str:
    """Return the text of wg0.conf, or a freshly generated config if it is missing."""
    path = self._config_file()
    if not os.path.exists(path):
        return self.generate_config()
    with open(path, 'r') as fh:
        return fh.read()
def _write_config(self, content: str):
    """Overwrite wg0.conf with ``content`` and then sync the live interface."""
    target = self._config_file()
    with open(target, 'w') as fh:
        fh.write(content)
    # Sync only after the file is closed so _syncconf() reads the new content.
    self._syncconf()
# ── Config value readers (always read from wg0.conf, never hardcode) ─────
def _read_iface_field(self, key: str) -> Optional[str]:
    """Look up ``key`` inside the ``[Interface]`` section of wg0.conf.

    Returns:
        The stripped value of the first matching ``key = value`` line, or
        None when the file is missing or the key is not in that section.
    """
    path = self._config_file()
    if not os.path.exists(path):
        return None
    inside_interface = False
    with open(path) as fh:
        for raw in fh:
            text = raw.strip()
            if text == '[Interface]':
                inside_interface = True
                continue
            if text.startswith('[') and text.endswith(']'):
                # Any other section header ends the [Interface] section.
                inside_interface = False
                continue
            if not inside_interface or '=' not in text:
                continue
            name, _, value = text.partition('=')
            if name.strip() == key:
                return value.strip()
    return None
def _get_configured_port(self) -> int:
    """Return ListenPort from wg0.conf as an int, or DEFAULT_PORT if absent/invalid."""
    raw = self._read_iface_field('ListenPort')
    if not raw:
        return DEFAULT_PORT
    try:
        return int(raw)
    except (ValueError, TypeError):
        return DEFAULT_PORT
def _get_configured_address(self) -> str:
    """Return the Address from wg0.conf's [Interface], or SERVER_ADDRESS if unset."""
    configured = self._read_iface_field('Address')
    if configured:
        return configured
    return SERVER_ADDRESS
def _get_configured_network(self) -> str:
    """Return the network (CIDR) containing the configured interface address.

    Falls back to SERVER_NETWORK when the address cannot be parsed.
    """
    import ipaddress
    address = self._get_configured_address()
    try:
        # strict=False: the address has host bits set (e.g. 10.0.0.1/24).
        network = ipaddress.ip_network(address, strict=False)
    except Exception:
        return SERVER_NETWORK
    return str(network)
def get_split_tunnel_ips(self) -> str:
    """Return split-tunnel AllowedIPs: the configured VPN network plus 172.20.0.0/16."""
    vpn_network = self._get_configured_network()
    return ', '.join((vpn_network, '172.20.0.0/16'))
def _load_registered_peers(self) -> list:
    """Load usable peer records from ``<data_dir>/peers.json``.

    A record is kept when it is a dict, is not marked inactive, and has both
    a ``public_key`` and an ``ip``. Any read/parse problem yields ``[]``.
    """
    import json as _json
    peers_path = os.path.join(self.data_dir, 'peers.json')
    try:
        with open(peers_path) as fh:
            entries = _json.load(fh)
        usable = []
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            if not entry.get('active', True):
                continue
            if not entry.get('public_key') or not entry.get('ip'):
                continue
            usable.append(entry)
        return usable
    except Exception:
        return []
def _sync_keys_from_conf(self) -> None:
    """Make the API key store match the PrivateKey found in wg0.conf.

    linuxserver/wireguard auto-generates a PrivateKey on first container
    start, while the API generates its own independently. This reads the
    first PrivateKey line of wg0.conf and, if it differs from get_keys(),
    rewrites private.key/public.key from it (the public key is derived).
    Does nothing if the file is missing; errors are logged, never raised.
    """
    import base64 as _b64
    conf_path = self._config_file()
    if not os.path.exists(conf_path):
        return
    try:
        with open(conf_path) as fh:
            text = fh.read()
        for entry in (ln.strip() for ln in text.splitlines()):
            if not entry.startswith('PrivateKey'):
                continue
            conf_priv = entry.split('=', 1)[1].strip()
            api_keys = self.get_keys()
            if conf_priv == api_keys.get('private_key'):
                return  # already in sync
            # Derive the public key from the config's private key, then
            # overwrite both key-store files.
            from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
            priv_bytes = _b64.b64decode(conf_priv)
            pub_bytes = (
                X25519PrivateKey.from_private_bytes(priv_bytes)
                .public_key()
                .public_bytes_raw()
            )
            pub_b64 = _b64.b64encode(pub_bytes).decode()
            priv_file = os.path.join(self.keys_dir, 'private.key')
            pub_file = os.path.join(self.keys_dir, 'public.key')
            with open(priv_file, 'wb') as out:
                out.write(priv_bytes)
            with open(pub_file, 'wb') as out:
                out.write(pub_bytes)
            logger.info(f'wg: key-store synced from wg0.conf (new pub={pub_b64[:16]}...)')
            return
    except Exception as e:
        logger.warning(f'_sync_keys_from_conf failed (non-fatal): {e}')
def apply_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
    """Update [Interface] fields of wg0.conf and restart cell-wireguard when needed.

    Recognised ``config`` keys are ``port`` (ListenPort), ``address``
    (Address) and ``private_key`` (PrivateKey); absent or falsy values are
    left untouched. If wg0.conf is empty or has no [Interface] section it is
    first regenerated from generate_config() plus the peers returned by
    _load_registered_peers().

    Values containing a line break are refused with a warning: they are
    written verbatim into the file and would otherwise inject extra lines.

    Returns:
        ``{'restarted': [...], 'warnings': [...]}``. The container is
        restarted only when address or private_key changed; a port-only
        change is written to the file but not restarted here.
    """
    restarted = []
    warnings = []
    cf = self._config_file()
    if not os.path.exists(cf):
        warnings.append('wg0.conf not found — skipping')
        return {'restarted': restarted, 'warnings': warnings}

    def _set_iface_field(lines, key, value):
        """Return lines with every ``key = ...`` line replaced by the new value."""
        prefixes = (f'{key} =', f'{key}=')
        return [
            f'{key} = {value}\n' if entry.strip().startswith(prefixes) else entry
            for entry in lines
        ]

    # (config key, wg0.conf field, whether a change needs an immediate restart)
    field_map = (
        ('port', 'ListenPort', False),
        ('address', 'Address', True),
        ('private_key', 'PrivateKey', True),
    )
    try:
        # Sync the API key-store from wg0.conf before doing anything else.
        # linuxserver/wireguard auto-generates its own key; this keeps both in
        # sync so get_peer_config() always embeds the correct server public key.
        self._sync_keys_from_conf()
        with open(cf) as f:
            raw = f.read()
        # Bootstrap from generate_config() if file is empty or has no [Interface]
        if not raw.strip() or '[Interface]' not in raw:
            raw = self.generate_config()
            # Restore all registered peers so clients can reconnect immediately
            for peer in self._load_registered_peers():
                raw += (
                    f'\n[Peer]\n'
                    f'# {peer.get("peer", "unknown")}\n'
                    f'PublicKey = {peer["public_key"]}\n'
                    f'AllowedIPs = {peer["ip"]}/32\n'
                    f'PersistentKeepalive = 25\n'
                )
            with open(cf, 'w') as f:
                f.write(raw)
            warnings.append('wg0.conf was empty — regenerated from keys')
        lines = raw.splitlines(keepends=True)
        changed = False
        port_only_change = True
        for cfg_key, field, needs_restart in field_map:
            value = config.get(cfg_key)
            if not value:
                continue
            if any(ch in str(value) for ch in '\r\n'):
                warnings.append(f'{cfg_key} contains a line break — ignored')
                continue
            lines = _set_iface_field(lines, field, value)
            changed = True
            if needs_restart:
                port_only_change = False
        if changed:
            with open(cf, 'w') as f:
                f.writelines(lines)
            # Port-only changes: docker binding must be updated first via pending restart.
            # Non-port changes (address, private_key) can restart immediately.
            if not port_only_change:
                self._restart_container('cell-wireguard')
                restarted.append('cell-wireguard')
    except Exception as e:
        warnings.append(f"wg0.conf update failed: {e}")
        logger.error(f"apply_config error: {e}")
    return {'restarted': restarted, 'warnings': warnings}
def _syncconf(self):
    """Sync live WireGuard peers using 'wg set' — never touches [Interface] settings.

    wg syncconf resets the ListenPort when given a peers-only config,
    breaking client connections. Instead, the [Peer] sections of the config
    file are diffed against the live interface and peers are removed or
    added/updated individually.

    SAFETY: if the config file path looks like a test directory (contains
    '/tmp/' or 'pytest'), bail out immediately — never touch the live
    container. All failures are logged and swallowed (non-fatal).
    """
    import subprocess
    real_conf = self._config_file()
    if '/tmp/' in real_conf or 'pytest' in real_conf:
        logger.debug('_syncconf: skipping — config path looks like a test dir')
        return
    # wg0.conf [Peer] field name -> key used in the desired-peer dicts below.
    wanted_fields = {
        'PublicKey': 'pub',
        'AllowedIPs': 'ips',
        'PersistentKeepalive': 'ka',
        'Endpoint': 'endpoint',
    }
    try:
        # Parse desired peers from config file. A peer section ends at the
        # next section header or at EOF (not at a blank line), so peers that
        # are not separated by blank lines are still all picked up.
        content = self._read_config()
        desired: dict = {}
        current_peer = None

        def _commit(peer):
            if peer and 'pub' in peer:
                desired[peer['pub']] = peer

        for raw_line in content.splitlines():
            line = raw_line.strip()
            if line.startswith('[') and line.endswith(']'):
                _commit(current_peer)
                current_peer = {} if line == '[Peer]' else None
                continue
            if current_peer is None or not line or line.startswith('#'):
                continue
            field, sep, value = line.partition('=')
            slot = wanted_fields.get(field.strip()) if sep else None
            if slot:
                current_peer[slot] = value.strip()
        _commit(current_peer)
        # Get live peers
        dump = subprocess.run(
            ['docker', 'exec', 'cell-wireguard', 'wg', 'show', 'wg0', 'dump'],
            capture_output=True, text=True, timeout=5
        )
        live_pubs = set()
        for line in dump.stdout.splitlines():
            parts = line.split('\t')
            # Peer lines carry 8 tab-separated fields. The first dump line
            # describes the interface (4 fields, starting with its PRIVATE
            # key) and must never be treated as a peer.
            if len(parts) >= 8 and parts[0] not in ('(none)', ''):
                live_pubs.add(parts[0])
        # Remove peers no longer in config
        for pub in live_pubs - set(desired):
            subprocess.run(
                ['docker', 'exec', 'cell-wireguard', 'wg', 'set', 'wg0',
                 'peer', pub, 'remove'],
                capture_output=True, timeout=5
            )
            logger.info(f'wg: removed peer {pub[:16]}...')
        # Add/update peers in config
        for pub, p in desired.items():
            args = ['docker', 'exec', 'cell-wireguard', 'wg', 'set', 'wg0',
                    'peer', pub,
                    'allowed-ips', p.get('ips', ''),
                    'persistent-keepalive', p.get('ka', '25')]
            if p.get('endpoint'):
                args += ['endpoint', p['endpoint']]
            subprocess.run(args, capture_output=True, timeout=5)
        logger.info(f'wg set applied: {len(desired)} peers')
    except Exception as e:
        logger.warning(f'_syncconf failed (non-fatal): {e}')
# ── Peer CRUD ─────────────────────────────────────────────────────────────
def add_peer(self, name: str, public_key: str, endpoint_ip: str,
             allowed_ips: str = SERVER_NETWORK,
             persistent_keepalive: int = 25) -> bool:
    """Append a [Peer] block to wg0.conf and sync the live interface.

    Server-side AllowedIPs must be the peer's specific VPN host address
    (/32 for IPv4, /128 for IPv6). Passing full-tunnel or split-tunnel CIDRs
    here would cause the server to route all internet or LAN traffic to that
    peer — breaking everything. Note that the default ``allowed_ips``
    (SERVER_NETWORK) is itself wider than a host and is therefore rejected;
    callers are expected to pass the peer's own address.

    Every value is validated and written in its normalized form, so
    surrounding whitespace or a trailing newline cannot leak into the file.

    Args:
        name: Optional label written as a comment; letters, digits, space,
            ``_ . -`` only, at most 64 characters.
        public_key: Base64 WireGuard public key (44 characters).
        endpoint_ip: Optional IP literal; the configured listen port is
            appended (IPv6 literals are bracketed).
        allowed_ips: Comma-separated host CIDRs.
        persistent_keepalive: Seconds (digits) or ``off``.

    Returns:
        True when the block was written, False on any validation or I/O error.
    """
    import ipaddress, re as _re
    if not isinstance(public_key, str):
        return False
    public_key = public_key.strip()
    if not _re.fullmatch(r'[A-Za-z0-9+/]{43}=', public_key):
        return False  # invalid WireGuard public key
    # fullmatch: '$' would also accept a trailing newline.
    if name and (not isinstance(name, str)
                 or not _re.fullmatch(r'[A-Za-z0-9_. -]{1,64}', name)):
        return False  # reject names with newlines/brackets
    keepalive = str(persistent_keepalive).strip()
    if not _re.fullmatch(r'[0-9]{1,5}|off', keepalive):
        return False
    endpoint_host = ''
    if endpoint_ip:
        try:
            parsed_ip = ipaddress.ip_address(endpoint_ip.strip())
        except ValueError:
            return False
        endpoint_host = f'[{parsed_ip}]' if parsed_ip.version == 6 else str(parsed_ip)
    try:
        # Enforce single-host CIDRs: reject anything wider than one address.
        cidrs = [c.strip() for c in allowed_ips.split(',')]
        for cidr in cidrs:
            net = ipaddress.ip_network(cidr, strict=False)
            if net.prefixlen != net.max_prefixlen:
                raise ValueError(
                    f"Server-side AllowedIPs must be a /32 host address, got '{cidr}'. "
                    "Full/split tunnel CIDRs belong in the CLIENT config only."
                )
        allowed = ', '.join(cidrs)
        content = self._read_config()
        peer_block = (
            f'\n[Peer]\n'
            f'# {name}\n'
            f'PublicKey = {public_key}\n'
            f'AllowedIPs = {allowed}\n'
            f'PersistentKeepalive = {keepalive}\n'
        )
        if endpoint_host:
            peer_block += f'Endpoint = {endpoint_host}:{self._get_configured_port()}\n'
        self._write_config(content + peer_block)
        return True
    except Exception as e:
        logger.error(f'add_peer failed: {e}')
        return False
def add_cell_peer(self, name: str, public_key: str, endpoint: str, vpn_subnet: str) -> bool:
    """Add a site-to-site [Peer] block for another PIC cell.

    Unlike add_peer(), allows a subnet CIDR as AllowedIPs (whole remote VPN
    range). After the config is written, a kernel route for the subnet is
    requested via _ensure_cell_route().

    Args:
        name: Cell label (letters, digits, space, ``_ . -``; max 64 chars);
            written as ``# cell:<name>``.
        public_key: Base64 WireGuard public key; surrounding whitespace is
            stripped before it is written.
        endpoint: Optional ``host:port`` (e.g. '1.2.3.4:51820').
        vpn_subnet: Remote VPN network in CIDR form; must not overlap the
            local WireGuard network and must not contain whitespace.

    Returns:
        True when the block was written, False on validation or I/O failure.
    """
    import ipaddress, re as _re
    # Validate public_key strictly — empty/garbled keys later cause remove_peer("")
    # to wipe ALL peer blocks via substring match.
    if not isinstance(public_key, str) or not _re.fullmatch(r'[A-Za-z0-9+/]{43}=', public_key.strip()):
        logger.error(f'add_cell_peer: invalid public_key')
        return False
    # Write exactly what was validated (no stray whitespace/newlines).
    public_key = public_key.strip()
    # Validate name — reject newlines/brackets that could inject config blocks.
    # fullmatch is required: '$' would still accept a trailing newline.
    if not isinstance(name, str) or not _re.fullmatch(r'[A-Za-z0-9_. -]{1,64}', name):
        logger.error(f'add_cell_peer: invalid name {name!r}')
        return False
    # Validate endpoint as host:port — reject newlines and out-of-range ports
    if endpoint:
        if not isinstance(endpoint, str) or not _re.fullmatch(r'[A-Za-z0-9._-]+:[0-9]{1,5}', endpoint):
            logger.error(f'add_cell_peer: invalid endpoint {endpoint!r}')
            return False
        try:
            _port = int(endpoint.rsplit(':', 1)[1])
            if not (1 <= _port <= 65535):
                logger.error(f'add_cell_peer: endpoint port out of range: {endpoint!r}')
                return False
        except (ValueError, IndexError):
            logger.error(f'add_cell_peer: invalid endpoint port: {endpoint!r}')
            return False
    try:
        remote_net = ipaddress.ip_network(vpn_subnet, strict=False)
    except ValueError as e:
        logger.error(f'add_cell_peer: invalid vpn_subnet {vpn_subnet!r}: {e}')
        return False
    # Reject any whitespace/newlines in vpn_subnet that ip_network() may have tolerated
    if any(c.isspace() for c in vpn_subnet):
        logger.error(f'add_cell_peer: vpn_subnet contains whitespace: {vpn_subnet!r}')
        return False
    # Reject subnets that overlap the local WG network — would create a routing blackhole
    try:
        local_net = ipaddress.ip_network(self._get_configured_network(), strict=False)
        if local_net.overlaps(remote_net):
            logger.error(
                f'add_cell_peer: vpn_subnet {vpn_subnet!r} overlaps local WG network '
                f'{str(local_net)!r} — use a distinct subnet on the remote cell'
            )
            return False
    except Exception as e:
        # The overlap check is best-effort (e.g. mixed IPv4/IPv6 networks).
        logger.debug(f'add_cell_peer: overlap check skipped: {e}')
    try:
        content = self._read_config()
        peer_block = (
            f'\n[Peer]\n'
            f'# cell:{name}\n'
            f'PublicKey = {public_key}\n'
            f'AllowedIPs = {vpn_subnet}\n'
            f'PersistentKeepalive = 25\n'
        )
        if endpoint:
            peer_block += f'Endpoint = {endpoint}\n'
        self._write_config(content + peer_block)
        self._ensure_cell_route(vpn_subnet)
        return True
    except Exception as e:
        logger.error(f'add_cell_peer failed: {e}')
        return False
def _ensure_cell_route(self, vpn_subnet: str) -> None:
    """Add a kernel route for ``vpn_subnet`` via wg0 inside cell-wireguard.

    'wg set' updates WireGuard peer state but does not add kernel routes.
    wg-quick would do this automatically, but we manage WG live via 'wg set'.

    Runs ``ip route add`` in the container. An already-existing route
    ("File exists") counts as success; any other failure is logged as a
    warning instead of being reported as success. Never raises. No-op when
    the config path looks like a test directory.
    """
    real_conf = self._config_file()
    if '/tmp/' in real_conf or 'pytest' in real_conf:
        return
    try:
        result = subprocess.run(
            ['docker', 'exec', 'cell-wireguard',
             'ip', 'route', 'add', vpn_subnet, 'dev', 'wg0'],
            capture_output=True, text=True, timeout=5
        )
        stderr = (result.stderr or '').strip()
        if result.returncode == 0 or 'File exists' in stderr:
            logger.info(f'_ensure_cell_route: {vpn_subnet} via wg0')
        else:
            logger.warning(
                f'_ensure_cell_route: ip route add {vpn_subnet} failed '
                f'(rc={result.returncode}): {stderr}'
            )
    except Exception as e:
        logger.warning(f'_ensure_cell_route failed (non-fatal): {e}')
def sync_cell_routes(self) -> None:
    """Ensure kernel routes exist for all cell peers defined in wg0.conf.

    Called on startup so routes survive container restarts (kernel routes
    are ephemeral; only the WG peer config in wg0.conf persists).

    A cell peer is a section containing a ``# cell:`` comment line; each
    network listed in its AllowedIPs (comma-separated lists are split) is
    passed to _ensure_cell_route(). No-op when the config path looks like a
    test directory. Errors are logged, never raised.
    """
    real_conf = self._config_file()
    if '/tmp/' in real_conf or 'pytest' in real_conf:
        return
    try:
        content = self._read_config()
        subnets = []
        in_cell_peer = False
        for line in content.splitlines():
            stripped = line.strip()
            if stripped.startswith('[') and stripped.endswith(']'):
                # A new section starts: the cell marker must be seen again.
                in_cell_peer = False
            elif stripped.startswith('# cell:'):
                in_cell_peer = True
            elif in_cell_peer and stripped.startswith('AllowedIPs'):
                value = stripped.partition('=')[2]
                # One route per network; 'ip route add' takes a single prefix.
                subnets.extend(
                    part for part in (piece.strip() for piece in value.split(',')) if part
                )
        for subnet in subnets:
            self._ensure_cell_route(subnet)
        if subnets:
            logger.info(f'sync_cell_routes: ensured routes for {subnets}')
    except Exception as e:
        logger.warning(f'sync_cell_routes failed (non-fatal): {e}')
def remove_peer(self, public_key: str) -> bool:
    """Remove the [Peer] section whose PublicKey equals ``public_key``.

    The config is split at section headers (``[...]`` lines) rather than at
    blank lines, and the key is compared exactly. This means:

    * an empty key no longer matches (and wipes) every peer,
    * a key that is a prefix of another key does not match it,
    * peers not separated by a blank line are not removed together.

    The rewritten file always ends with a single newline so that later
    appends start on their own line.

    Returns:
        True when the config was rewritten (also when no peer matched),
        False for an empty/non-string key or on I/O failure.
    """
    if not isinstance(public_key, str) or not public_key.strip():
        logger.error(f'remove_peer: invalid public_key {public_key!r}')
        return False
    target = public_key.strip()

    def _is_target(section):
        """True if section is a [Peer] block carrying the target public key."""
        if not section or section[0].strip() != '[Peer]':
            return False
        for entry in section:
            field, sep, value = entry.strip().partition('=')
            if sep and field.strip() == 'PublicKey' and value.strip() == target:
                return True
        return False

    try:
        content = self._read_config()
        # sections[0] holds anything before the first header.
        sections = [[]]
        for line in content.splitlines(keepends=True):
            stripped = line.strip()
            if stripped.startswith('[') and stripped.endswith(']'):
                sections.append([])
            sections[-1].append(line)
        kept = [line for section in sections if not _is_target(section) for line in section]
        text = ''.join(kept).strip('\n')
        self._write_config(text + '\n' if text else '')
        return True
    except Exception as e:
        logger.error(f'remove_peer failed: {e}')
        return False
def get_peers(self) -> List[Dict[str, Any]]:
    """Parse wg0.conf and return one dict per ``[Peer]`` section.

    Recognised fields (case-insensitive): PublicKey -> ``public_key``,
    AllowedIPs -> ``allowed_ips``, Endpoint -> ``endpoint``,
    PersistentKeepalive -> ``persistent_keepalive`` (int when numeric,
    otherwise the raw string). Sections with no recognised field are skipped.
    """
    text_fields = {
        'publickey': 'public_key',
        'allowedips': 'allowed_ips',
        'endpoint': 'endpoint',
    }
    found = []
    # Everything before the first '[Peer]' marker is not a peer section.
    for chunk in self._read_config().split('[Peer]')[1:]:
        entry: Dict[str, Any] = {}
        for raw in chunk.strip().splitlines():
            text = raw.strip()
            if not text or text.startswith('#') or '=' not in text:
                continue
            name, _, value = text.partition('=')
            name = name.strip().lower().replace(' ', '')
            value = value.strip()
            if name == 'persistentkeepalive':
                try:
                    entry['persistent_keepalive'] = int(value)
                except ValueError:
                    entry['persistent_keepalive'] = value
            elif name in text_fields:
                entry[text_fields[name]] = value
        if entry:
            found.append(entry)
    return found
def update_peer_ip(self, public_key: str, new_ip: str) -> bool:
    """Replace the AllowedIPs line of the peer identified by ``public_key``.

    Only the section that actually contains the matching PublicKey line is
    modified; a peer without an AllowedIPs line never causes the following
    peer's AllowedIPs to be rewritten. The rewritten file keeps a trailing
    newline so later appends start on their own line.

    Args:
        public_key: Exact public key of the peer to update.
        new_ip: New AllowedIPs value; must parse as an IP network and must
            not contain whitespace.

    Returns:
        True if a line was replaced and the config written; False if the
        input is invalid, the peer is not found, or it has no AllowedIPs line.
    """
    import ipaddress
    # Reject whitespace/newlines that ip_network() may tolerate but would inject config
    if not isinstance(new_ip, str) or any(c.isspace() for c in new_ip):
        logger.error(f'update_peer_ip: invalid new_ip {new_ip!r}')
        return False
    try:
        ipaddress.ip_network(new_ip, strict=False)
    except ValueError as e:
        logger.error(f'update_peer_ip: invalid new_ip {new_ip!r}: {e}')
        return False
    if not isinstance(public_key, str) or not public_key.strip():
        return False
    target = public_key.strip()

    def _is_header(text):
        stripped = text.strip()
        return stripped.startswith('[') and stripped.endswith(']')

    def _has_target_key(text):
        field, sep, value = text.strip().partition('=')
        return bool(sep) and field.strip() == 'PublicKey' and value.strip() == target

    lines = self._read_config().splitlines()
    # Section boundaries: each section runs from its header to the next header.
    starts = [idx for idx, text in enumerate(lines) if _is_header(text)]
    for begin, end in zip(starts, starts[1:] + [len(lines)]):
        section = range(begin, end)
        if not any(_has_target_key(lines[idx]) for idx in section):
            continue
        for idx in section:
            if lines[idx].strip().startswith('AllowedIPs'):
                lines[idx] = f'AllowedIPs = {new_ip}'
                self._write_config('\n'.join(lines) + '\n')
                return True
        return False
    return False
# Static split-tunnel AllowedIPs (default VPN network + 172.20.0.0/16).
SPLIT_TUNNEL_IPS = '10.0.0.0/24, 172.20.0.0/16'  # legacy fallback; use get_split_tunnel_ips()
# AllowedIPs matching every IPv4 and IPv6 destination; get_peer_config() uses
# this when the caller passes no allowed_ips.
FULL_TUNNEL_IPS = '0.0.0.0/0, ::/0'
def get_peer_config(self, peer_name: str, peer_ip: str,
                    peer_private_key: str,
                    server_endpoint: str = '<SERVER_IP>',
                    allowed_ips: str = None) -> str:
    """Render a WireGuard client config for one peer.

    Args:
        peer_name: Not used in the generated text.
        peer_ip: Client address; ``/32`` is appended when no prefix is given.
        peer_private_key: Client private key placed in [Interface].
        server_endpoint: Server host; the configured port is appended unless
            the value already contains ':'.
        allowed_ips: Client-side AllowedIPs; defaults to FULL_TUNNEL_IPS.
    """
    if allowed_ips is None:
        allowed_ips = self.FULL_TUNNEL_IPS
    server_keys = self.get_keys()
    peer_dns = _resolve_peer_dns()
    port = self._get_configured_port()
    if ':' in server_endpoint:
        endpoint = server_endpoint
    else:
        endpoint = f'{server_endpoint}:{port}'
    if '/' in peer_ip:
        addr = peer_ip
    else:
        addr = f'{peer_ip}/32'
    config_lines = [
        '[Interface]',
        f'PrivateKey = {peer_private_key}',
        f'Address = {addr}',
        f'DNS = {peer_dns}',
        '',
        '[Peer]',
        f'PublicKey = {server_keys["public_key"]}',
        f'AllowedIPs = {allowed_ips}',
        f'Endpoint = {endpoint}',
        'PersistentKeepalive = 25',
    ]
    return '\n'.join(config_lines) + '\n'
# ── External IP & port ────────────────────────────────────────────────────
def _ip_cache_file(self) -> str:
    """Return the path of the external-IP cache file inside ``keys_dir``."""
    cache_name = 'external_ip.json'
    return os.path.join(self.keys_dir, cache_name)
def get_external_ip(self, force_refresh: bool = False) -> Optional[str]:
    """Detect the external IP, caching the result for 1 hour.

    The cached value and every service response are accepted only if they
    parse as an IP address. This matters because generate_config()
    interpolates this value into iptables PostUp/PostDown commands, so
    arbitrary text from a remote service or a tampered cache file must never
    be returned.

    Args:
        force_refresh: Skip the cache and query the services again.

    Returns:
        The IP as text, or None when no service returned a valid address
        (or the optional ``requests`` dependency is unavailable).
    """
    import ipaddress

    def _as_ip(text):
        """Return text stripped if it is a valid IP literal, else None."""
        if not isinstance(text, str):
            return None
        text = text.strip()
        try:
            ipaddress.ip_address(text)
        except ValueError:
            return None
        return text

    cache_file = self._ip_cache_file()
    if not force_refresh and os.path.exists(cache_file):
        try:
            with open(cache_file) as f:
                data = json.load(f)
            if time.time() - data.get('ts', 0) < 3600:
                cached = _as_ip(data.get('ip'))
                if cached:
                    return cached
        except Exception as e:
            # An unreadable/corrupt cache just means we detect again.
            logger.debug(f'get_external_ip: ignoring unreadable cache: {e}')
    ip = None
    services = [
        'https://api.ipify.org',
        'https://ifconfig.me/ip',
        'https://icanhazip.com',
    ]
    if _requests:
        for url in services:
            try:
                resp = _requests.get(url, timeout=5)
            except Exception:
                continue
            candidate = _as_ip(resp.text)
            if candidate:
                ip = candidate
                break
    if ip:
        try:
            with open(cache_file, 'w') as f:
                json.dump({'ip': ip, 'ts': time.time()}, f)
        except OSError:
            # Cache is an optimisation only; detection result is still valid.
            pass
    return ip
def check_port_open(self, port: int = None) -> bool:
    """Report whether WireGuard appears to be listening on the given UDP port.

    First asks ``wg show wg0`` inside cell-wireguard for its listening port;
    if that does not confirm it, any peer reported online by
    get_all_peer_statuses() is taken as proof of reachability.

    Args:
        port: Port to look for; defaults to the configured ListenPort.
    """
    if port is None:
        configured_port = self._get_configured_port()
    else:
        configured_port = port
    # Primary: verify wg0 is up AND listening on the configured port
    try:
        probe = subprocess.run(
            ['docker', 'exec', 'cell-wireguard', 'wg', 'show', 'wg0'],
            capture_output=True, text=True, timeout=5,
        )
        needle = f'listening port: {configured_port}'
        if probe.returncode == 0 and needle in probe.stdout.lower():
            return True
    except Exception:
        pass
    # Fallback: recent peer handshake confirms external reachability
    try:
        if any(state.get('online') for state in self.get_all_peer_statuses().values()):
            return True
    except Exception:
        pass
    return False
def get_server_config(self) -> Dict[str, Any]:
    """Collect the server-side facts a client needs to connect.

    Returns a dict with the server public key, external IP, ``ip:port``
    endpoint (None without an external IP), listen port, DNS IP, split-tunnel
    AllowedIPs and the VPN network. ``port_open`` is always None here.
    """
    public_key = self.get_keys()['public_key']
    external_ip = self.get_external_ip()
    port = self._get_configured_port()
    endpoint = None
    if external_ip:
        endpoint = f'{external_ip}:{port}'
    summary: Dict[str, Any] = {}
    summary['public_key'] = public_key
    summary['external_ip'] = external_ip
    summary['endpoint'] = endpoint
    summary['port'] = port
    summary['port_open'] = None
    summary['dns_ip'] = _resolve_peer_dns()
    summary['split_tunnel_ips'] = self.get_split_tunnel_ips()
    summary['vpn_network'] = self._get_configured_network()
    return summary
def get_peer_status(self, public_key: str) -> Dict[str, Any]:
    """Return live handshake + transfer stats for a peer from ``wg show wg0 dump``.

    Returns:
        A dict with the keys ``online``, ``last_handshake``,
        ``last_handshake_seconds_ago``, ``endpoint``, ``transfer_rx`` and
        ``transfer_tx``. When the peer is not listed or the command fails,
        the same keys are returned with ``online`` set to None (unknown),
        the other status fields None and both counters 0 — so callers can
        rely on one shape in both cases.
    """
    try:
        result = subprocess.run(
            ['docker', 'exec', 'cell-wireguard', 'wg', 'show', 'wg0', 'dump'],
            capture_output=True, text=True, timeout=5,
        )
        for line in result.stdout.splitlines():
            parts = line.split('\t')
            # peer lines: pubkey psk endpoint allowed_ips handshake rx tx keepalive
            if len(parts) < 8 or parts[0] != public_key:
                continue
            handshake_ts = int(parts[4]) if parts[4].isdigit() else 0
            now = int(time.time())
            # A handshake timestamp of 0 means "never"; age stays None then.
            age = now - handshake_ts if handshake_ts else None
            return {
                'online': age is not None and age < 90,
                'last_handshake': datetime.utcfromtimestamp(handshake_ts).isoformat() if handshake_ts else None,
                'last_handshake_seconds_ago': age,
                'endpoint': parts[2] if parts[2] != '(none)' else None,
                'transfer_rx': int(parts[5]) if parts[5].isdigit() else 0,
                'transfer_tx': int(parts[6]) if parts[6].isdigit() else 0,
            }
    except Exception as e:
        logger.debug(f'get_peer_status failed: {e}')
    return {
        'online': None,
        'last_handshake': None,
        'last_handshake_seconds_ago': None,
        'endpoint': None,
        'transfer_rx': 0,
        'transfer_tx': 0,
    }
def get_all_peer_statuses(self) -> Dict[str, Any]:
    """Return ``{public_key: status_dict}`` for every peer line of ``wg show wg0 dump``.

    Each status dict has ``online`` (handshake younger than 90 s),
    ``last_handshake``, ``last_handshake_seconds_ago``, ``endpoint``,
    ``transfer_rx`` and ``transfer_tx``. On failure, whatever was collected
    so far (possibly nothing) is returned.
    """
    collected: Dict[str, Any] = {}
    command = ['docker', 'exec', 'cell-wireguard', 'wg', 'show', 'wg0', 'dump']
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=5)
        now = int(time.time())
        for row in proc.stdout.splitlines():
            cols = row.split('\t')
            if len(cols) < 8:
                # Not a peer line (peer lines have at least 8 columns).
                continue
            stamp = int(cols[4]) if cols[4].isdigit() else 0
            age = (now - stamp) if stamp else None
            last_seen = datetime.utcfromtimestamp(stamp).isoformat() if stamp else None
            remote = None if cols[2] == '(none)' else cols[2]
            collected[cols[0]] = {
                'online': age is not None and age < 90,
                'last_handshake': last_seen,
                'last_handshake_seconds_ago': age,
                'endpoint': remote,
                'transfer_rx': int(cols[5]) if cols[5].isdigit() else 0,
                'transfer_tx': int(cols[6]) if cols[6].isdigit() else 0,
            }
    except Exception as e:
        logger.debug(f'get_all_peer_statuses failed: {e}')
    return collected
# ── Status & connectivity ─────────────────────────────────────────────────
def get_status(self) -> Dict[str, Any]:
    """Return service status by checking whether the Docker container is up.

    ``ip_info.address`` reports the address configured in wg0.conf (via
    _get_configured_address(), which falls back to SERVER_ADDRESS) instead
    of always reporting the hardcoded default.

    Returns:
        A dict with ``running``, ``status`` ('online'/'offline'),
        ``interface``, ``ip_info``, ``peers_count`` and ``timestamp``; or the
        result of handle_error() if anything raises.
    """
    try:
        result = subprocess.run(
            ['docker', 'ps', '--filter', 'name=cell-wireguard', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=5,
        )
        running = 'cell-wireguard' in result.stdout
        ip_info = {'address': self._get_configured_address()} if running else {}
        return {
            'running': running,
            'status': 'online' if running else 'offline',
            'interface': 'wg0',
            'ip_info': ip_info,
            'peers_count': len(self.get_peers()),
            'timestamp': datetime.utcnow().isoformat(),
        }
    except Exception as e:
        return self.handle_error(e, 'get_status')
def test_connectivity(self, peer_ip: str = None) -> Dict[str, Any]:
    """Ping a peer IP, or report overall service state when no IP is given.

    Without ``peer_ip`` (as called from health_check) the result is derived
    from get_status(): ``{'success', 'reachable', 'status'}``.

    With ``peer_ip`` the result is ``{'peer_ip', 'ping_success',
    'ping_output', 'ping_error'}``. The value must be a plain IP address;
    anything else is rejected without running ping.
    """
    if not peer_ip:
        status = self.get_status()
        is_running = status.get('running', False)
        return {'success': is_running, 'reachable': is_running, 'status': status.get('status')}

    def _failure(message):
        return {
            'peer_ip': peer_ip,
            'ping_success': False,
            'ping_output': '',
            'ping_error': message,
        }

    # Reject argv injection (a leading '-' would be parsed by ping as a flag)
    # and any input that is not an IP literal.
    import ipaddress
    if not isinstance(peer_ip, str) or peer_ip.startswith('-'):
        return _failure('invalid peer_ip')
    try:
        ipaddress.ip_address(peer_ip)
    except ValueError:
        return _failure('invalid peer_ip')
    try:
        proc = subprocess.run(
            ['ping', '-c', '1', '-W', '2', peer_ip],
            capture_output=True, text=True, timeout=5,
        )
    except Exception as e:
        return _failure(str(e))
    return {
        'peer_ip': peer_ip,
        'ping_success': proc.returncode == 0,
        'ping_output': proc.stdout,
        'ping_error': proc.stderr,
    }
def get_metrics(self) -> Dict[str, Any]:
    """Return a small metrics dict derived from get_status().

    Keys: ``service`` ('wireguard'), ``timestamp`` (UTC ISO string),
    ``status`` (default 'unknown') and ``peers_count`` (default 0).
    """
    current = self.get_status()
    metrics: Dict[str, Any] = {'service': 'wireguard'}
    metrics['timestamp'] = datetime.utcnow().isoformat()
    metrics['status'] = current.get('status', 'unknown')
    metrics['peers_count'] = current.get('peers_count', 0)
    return metrics
def restart_service(self) -> bool:
    """Run ``docker restart cell-wireguard``; return True only if it exits with 0.

    Any exception (docker missing, timeout after 30 s, ...) is logged and
    reported as False.
    """
    command = ['docker', 'restart', 'cell-wireguard']
    try:
        outcome = subprocess.run(command, capture_output=True, text=True, timeout=30)
    except Exception as e:
        logger.error(f'restart_service failed: {e}')
        return False
    return outcome.returncode == 0