Files
pic/tests/e2e/wg/test_cell_to_cell_routing.py
roof 7391d7f7a2 Add e2e latency consistency test for WireGuard tunnel
Sends 50 pings at 0.2s intervals through the cell-to-cell tunnel and
asserts that ≤5% exceed 3× the median RTT (floor 15ms). Catches
server-side packet processing regressions on wired paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-07 15:13:27 -04:00

491 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
E2E test: cross-cell routing for a split-tunnel VPN peer.
Creates a temporary WireGuard peer on cell2 (the first connected cell), brings up
a real WireGuard tunnel from the test-runner host, and verifies that cell1 (the
local cell) is reachable end-to-end via the cell-to-cell link.
Why this test is meaningful
---------------------------
Cell1's WireGuard server IP is reachable ONLY inside cell1's cell-wireguard Docker
container. It is NOT reachable directly from the test-runner host. If a ping to
that IP succeeds, the full path was taken:
[test-runner wg-e2e] → cell2 WireGuard → [cell-to-cell tunnel] → cell1 WG IP
Prerequisites
-------------
* /home/roof/pic/data/api/cell_links.json must have at least one connected cell
* /home/roof/pic/config/wireguard/wg_confs/wg0.conf must exist
* SSH access to cell2's LAN IP as 'roof' with no passphrase
* `wg-quick`, `dig`, and `sudo` available on the test runner
All configuration is read dynamically from config files — no hardcoded IPs or ports.
Skip conditions are checked at module level; no manual flag needed.
"""
import ipaddress
import json
import os
import re
import subprocess
import time
import pytest
# -------------------------------------------------------------------------
# Dynamic configuration loading
# -------------------------------------------------------------------------
_CELL_LINKS_FILE = '/home/roof/pic/data/api/cell_links.json'
_WG_CONF_FILE = '/home/roof/pic/config/wireguard/wg_confs/wg0.conf'
_CELL_CONFIG_FILE = '/home/roof/pic/config/api/cell_config.json'
def _load_cfg() -> dict:
"""Load all test parameters from local config files. Returns {} on any error."""
cfg = {}
# --- cell1 (local/our) identity ---
try:
with open(_CELL_CONFIG_FILE) as f:
identity = json.load(f).get('_identity', {})
cfg['cell1_domain'] = identity.get('domain', '')
cfg['cell1_wg_port'] = int(identity.get('wireguard_port', 51820))
except Exception:
pass
# --- cell1 WG server IP from wg0.conf [Interface] Address ---
try:
with open(_WG_CONF_FILE) as f:
in_iface = False
for line in f:
line = line.strip()
if line == '[Interface]':
in_iface = True
elif line.startswith('[') and line.endswith(']'):
in_iface = False
elif in_iface and line.startswith('Address') and '=' in line:
addr = line.split('=', 1)[1].strip()
net = ipaddress.ip_interface(addr)
cfg['cell1_wg_ip'] = str(net.ip)
cfg['cell1_vpn_subnet'] = str(net.network)
break
except Exception:
pass
# --- cell2 (connected peer) from cell_links.json (first entry) ---
try:
with open(_CELL_LINKS_FILE) as f:
links = json.load(f)
if links:
link = links[0]
endpoint = link.get('endpoint', '')
if endpoint:
host, _, port = endpoint.rpartition(':')
cfg['cell2_lan_ip'] = host
cfg['cell2_wg_port'] = int(port)
cfg['cell2_pubkey'] = link.get('public_key', '')
cfg['cell2_wg_ip'] = link.get('dns_ip', '')
cfg['cell2_vpn_subnet'] = link.get('vpn_subnet', '')
cfg['cell2_domain'] = link.get('domain', '')
except Exception:
pass
# --- Derive TEST_PEER_IP: a high-range host in cell2's VPN subnet ---
# Use .250 (e.g., 10.0.2.250 for 10.0.2.0/24)
try:
net = ipaddress.ip_network(cfg['cell2_vpn_subnet'], strict=False)
cfg['test_peer_ip'] = str(net.network_address + 250)
except Exception:
pass
return cfg
_CFG = _load_cfg()
IFACE_NAME = 'pic-e2e-c2c'
IPTABLES_COMMENT = 'pic-e2e-c2c-test'
# Maximum acceptable average RTT for cells on the same LAN
MAX_LATENCY_MS = 10.0
pytestmark = pytest.mark.wg
# -------------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------------
def _run(cmd, **kw):
return subprocess.run(cmd, capture_output=True, text=True, **kw)
def _ssh(cmd, timeout=15):
"""Run a command on cell2 via SSH and return the CompletedProcess."""
lan_ip = _CFG.get('cell2_lan_ip', '')
return _run(
['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'BatchMode=yes',
'-o', 'ConnectTimeout=5', f'roof@{lan_ip}', cmd],
timeout=timeout,
)
def _pic2_wg(args, timeout=10):
"""Run a command inside cell2's cell-wireguard container via SSH."""
return _ssh('docker exec cell-wireguard ' + args, timeout=timeout)
def _ping(ip, count=3, wait=2):
r = _run(['ping', '-c', str(count), '-W', str(wait), ip], timeout=count * wait + 5)
return r.returncode == 0
def _cleanup_iface():
_run(['sudo', 'ip', 'link', 'delete', IFACE_NAME], timeout=5)
def _cleanup_pic2_peer(pubkey):
_pic2_wg(f'wg set wg0 peer {pubkey} remove')
def _cleanup_pic2_iptables(peer_ip):
_pic2_wg(
f'iptables -D FORWARD -s {peer_ip} -j ACCEPT '
f'-m comment --comment {IPTABLES_COMMENT}'
)
# -------------------------------------------------------------------------
# Skip checks
# -------------------------------------------------------------------------
def _check_prerequisites():
"""Return a skip reason string, or None if all prereqs are met."""
required_keys = ('cell1_wg_ip', 'cell2_lan_ip', 'cell2_pubkey',
'cell2_wg_ip', 'test_peer_ip', 'cell2_vpn_subnet',
'cell1_vpn_subnet')
missing = [k for k in required_keys if not _CFG.get(k)]
if missing:
return f'Config incomplete (missing: {", ".join(missing)}). ' \
f'Ensure cell_links.json and wg0.conf exist and are populated.'
if _run(['which', 'wg-quick']).returncode != 0:
return 'wg-quick not found on test runner'
if _run(['which', 'dig']).returncode != 0:
return 'dig not found on test runner'
if _run(['sudo', '-n', 'true']).returncode != 0:
return 'passwordless sudo not available on test runner'
r = _ssh('echo ok', timeout=6)
if r.returncode != 0 or 'ok' not in r.stdout:
lan = _CFG.get('cell2_lan_ip', '?')
return f'SSH to {lan} failed: {r.stderr.strip() or r.stdout.strip()}'
return None
_SKIP_REASON = _check_prerequisites()
# -------------------------------------------------------------------------
# Fixtures
# -------------------------------------------------------------------------
@pytest.fixture(scope='module')
def wg_setup(tmp_path_factory):
"""
Module-scoped fixture: adds test peer to cell2, brings up wg interface on
cell1 (test runner), yields config dict, then tears everything down.
"""
if _SKIP_REASON:
pytest.skip(_SKIP_REASON)
cell2_lan_ip = _CFG['cell2_lan_ip']
cell2_wg_port = _CFG['cell2_wg_port']
cell2_pubkey = _CFG['cell2_pubkey']
cell2_vpn_subnet = _CFG['cell2_vpn_subnet']
cell1_vpn_subnet = _CFG['cell1_vpn_subnet']
test_peer_ip = _CFG['test_peer_ip']
test_peer_cidr = f'{test_peer_ip}/32'
# AllowedIPs: cell2's subnet + cell1's subnet (split-tunnel cross-cell)
allowed_ips = f'{cell2_vpn_subnet}, {cell1_vpn_subnet}'
tmp_path = tmp_path_factory.mktemp('wg_e2e_c2c')
# --- Generate a WireGuard key pair ---
priv_r = _run(['wg', 'genkey'], timeout=5)
assert priv_r.returncode == 0, f'wg genkey failed: {priv_r.stderr}'
privkey = priv_r.stdout.strip()
pub_r = subprocess.run(['wg', 'pubkey'], input=privkey, capture_output=True,
text=True, timeout=5)
assert pub_r.returncode == 0, f'wg pubkey failed: {pub_r.stderr}'
pubkey = pub_r.stdout.strip()
# --- Add peer to cell2's wg0 (live, no restart needed) ---
r = _pic2_wg(f'wg set wg0 peer {pubkey} allowed-ips {test_peer_cidr} persistent-keepalive 25')
assert r.returncode == 0, f'wg set peer failed on cell2: {r.stderr}'
# --- Add permissive iptables ACCEPT so test traffic passes cell2's FORWARD ---
r = _pic2_wg(
f'iptables -I FORWARD 1 -s {test_peer_ip} -j ACCEPT '
f'-m comment --comment {IPTABLES_COMMENT}'
)
assert r.returncode == 0, f'iptables -I FORWARD failed on cell2: {r.stderr}'
# --- Write wg-quick config on the test runner ---
conf_path = str(tmp_path / f'{IFACE_NAME}.conf')
# Table=off: let wg-quick create the interface without managing routes.
# We add routes manually below so that existing host routes (added by
# ensure_cell_subnet_routes) don't conflict with wg-quick's route additions.
conf = (
f'[Interface]\n'
f'PrivateKey = {privkey}\n'
f'Address = {test_peer_ip}/32\n'
f'Table = off\n'
f'\n'
f'[Peer]\n'
f'PublicKey = {cell2_pubkey}\n'
f'Endpoint = {cell2_lan_ip}:{cell2_wg_port}\n'
f'AllowedIPs = {allowed_ips}\n'
f'PersistentKeepalive = 25\n'
)
with open(conf_path, 'w') as f:
f.write(conf)
os.chmod(conf_path, 0o600)
# --- Bring up the WireGuard interface ---
up_r = _run(['sudo', 'wg-quick', 'up', conf_path], timeout=15)
assert up_r.returncode == 0, f'wg-quick up failed: {up_r.stderr}\n{up_r.stdout}'
# --- Add routes manually (replace is idempotent, handles pre-existing routes) ---
for subnet in allowed_ips.split(','):
_run(['sudo', 'ip', 'route', 'replace', subnet.strip(), 'dev', IFACE_NAME], timeout=5)
time.sleep(3)
yield {
'test_peer_ip': test_peer_ip,
'allowed_ips': allowed_ips,
'privkey': privkey,
'pubkey': pubkey,
'conf_path': conf_path,
'cell1_wg_ip': _CFG['cell1_wg_ip'],
'cell2_wg_ip': _CFG['cell2_wg_ip'],
'cell1_domain': _CFG.get('cell1_domain', ''),
}
# --- Teardown ---
_run(['sudo', 'wg-quick', 'down', conf_path], timeout=15)
try:
os.unlink(conf_path)
except Exception:
pass
_cleanup_pic2_iptables(test_peer_ip)
_cleanup_pic2_peer(pubkey)
# -------------------------------------------------------------------------
# Tests
# -------------------------------------------------------------------------
class TestCellToCellRouting:
"""
Full end-to-end: split-tunnel peer on cell2 reaches cell1 via cell-to-cell tunnel.
"""
def test_prerequisites_cell1_not_reachable_directly(self):
"""Confirm cell1's WG IP is NOT reachable from host without VPN (test validity check)."""
cell1_wg_ip = _CFG.get('cell1_wg_ip', '10.0.0.1')
assert not _ping(cell1_wg_ip, count=1, wait=1), (
f'{cell1_wg_ip} is reachable WITHOUT the VPN — test would be a false positive. '
f'The test is only meaningful when this IP is unreachable without the tunnel.'
)
def test_cell2_wg_ip_reachable(self, wg_setup):
"""Cell2's WireGuard server IP is reachable (basic tunnel sanity)."""
cell2_wg_ip = wg_setup['cell2_wg_ip']
assert _ping(cell2_wg_ip), (
f'Cell2 WG server IP {cell2_wg_ip} not reachable. '
f'Handshake may not have established. '
f'Peer allowed-ips: {wg_setup["allowed_ips"]}'
)
def test_handshake_established(self, wg_setup):
"""A WireGuard handshake with cell2 has completed (within 30 s)."""
deadline = time.time() + 30
while time.time() < deadline:
r = _run(['sudo', 'wg', 'show', IFACE_NAME], timeout=5)
if 'latest handshake' in r.stdout:
return
time.sleep(2)
pytest.fail(
f'No WireGuard handshake with cell2 after 30 s.\n'
f'wg show output:\n{r.stdout}'
)
def test_cross_cell_wg_ip_reachable(self, wg_setup):
"""
Cell1's WireGuard IP is reachable from a peer connected to cell2.
This is the critical cross-cell routing test. The full path is:
test-runner → wg-e2e → cell2 FORWARD → cell-to-cell tunnel → cell1 WG IP
"""
cell1_wg_ip = wg_setup['cell1_wg_ip']
assert _ping(cell1_wg_ip, count=3, wait=3), (
f'Cell1 WG IP {cell1_wg_ip} NOT reachable from split-tunnel peer on cell2. '
f'\nAllowed IPs: {wg_setup["allowed_ips"]}'
f'\nThis means the cell-to-cell routing is broken. Check:'
f'\n 1. cell2 FORWARD chain has ESTABLISHED,RELATED ACCEPT'
f'\n 2. cell2 wg0.conf has AllowedIPs covering cell1 subnet'
f'\n 3. Cell-to-cell WireGuard handshake is recent (wg show on cell2)'
)
def test_cross_cell_ping_latency(self, wg_setup):
"""Cross-cell ping RTT is under 10ms — both cells are on the same LAN.
High latency (>10ms) indicates traffic is routing via the internet instead
of directly over the LAN WireGuard tunnel. Check cell_links.json endpoints.
"""
cell1_wg_ip = wg_setup['cell1_wg_ip']
r = _run(['ping', '-c', '10', '-W', '2', cell1_wg_ip], timeout=30)
assert r.returncode == 0, (
f'Ping to {cell1_wg_ip} failed completely: {r.stderr}'
)
m = re.search(
r'rtt min/avg/max/mdev = [\d.]+/([\d.]+)/[\d.]+/[\d.]+ ms',
r.stdout
)
assert m, f'Could not parse ping RTT from output:\n{r.stdout}'
avg_ms = float(m.group(1))
assert avg_ms < MAX_LATENCY_MS, (
f'Cross-cell avg RTT {avg_ms:.2f}ms exceeds {MAX_LATENCY_MS}ms. '
f'Both cells are on the same LAN — high latency means traffic routes '
f'via the internet. Check cell_links.json uses LAN IPs, not public IPs.'
)
def test_cross_cell_api_reachable(self, wg_setup):
"""Cell1's API /health is reachable through the cell-to-cell tunnel."""
import urllib.request, urllib.error
cell1_wg_ip = wg_setup['cell1_wg_ip']
url = f'http://{cell1_wg_ip}:3000/health'
try:
with urllib.request.urlopen(url, timeout=8) as resp:
import json as _json
body = _json.loads(resp.read())
assert body.get('status') == 'healthy', (
f'Cell1 API returned unexpected health: {body}'
)
except urllib.error.URLError as e:
pytest.fail(
f'Cell1 API at {url} not reachable via cell-to-cell tunnel: {e}. '
f'\nNote: if test_cross_cell_wg_ip_reachable passed but this fails, '
f'the tunnel is up but port 3000 may be blocked by cell1\'s firewall.'
)
def test_cross_cell_web_reachable(self, wg_setup):
"""Cell1's web service (port 80 via Caddy) is reachable through the tunnel."""
import urllib.request, urllib.error
cell1_wg_ip = wg_setup['cell1_wg_ip']
url = f'http://{cell1_wg_ip}/'
try:
with urllib.request.urlopen(url, timeout=8) as resp:
assert resp.status in (200, 301, 302, 307, 308), (
f'Unexpected HTTP status from cell1 Caddy: {resp.status}'
)
except urllib.error.HTTPError as e:
assert e.code < 500, (
f'Cell1 Caddy returned server error {e.code} — may indicate a Caddy issue'
)
except urllib.error.URLError as e:
pytest.fail(
f'Cell1 web (Caddy) at {url} not reachable via tunnel: {e}'
)
def test_tunnel_latency_consistency(self, wg_setup):
"""WireGuard tunnel has no significant latency spikes on a local wired network.
Sends 50 pings at 0.2s intervals to cell2's WG server IP. Pass condition:
≤ 5% of pings (≤ 2 out of 50) exceed max(3× median RTT, 15ms).
"""
cell2_wg_ip = wg_setup['cell2_wg_ip']
r = _run(['ping', '-c', '50', '-i', '0.2', '-W', '2', cell2_wg_ip], timeout=25)
assert r.returncode == 0, f'All pings to {cell2_wg_ip} failed: {r.stderr}'
rtts = [float(m.group(1)) for m in re.finditer(r'time=([\d.]+) ms', r.stdout)]
assert len(rtts) >= 40, (
f'Too few ping replies ({len(rtts)}/50) — packet loss may mask latency issues'
)
sorted_rtts = sorted(rtts)
median = sorted_rtts[len(sorted_rtts) // 2]
spike_threshold = max(median * 3.0, 15.0)
spikes = [rtt for rtt in rtts if rtt > spike_threshold]
spike_ratio = len(spikes) / len(rtts)
assert spike_ratio <= 0.05, (
f'Latency spikes: {len(spikes)}/{len(rtts)} packets ({spike_ratio:.0%}) '
f'exceeded {spike_threshold:.1f}ms (3× median {median:.1f}ms). '
f'Spike values: {[f"{s:.1f}ms" for s in sorted(spikes)]}'
)
def test_cross_cell_domain_accessible(self, wg_setup):
"""A service domain from cell1 is resolvable via cell2's DNS and HTTP-reachable.
DNS chain:
test-peer → cell2_wg_ip:53 (DNAT → cell-dns on cell2)
→ cell2 Corefile forwards cell1_domain → cell1_wg_ip:53
→ cell1 cell-dns returns A record → cell1_wg_ip
HTTP:
test-peer → cell1_wg_ip:80 (Host: calendar.<cell1_domain>)
→ cell-to-cell tunnel → cell1 Caddy
Requires: scoped DNAT (wg0 PREROUTING -d server_ip) on both cells
and Docker→wg0 routing on cell2 (host route + MASQUERADE).
"""
cell1_domain = wg_setup.get('cell1_domain', '')
cell2_wg_ip = wg_setup['cell2_wg_ip']
cell1_wg_ip = wg_setup['cell1_wg_ip']
if not cell1_domain:
pytest.skip('cell1_domain not configured — cannot test domain access')
calendar_host = f'calendar.{cell1_domain}'
# --- DNS resolution via cell2's DNS ---
r = _run(
['dig', f'@{cell2_wg_ip}', calendar_host, '+short', '+time=5', '+tries=2'],
timeout=15
)
assert r.returncode == 0, (
f'dig @{cell2_wg_ip} {calendar_host} failed: {r.stderr.strip()}\n'
f'DNS chain: test-peer → {cell2_wg_ip}:53 → cell-dns(cell2) '
f'{cell1_wg_ip}:53 (cell1). '
f'If this fails, check: (1) DNAT on cell2 scoped to -d {cell2_wg_ip}, '
f'(2) Docker→wg0 routing on cell2 (host route + MASQUERADE).'
)
resolved = r.stdout.strip()
assert resolved == cell1_wg_ip, (
f'DNS resolved {calendar_host!r} to {resolved!r}, '
f'expected {cell1_wg_ip!r}. '
f'cell1 zone: all {cell1_domain} names should point to {cell1_wg_ip}.'
)
# --- HTTP access via domain name (Host header → Caddy routing) ---
import urllib.request, urllib.error
url = f'http://{cell1_wg_ip}/'
req = urllib.request.Request(url, headers={'Host': calendar_host})
try:
with urllib.request.urlopen(req, timeout=8) as resp:
assert resp.status < 500, (
f'cell1 Caddy returned {resp.status} for Host:{calendar_host}'
)
except urllib.error.HTTPError as e:
assert e.code < 500, (
f'cell1 Caddy server error {e.code} for Host:{calendar_host}'
)
except urllib.error.URLError as e:
pytest.fail(
f'HTTP to {url} (Host:{calendar_host}) via tunnel failed: {e}'
)