Files
pic/tests/e2e/wg/test_cell_to_cell_routing.py
T
2026-05-05 13:31:53 -04:00

464 lines
18 KiB
Python

"""
E2E test: cross-cell routing for a split-tunnel VPN peer.
Creates a temporary WireGuard peer on cell2 (the first connected cell), brings up
a real WireGuard tunnel from the test-runner host, and verifies that cell1 (the
local cell) is reachable end-to-end via the cell-to-cell link.
Why this test is meaningful
---------------------------
Cell1's WireGuard server IP is reachable ONLY inside cell1's cell-wireguard Docker
container. It is NOT reachable directly from the test-runner host. If a ping to
that IP succeeds, the full path was taken:
[test-runner wg-e2e] → cell2 WireGuard → [cell-to-cell tunnel] → cell1 WG IP
Prerequisites
-------------
* /home/roof/pic/data/api/cell_links.json must have at least one connected cell
* /home/roof/pic/config/wireguard/wg_confs/wg0.conf must exist
* SSH access to cell2's LAN IP as 'roof' with no passphrase
* `wg-quick`, `dig`, and `sudo` available on the test runner
All configuration is read dynamically from config files — no hardcoded IPs or ports.
Skip conditions are checked at module level; no manual flag needed.
"""
import ipaddress
import json
import os
import re
import subprocess
import time
import pytest
# -------------------------------------------------------------------------
# Dynamic configuration loading
# -------------------------------------------------------------------------
_CELL_LINKS_FILE = '/home/roof/pic/data/api/cell_links.json'
_WG_CONF_FILE = '/home/roof/pic/config/wireguard/wg_confs/wg0.conf'
_CELL_CONFIG_FILE = '/home/roof/pic/config/api/cell_config.json'
def _load_cfg() -> dict:
"""Load all test parameters from local config files. Returns {} on any error."""
cfg = {}
# --- cell1 (local/our) identity ---
try:
with open(_CELL_CONFIG_FILE) as f:
identity = json.load(f).get('_identity', {})
cfg['cell1_domain'] = identity.get('domain', '')
cfg['cell1_wg_port'] = int(identity.get('wireguard_port', 51820))
except Exception:
pass
# --- cell1 WG server IP from wg0.conf [Interface] Address ---
try:
with open(_WG_CONF_FILE) as f:
in_iface = False
for line in f:
line = line.strip()
if line == '[Interface]':
in_iface = True
elif line.startswith('[') and line.endswith(']'):
in_iface = False
elif in_iface and line.startswith('Address') and '=' in line:
addr = line.split('=', 1)[1].strip()
net = ipaddress.ip_interface(addr)
cfg['cell1_wg_ip'] = str(net.ip)
cfg['cell1_vpn_subnet'] = str(net.network)
break
except Exception:
pass
# --- cell2 (connected peer) from cell_links.json (first entry) ---
try:
with open(_CELL_LINKS_FILE) as f:
links = json.load(f)
if links:
link = links[0]
endpoint = link.get('endpoint', '')
if endpoint:
host, _, port = endpoint.rpartition(':')
cfg['cell2_lan_ip'] = host
cfg['cell2_wg_port'] = int(port)
cfg['cell2_pubkey'] = link.get('public_key', '')
cfg['cell2_wg_ip'] = link.get('dns_ip', '')
cfg['cell2_vpn_subnet'] = link.get('vpn_subnet', '')
cfg['cell2_domain'] = link.get('domain', '')
except Exception:
pass
# --- Derive TEST_PEER_IP: a high-range host in cell2's VPN subnet ---
# Use .250 (e.g., 10.0.2.250 for 10.0.2.0/24)
try:
net = ipaddress.ip_network(cfg['cell2_vpn_subnet'], strict=False)
cfg['test_peer_ip'] = str(net.network_address + 250)
except Exception:
pass
return cfg
_CFG = _load_cfg()
IFACE_NAME = 'pic-e2e-c2c'
IPTABLES_COMMENT = 'pic-e2e-c2c-test'
# Maximum acceptable average RTT for cells on the same LAN
MAX_LATENCY_MS = 10.0
pytestmark = pytest.mark.wg
# -------------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------------
def _run(cmd, **kw):
return subprocess.run(cmd, capture_output=True, text=True, **kw)
def _ssh(cmd, timeout=15):
"""Run a command on cell2 via SSH and return the CompletedProcess."""
lan_ip = _CFG.get('cell2_lan_ip', '')
return _run(
['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'BatchMode=yes',
'-o', 'ConnectTimeout=5', f'roof@{lan_ip}', cmd],
timeout=timeout,
)
def _pic2_wg(args, timeout=10):
"""Run a command inside cell2's cell-wireguard container via SSH."""
return _ssh('docker exec cell-wireguard ' + args, timeout=timeout)
def _ping(ip, count=3, wait=2):
r = _run(['ping', '-c', str(count), '-W', str(wait), ip], timeout=count * wait + 5)
return r.returncode == 0
def _cleanup_iface():
_run(['sudo', 'ip', 'link', 'delete', IFACE_NAME], timeout=5)
def _cleanup_pic2_peer(pubkey):
_pic2_wg(f'wg set wg0 peer {pubkey} remove')
def _cleanup_pic2_iptables(peer_ip):
_pic2_wg(
f'iptables -D FORWARD -s {peer_ip} -j ACCEPT '
f'-m comment --comment {IPTABLES_COMMENT}'
)
# -------------------------------------------------------------------------
# Skip checks
# -------------------------------------------------------------------------
def _check_prerequisites():
"""Return a skip reason string, or None if all prereqs are met."""
required_keys = ('cell1_wg_ip', 'cell2_lan_ip', 'cell2_pubkey',
'cell2_wg_ip', 'test_peer_ip', 'cell2_vpn_subnet',
'cell1_vpn_subnet')
missing = [k for k in required_keys if not _CFG.get(k)]
if missing:
return f'Config incomplete (missing: {", ".join(missing)}). ' \
f'Ensure cell_links.json and wg0.conf exist and are populated.'
if _run(['which', 'wg-quick']).returncode != 0:
return 'wg-quick not found on test runner'
if _run(['which', 'dig']).returncode != 0:
return 'dig not found on test runner'
if _run(['sudo', '-n', 'true']).returncode != 0:
return 'passwordless sudo not available on test runner'
r = _ssh('echo ok', timeout=6)
if r.returncode != 0 or 'ok' not in r.stdout:
lan = _CFG.get('cell2_lan_ip', '?')
return f'SSH to {lan} failed: {r.stderr.strip() or r.stdout.strip()}'
return None
_SKIP_REASON = _check_prerequisites()
# -------------------------------------------------------------------------
# Fixtures
# -------------------------------------------------------------------------
@pytest.fixture(scope='module')
def wg_setup(tmp_path_factory):
"""
Module-scoped fixture: adds test peer to cell2, brings up wg interface on
cell1 (test runner), yields config dict, then tears everything down.
"""
if _SKIP_REASON:
pytest.skip(_SKIP_REASON)
cell2_lan_ip = _CFG['cell2_lan_ip']
cell2_wg_port = _CFG['cell2_wg_port']
cell2_pubkey = _CFG['cell2_pubkey']
cell2_vpn_subnet = _CFG['cell2_vpn_subnet']
cell1_vpn_subnet = _CFG['cell1_vpn_subnet']
test_peer_ip = _CFG['test_peer_ip']
test_peer_cidr = f'{test_peer_ip}/32'
# AllowedIPs: cell2's subnet + cell1's subnet (split-tunnel cross-cell)
allowed_ips = f'{cell2_vpn_subnet}, {cell1_vpn_subnet}'
tmp_path = tmp_path_factory.mktemp('wg_e2e_c2c')
# --- Generate a WireGuard key pair ---
priv_r = _run(['wg', 'genkey'], timeout=5)
assert priv_r.returncode == 0, f'wg genkey failed: {priv_r.stderr}'
privkey = priv_r.stdout.strip()
pub_r = subprocess.run(['wg', 'pubkey'], input=privkey, capture_output=True,
text=True, timeout=5)
assert pub_r.returncode == 0, f'wg pubkey failed: {pub_r.stderr}'
pubkey = pub_r.stdout.strip()
# --- Add peer to cell2's wg0 (live, no restart needed) ---
r = _pic2_wg(f'wg set wg0 peer {pubkey} allowed-ips {test_peer_cidr} persistent-keepalive 25')
assert r.returncode == 0, f'wg set peer failed on cell2: {r.stderr}'
# --- Add permissive iptables ACCEPT so test traffic passes cell2's FORWARD ---
r = _pic2_wg(
f'iptables -I FORWARD 1 -s {test_peer_ip} -j ACCEPT '
f'-m comment --comment {IPTABLES_COMMENT}'
)
assert r.returncode == 0, f'iptables -I FORWARD failed on cell2: {r.stderr}'
# --- Write wg-quick config on the test runner ---
conf_path = str(tmp_path / f'{IFACE_NAME}.conf')
# Table=off: let wg-quick create the interface without managing routes.
# We add routes manually below so that existing host routes (added by
# ensure_cell_subnet_routes) don't conflict with wg-quick's route additions.
conf = (
f'[Interface]\n'
f'PrivateKey = {privkey}\n'
f'Address = {test_peer_ip}/32\n'
f'Table = off\n'
f'\n'
f'[Peer]\n'
f'PublicKey = {cell2_pubkey}\n'
f'Endpoint = {cell2_lan_ip}:{cell2_wg_port}\n'
f'AllowedIPs = {allowed_ips}\n'
f'PersistentKeepalive = 25\n'
)
with open(conf_path, 'w') as f:
f.write(conf)
os.chmod(conf_path, 0o600)
# --- Bring up the WireGuard interface ---
up_r = _run(['sudo', 'wg-quick', 'up', conf_path], timeout=15)
assert up_r.returncode == 0, f'wg-quick up failed: {up_r.stderr}\n{up_r.stdout}'
# --- Add routes manually (replace is idempotent, handles pre-existing routes) ---
for subnet in allowed_ips.split(','):
_run(['sudo', 'ip', 'route', 'replace', subnet.strip(), 'dev', IFACE_NAME], timeout=5)
time.sleep(3)
yield {
'test_peer_ip': test_peer_ip,
'allowed_ips': allowed_ips,
'privkey': privkey,
'pubkey': pubkey,
'conf_path': conf_path,
'cell1_wg_ip': _CFG['cell1_wg_ip'],
'cell2_wg_ip': _CFG['cell2_wg_ip'],
'cell1_domain': _CFG.get('cell1_domain', ''),
}
# --- Teardown ---
_run(['sudo', 'wg-quick', 'down', conf_path], timeout=15)
try:
os.unlink(conf_path)
except Exception:
pass
_cleanup_pic2_iptables(test_peer_ip)
_cleanup_pic2_peer(pubkey)
# -------------------------------------------------------------------------
# Tests
# -------------------------------------------------------------------------
class TestCellToCellRouting:
"""
Full end-to-end: split-tunnel peer on cell2 reaches cell1 via cell-to-cell tunnel.
"""
def test_prerequisites_cell1_not_reachable_directly(self):
"""Confirm cell1's WG IP is NOT reachable from host without VPN (test validity check)."""
cell1_wg_ip = _CFG.get('cell1_wg_ip', '10.0.0.1')
assert not _ping(cell1_wg_ip, count=1, wait=1), (
f'{cell1_wg_ip} is reachable WITHOUT the VPN — test would be a false positive. '
f'The test is only meaningful when this IP is unreachable without the tunnel.'
)
def test_cell2_wg_ip_reachable(self, wg_setup):
"""Cell2's WireGuard server IP is reachable (basic tunnel sanity)."""
cell2_wg_ip = wg_setup['cell2_wg_ip']
assert _ping(cell2_wg_ip), (
f'Cell2 WG server IP {cell2_wg_ip} not reachable. '
f'Handshake may not have established. '
f'Peer allowed-ips: {wg_setup["allowed_ips"]}'
)
def test_handshake_established(self, wg_setup):
"""A WireGuard handshake with cell2 has completed (within 30 s)."""
deadline = time.time() + 30
while time.time() < deadline:
r = _run(['sudo', 'wg', 'show', IFACE_NAME], timeout=5)
if 'latest handshake' in r.stdout:
return
time.sleep(2)
pytest.fail(
f'No WireGuard handshake with cell2 after 30 s.\n'
f'wg show output:\n{r.stdout}'
)
def test_cross_cell_wg_ip_reachable(self, wg_setup):
"""
Cell1's WireGuard IP is reachable from a peer connected to cell2.
This is the critical cross-cell routing test. The full path is:
test-runner → wg-e2e → cell2 FORWARD → cell-to-cell tunnel → cell1 WG IP
"""
cell1_wg_ip = wg_setup['cell1_wg_ip']
assert _ping(cell1_wg_ip, count=3, wait=3), (
f'Cell1 WG IP {cell1_wg_ip} NOT reachable from split-tunnel peer on cell2. '
f'\nAllowed IPs: {wg_setup["allowed_ips"]}'
f'\nThis means the cell-to-cell routing is broken. Check:'
f'\n 1. cell2 FORWARD chain has ESTABLISHED,RELATED ACCEPT'
f'\n 2. cell2 wg0.conf has AllowedIPs covering cell1 subnet'
f'\n 3. Cell-to-cell WireGuard handshake is recent (wg show on cell2)'
)
def test_cross_cell_ping_latency(self, wg_setup):
"""Cross-cell ping RTT is under 10ms — both cells are on the same LAN.
High latency (>10ms) indicates traffic is routing via the internet instead
of directly over the LAN WireGuard tunnel. Check cell_links.json endpoints.
"""
cell1_wg_ip = wg_setup['cell1_wg_ip']
r = _run(['ping', '-c', '10', '-W', '2', cell1_wg_ip], timeout=30)
assert r.returncode == 0, (
f'Ping to {cell1_wg_ip} failed completely: {r.stderr}'
)
m = re.search(
r'rtt min/avg/max/mdev = [\d.]+/([\d.]+)/[\d.]+/[\d.]+ ms',
r.stdout
)
assert m, f'Could not parse ping RTT from output:\n{r.stdout}'
avg_ms = float(m.group(1))
assert avg_ms < MAX_LATENCY_MS, (
f'Cross-cell avg RTT {avg_ms:.2f}ms exceeds {MAX_LATENCY_MS}ms. '
f'Both cells are on the same LAN — high latency means traffic routes '
f'via the internet. Check cell_links.json uses LAN IPs, not public IPs.'
)
def test_cross_cell_api_reachable(self, wg_setup):
"""Cell1's API /health is reachable through the cell-to-cell tunnel."""
import urllib.request, urllib.error
cell1_wg_ip = wg_setup['cell1_wg_ip']
url = f'http://{cell1_wg_ip}:3000/health'
try:
with urllib.request.urlopen(url, timeout=8) as resp:
import json as _json
body = _json.loads(resp.read())
assert body.get('status') == 'healthy', (
f'Cell1 API returned unexpected health: {body}'
)
except urllib.error.URLError as e:
pytest.fail(
f'Cell1 API at {url} not reachable via cell-to-cell tunnel: {e}. '
f'\nNote: if test_cross_cell_wg_ip_reachable passed but this fails, '
f'the tunnel is up but port 3000 may be blocked by cell1\'s firewall.'
)
def test_cross_cell_web_reachable(self, wg_setup):
"""Cell1's web service (port 80 via Caddy) is reachable through the tunnel."""
import urllib.request, urllib.error
cell1_wg_ip = wg_setup['cell1_wg_ip']
url = f'http://{cell1_wg_ip}/'
try:
with urllib.request.urlopen(url, timeout=8) as resp:
assert resp.status in (200, 301, 302, 307, 308), (
f'Unexpected HTTP status from cell1 Caddy: {resp.status}'
)
except urllib.error.HTTPError as e:
assert e.code < 500, (
f'Cell1 Caddy returned server error {e.code} — may indicate a Caddy issue'
)
except urllib.error.URLError as e:
pytest.fail(
f'Cell1 web (Caddy) at {url} not reachable via tunnel: {e}'
)
def test_cross_cell_domain_accessible(self, wg_setup):
"""A service domain from cell1 is resolvable via cell2's DNS and HTTP-reachable.
DNS chain:
test-peer → cell2_wg_ip:53 (DNAT → cell-dns on cell2)
→ cell2 Corefile forwards cell1_domain → cell1_wg_ip:53
→ cell1 cell-dns returns A record → cell1_wg_ip
HTTP:
test-peer → cell1_wg_ip:80 (Host: calendar.<cell1_domain>)
→ cell-to-cell tunnel → cell1 Caddy
Requires: scoped DNAT (wg0 PREROUTING -d server_ip) on both cells
and Docker→wg0 routing on cell2 (host route + MASQUERADE).
"""
cell1_domain = wg_setup.get('cell1_domain', '')
cell2_wg_ip = wg_setup['cell2_wg_ip']
cell1_wg_ip = wg_setup['cell1_wg_ip']
if not cell1_domain:
pytest.skip('cell1_domain not configured — cannot test domain access')
calendar_host = f'calendar.{cell1_domain}'
# --- DNS resolution via cell2's DNS ---
r = _run(
['dig', f'@{cell2_wg_ip}', calendar_host, '+short', '+time=5', '+tries=2'],
timeout=15
)
assert r.returncode == 0, (
f'dig @{cell2_wg_ip} {calendar_host} failed: {r.stderr.strip()}\n'
f'DNS chain: test-peer → {cell2_wg_ip}:53 → cell-dns(cell2) '
f'{cell1_wg_ip}:53 (cell1). '
f'If this fails, check: (1) DNAT on cell2 scoped to -d {cell2_wg_ip}, '
f'(2) Docker→wg0 routing on cell2 (host route + MASQUERADE).'
)
resolved = r.stdout.strip()
assert resolved == cell1_wg_ip, (
f'DNS resolved {calendar_host!r} to {resolved!r}, '
f'expected {cell1_wg_ip!r}. '
f'cell1 zone: all {cell1_domain} names should point to {cell1_wg_ip}.'
)
# --- HTTP access via domain name (Host header → Caddy routing) ---
import urllib.request, urllib.error
url = f'http://{cell1_wg_ip}/'
req = urllib.request.Request(url, headers={'Host': calendar_host})
try:
with urllib.request.urlopen(req, timeout=8) as resp:
assert resp.status < 500, (
f'cell1 Caddy returned {resp.status} for Host:{calendar_host}'
)
except urllib.error.HTTPError as e:
assert e.code < 500, (
f'cell1 Caddy server error {e.code} for Host:{calendar_host}'
)
except urllib.error.URLError as e:
pytest.fail(
f'HTTP to {url} (Host:{calendar_host}) via tunnel failed: {e}'
)