7391d7f7a2
Sends 50 pings at 0.2s intervals through the cell-to-cell tunnel and asserts that ≤5% exceed 3× the median RTT (floor 15ms). Catches server-side packet processing regressions on wired paths. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
491 lines
19 KiB
Python
491 lines
19 KiB
Python
"""
|
||
E2E test: cross-cell routing for a split-tunnel VPN peer.
|
||
|
||
Creates a temporary WireGuard peer on cell2 (the first connected cell), brings up
|
||
a real WireGuard tunnel from the test-runner host, and verifies that cell1 (the
|
||
local cell) is reachable end-to-end via the cell-to-cell link.
|
||
|
||
Why this test is meaningful
|
||
---------------------------
|
||
Cell1's WireGuard server IP is reachable ONLY inside cell1's cell-wireguard Docker
|
||
container. It is NOT reachable directly from the test-runner host. If a ping to
|
||
that IP succeeds, the full path was taken:
|
||
|
||
[test-runner wg-e2e] → cell2 WireGuard → [cell-to-cell tunnel] → cell1 WG IP
|
||
|
||
Prerequisites
|
||
-------------
|
||
* /home/roof/pic/data/api/cell_links.json must have at least one connected cell
|
||
* /home/roof/pic/config/wireguard/wg_confs/wg0.conf must exist
|
||
* SSH access to cell2's LAN IP as 'roof' with no passphrase
|
||
* `wg-quick`, `dig`, and `sudo` available on the test runner
|
||
|
||
All configuration is read dynamically from config files — no hardcoded IPs or ports.
|
||
Skip conditions are checked at module level; no manual flag needed.
|
||
"""
|
||
import ipaddress
|
||
import json
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import time
|
||
|
||
import pytest
|
||
|
||
# -------------------------------------------------------------------------
|
||
# Dynamic configuration loading
|
||
# -------------------------------------------------------------------------
|
||
|
||
_CELL_LINKS_FILE = '/home/roof/pic/data/api/cell_links.json'
|
||
_WG_CONF_FILE = '/home/roof/pic/config/wireguard/wg_confs/wg0.conf'
|
||
_CELL_CONFIG_FILE = '/home/roof/pic/config/api/cell_config.json'
|
||
|
||
|
||
def _load_cfg() -> dict:
|
||
"""Load all test parameters from local config files. Returns {} on any error."""
|
||
cfg = {}
|
||
|
||
# --- cell1 (local/our) identity ---
|
||
try:
|
||
with open(_CELL_CONFIG_FILE) as f:
|
||
identity = json.load(f).get('_identity', {})
|
||
cfg['cell1_domain'] = identity.get('domain', '')
|
||
cfg['cell1_wg_port'] = int(identity.get('wireguard_port', 51820))
|
||
except Exception:
|
||
pass
|
||
|
||
# --- cell1 WG server IP from wg0.conf [Interface] Address ---
|
||
try:
|
||
with open(_WG_CONF_FILE) as f:
|
||
in_iface = False
|
||
for line in f:
|
||
line = line.strip()
|
||
if line == '[Interface]':
|
||
in_iface = True
|
||
elif line.startswith('[') and line.endswith(']'):
|
||
in_iface = False
|
||
elif in_iface and line.startswith('Address') and '=' in line:
|
||
addr = line.split('=', 1)[1].strip()
|
||
net = ipaddress.ip_interface(addr)
|
||
cfg['cell1_wg_ip'] = str(net.ip)
|
||
cfg['cell1_vpn_subnet'] = str(net.network)
|
||
break
|
||
except Exception:
|
||
pass
|
||
|
||
# --- cell2 (connected peer) from cell_links.json (first entry) ---
|
||
try:
|
||
with open(_CELL_LINKS_FILE) as f:
|
||
links = json.load(f)
|
||
if links:
|
||
link = links[0]
|
||
endpoint = link.get('endpoint', '')
|
||
if endpoint:
|
||
host, _, port = endpoint.rpartition(':')
|
||
cfg['cell2_lan_ip'] = host
|
||
cfg['cell2_wg_port'] = int(port)
|
||
cfg['cell2_pubkey'] = link.get('public_key', '')
|
||
cfg['cell2_wg_ip'] = link.get('dns_ip', '')
|
||
cfg['cell2_vpn_subnet'] = link.get('vpn_subnet', '')
|
||
cfg['cell2_domain'] = link.get('domain', '')
|
||
except Exception:
|
||
pass
|
||
|
||
# --- Derive TEST_PEER_IP: a high-range host in cell2's VPN subnet ---
|
||
# Use .250 (e.g., 10.0.2.250 for 10.0.2.0/24)
|
||
try:
|
||
net = ipaddress.ip_network(cfg['cell2_vpn_subnet'], strict=False)
|
||
cfg['test_peer_ip'] = str(net.network_address + 250)
|
||
except Exception:
|
||
pass
|
||
|
||
return cfg
|
||
|
||
|
||
_CFG = _load_cfg()
|
||
|
||
IFACE_NAME = 'pic-e2e-c2c'
|
||
IPTABLES_COMMENT = 'pic-e2e-c2c-test'
|
||
|
||
# Maximum acceptable average RTT for cells on the same LAN
|
||
MAX_LATENCY_MS = 10.0
|
||
|
||
pytestmark = pytest.mark.wg
|
||
|
||
|
||
# -------------------------------------------------------------------------
|
||
# Helpers
|
||
# -------------------------------------------------------------------------
|
||
|
||
def _run(cmd, **kw):
|
||
return subprocess.run(cmd, capture_output=True, text=True, **kw)
|
||
|
||
|
||
def _ssh(cmd, timeout=15):
|
||
"""Run a command on cell2 via SSH and return the CompletedProcess."""
|
||
lan_ip = _CFG.get('cell2_lan_ip', '')
|
||
return _run(
|
||
['ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'BatchMode=yes',
|
||
'-o', 'ConnectTimeout=5', f'roof@{lan_ip}', cmd],
|
||
timeout=timeout,
|
||
)
|
||
|
||
|
||
def _pic2_wg(args, timeout=10):
|
||
"""Run a command inside cell2's cell-wireguard container via SSH."""
|
||
return _ssh('docker exec cell-wireguard ' + args, timeout=timeout)
|
||
|
||
|
||
def _ping(ip, count=3, wait=2):
|
||
r = _run(['ping', '-c', str(count), '-W', str(wait), ip], timeout=count * wait + 5)
|
||
return r.returncode == 0
|
||
|
||
|
||
def _cleanup_iface():
|
||
_run(['sudo', 'ip', 'link', 'delete', IFACE_NAME], timeout=5)
|
||
|
||
|
||
def _cleanup_pic2_peer(pubkey):
|
||
_pic2_wg(f'wg set wg0 peer {pubkey} remove')
|
||
|
||
|
||
def _cleanup_pic2_iptables(peer_ip):
|
||
_pic2_wg(
|
||
f'iptables -D FORWARD -s {peer_ip} -j ACCEPT '
|
||
f'-m comment --comment {IPTABLES_COMMENT}'
|
||
)
|
||
|
||
|
||
# -------------------------------------------------------------------------
|
||
# Skip checks
|
||
# -------------------------------------------------------------------------
|
||
|
||
def _check_prerequisites():
|
||
"""Return a skip reason string, or None if all prereqs are met."""
|
||
required_keys = ('cell1_wg_ip', 'cell2_lan_ip', 'cell2_pubkey',
|
||
'cell2_wg_ip', 'test_peer_ip', 'cell2_vpn_subnet',
|
||
'cell1_vpn_subnet')
|
||
missing = [k for k in required_keys if not _CFG.get(k)]
|
||
if missing:
|
||
return f'Config incomplete (missing: {", ".join(missing)}). ' \
|
||
f'Ensure cell_links.json and wg0.conf exist and are populated.'
|
||
if _run(['which', 'wg-quick']).returncode != 0:
|
||
return 'wg-quick not found on test runner'
|
||
if _run(['which', 'dig']).returncode != 0:
|
||
return 'dig not found on test runner'
|
||
if _run(['sudo', '-n', 'true']).returncode != 0:
|
||
return 'passwordless sudo not available on test runner'
|
||
r = _ssh('echo ok', timeout=6)
|
||
if r.returncode != 0 or 'ok' not in r.stdout:
|
||
lan = _CFG.get('cell2_lan_ip', '?')
|
||
return f'SSH to {lan} failed: {r.stderr.strip() or r.stdout.strip()}'
|
||
return None
|
||
|
||
|
||
_SKIP_REASON = _check_prerequisites()
|
||
|
||
|
||
# -------------------------------------------------------------------------
|
||
# Fixtures
|
||
# -------------------------------------------------------------------------
|
||
|
||
@pytest.fixture(scope='module')
|
||
def wg_setup(tmp_path_factory):
|
||
"""
|
||
Module-scoped fixture: adds test peer to cell2, brings up wg interface on
|
||
cell1 (test runner), yields config dict, then tears everything down.
|
||
"""
|
||
if _SKIP_REASON:
|
||
pytest.skip(_SKIP_REASON)
|
||
|
||
cell2_lan_ip = _CFG['cell2_lan_ip']
|
||
cell2_wg_port = _CFG['cell2_wg_port']
|
||
cell2_pubkey = _CFG['cell2_pubkey']
|
||
cell2_vpn_subnet = _CFG['cell2_vpn_subnet']
|
||
cell1_vpn_subnet = _CFG['cell1_vpn_subnet']
|
||
test_peer_ip = _CFG['test_peer_ip']
|
||
test_peer_cidr = f'{test_peer_ip}/32'
|
||
|
||
# AllowedIPs: cell2's subnet + cell1's subnet (split-tunnel cross-cell)
|
||
allowed_ips = f'{cell2_vpn_subnet}, {cell1_vpn_subnet}'
|
||
|
||
tmp_path = tmp_path_factory.mktemp('wg_e2e_c2c')
|
||
|
||
# --- Generate a WireGuard key pair ---
|
||
priv_r = _run(['wg', 'genkey'], timeout=5)
|
||
assert priv_r.returncode == 0, f'wg genkey failed: {priv_r.stderr}'
|
||
privkey = priv_r.stdout.strip()
|
||
|
||
pub_r = subprocess.run(['wg', 'pubkey'], input=privkey, capture_output=True,
|
||
text=True, timeout=5)
|
||
assert pub_r.returncode == 0, f'wg pubkey failed: {pub_r.stderr}'
|
||
pubkey = pub_r.stdout.strip()
|
||
|
||
# --- Add peer to cell2's wg0 (live, no restart needed) ---
|
||
r = _pic2_wg(f'wg set wg0 peer {pubkey} allowed-ips {test_peer_cidr} persistent-keepalive 25')
|
||
assert r.returncode == 0, f'wg set peer failed on cell2: {r.stderr}'
|
||
|
||
# --- Add permissive iptables ACCEPT so test traffic passes cell2's FORWARD ---
|
||
r = _pic2_wg(
|
||
f'iptables -I FORWARD 1 -s {test_peer_ip} -j ACCEPT '
|
||
f'-m comment --comment {IPTABLES_COMMENT}'
|
||
)
|
||
assert r.returncode == 0, f'iptables -I FORWARD failed on cell2: {r.stderr}'
|
||
|
||
# --- Write wg-quick config on the test runner ---
|
||
conf_path = str(tmp_path / f'{IFACE_NAME}.conf')
|
||
# Table=off: let wg-quick create the interface without managing routes.
|
||
# We add routes manually below so that existing host routes (added by
|
||
# ensure_cell_subnet_routes) don't conflict with wg-quick's route additions.
|
||
conf = (
|
||
f'[Interface]\n'
|
||
f'PrivateKey = {privkey}\n'
|
||
f'Address = {test_peer_ip}/32\n'
|
||
f'Table = off\n'
|
||
f'\n'
|
||
f'[Peer]\n'
|
||
f'PublicKey = {cell2_pubkey}\n'
|
||
f'Endpoint = {cell2_lan_ip}:{cell2_wg_port}\n'
|
||
f'AllowedIPs = {allowed_ips}\n'
|
||
f'PersistentKeepalive = 25\n'
|
||
)
|
||
with open(conf_path, 'w') as f:
|
||
f.write(conf)
|
||
os.chmod(conf_path, 0o600)
|
||
|
||
# --- Bring up the WireGuard interface ---
|
||
up_r = _run(['sudo', 'wg-quick', 'up', conf_path], timeout=15)
|
||
assert up_r.returncode == 0, f'wg-quick up failed: {up_r.stderr}\n{up_r.stdout}'
|
||
|
||
# --- Add routes manually (replace is idempotent, handles pre-existing routes) ---
|
||
for subnet in allowed_ips.split(','):
|
||
_run(['sudo', 'ip', 'route', 'replace', subnet.strip(), 'dev', IFACE_NAME], timeout=5)
|
||
|
||
time.sleep(3)
|
||
|
||
yield {
|
||
'test_peer_ip': test_peer_ip,
|
||
'allowed_ips': allowed_ips,
|
||
'privkey': privkey,
|
||
'pubkey': pubkey,
|
||
'conf_path': conf_path,
|
||
'cell1_wg_ip': _CFG['cell1_wg_ip'],
|
||
'cell2_wg_ip': _CFG['cell2_wg_ip'],
|
||
'cell1_domain': _CFG.get('cell1_domain', ''),
|
||
}
|
||
|
||
# --- Teardown ---
|
||
_run(['sudo', 'wg-quick', 'down', conf_path], timeout=15)
|
||
try:
|
||
os.unlink(conf_path)
|
||
except Exception:
|
||
pass
|
||
_cleanup_pic2_iptables(test_peer_ip)
|
||
_cleanup_pic2_peer(pubkey)
|
||
|
||
|
||
# -------------------------------------------------------------------------
|
||
# Tests
|
||
# -------------------------------------------------------------------------
|
||
|
||
class TestCellToCellRouting:
|
||
"""
|
||
Full end-to-end: split-tunnel peer on cell2 reaches cell1 via cell-to-cell tunnel.
|
||
"""
|
||
|
||
def test_prerequisites_cell1_not_reachable_directly(self):
|
||
"""Confirm cell1's WG IP is NOT reachable from host without VPN (test validity check)."""
|
||
cell1_wg_ip = _CFG.get('cell1_wg_ip', '10.0.0.1')
|
||
assert not _ping(cell1_wg_ip, count=1, wait=1), (
|
||
f'{cell1_wg_ip} is reachable WITHOUT the VPN — test would be a false positive. '
|
||
f'The test is only meaningful when this IP is unreachable without the tunnel.'
|
||
)
|
||
|
||
def test_cell2_wg_ip_reachable(self, wg_setup):
|
||
"""Cell2's WireGuard server IP is reachable (basic tunnel sanity)."""
|
||
cell2_wg_ip = wg_setup['cell2_wg_ip']
|
||
assert _ping(cell2_wg_ip), (
|
||
f'Cell2 WG server IP {cell2_wg_ip} not reachable. '
|
||
f'Handshake may not have established. '
|
||
f'Peer allowed-ips: {wg_setup["allowed_ips"]}'
|
||
)
|
||
|
||
def test_handshake_established(self, wg_setup):
|
||
"""A WireGuard handshake with cell2 has completed (within 30 s)."""
|
||
deadline = time.time() + 30
|
||
while time.time() < deadline:
|
||
r = _run(['sudo', 'wg', 'show', IFACE_NAME], timeout=5)
|
||
if 'latest handshake' in r.stdout:
|
||
return
|
||
time.sleep(2)
|
||
pytest.fail(
|
||
f'No WireGuard handshake with cell2 after 30 s.\n'
|
||
f'wg show output:\n{r.stdout}'
|
||
)
|
||
|
||
def test_cross_cell_wg_ip_reachable(self, wg_setup):
|
||
"""
|
||
Cell1's WireGuard IP is reachable from a peer connected to cell2.
|
||
|
||
This is the critical cross-cell routing test. The full path is:
|
||
test-runner → wg-e2e → cell2 FORWARD → cell-to-cell tunnel → cell1 WG IP
|
||
"""
|
||
cell1_wg_ip = wg_setup['cell1_wg_ip']
|
||
assert _ping(cell1_wg_ip, count=3, wait=3), (
|
||
f'Cell1 WG IP {cell1_wg_ip} NOT reachable from split-tunnel peer on cell2. '
|
||
f'\nAllowed IPs: {wg_setup["allowed_ips"]}'
|
||
f'\nThis means the cell-to-cell routing is broken. Check:'
|
||
f'\n 1. cell2 FORWARD chain has ESTABLISHED,RELATED ACCEPT'
|
||
f'\n 2. cell2 wg0.conf has AllowedIPs covering cell1 subnet'
|
||
f'\n 3. Cell-to-cell WireGuard handshake is recent (wg show on cell2)'
|
||
)
|
||
|
||
def test_cross_cell_ping_latency(self, wg_setup):
|
||
"""Cross-cell ping RTT is under 10ms — both cells are on the same LAN.
|
||
|
||
High latency (>10ms) indicates traffic is routing via the internet instead
|
||
of directly over the LAN WireGuard tunnel. Check cell_links.json endpoints.
|
||
"""
|
||
cell1_wg_ip = wg_setup['cell1_wg_ip']
|
||
r = _run(['ping', '-c', '10', '-W', '2', cell1_wg_ip], timeout=30)
|
||
assert r.returncode == 0, (
|
||
f'Ping to {cell1_wg_ip} failed completely: {r.stderr}'
|
||
)
|
||
m = re.search(
|
||
r'rtt min/avg/max/mdev = [\d.]+/([\d.]+)/[\d.]+/[\d.]+ ms',
|
||
r.stdout
|
||
)
|
||
assert m, f'Could not parse ping RTT from output:\n{r.stdout}'
|
||
avg_ms = float(m.group(1))
|
||
assert avg_ms < MAX_LATENCY_MS, (
|
||
f'Cross-cell avg RTT {avg_ms:.2f}ms exceeds {MAX_LATENCY_MS}ms. '
|
||
f'Both cells are on the same LAN — high latency means traffic routes '
|
||
f'via the internet. Check cell_links.json uses LAN IPs, not public IPs.'
|
||
)
|
||
|
||
def test_cross_cell_api_reachable(self, wg_setup):
|
||
"""Cell1's API /health is reachable through the cell-to-cell tunnel."""
|
||
import urllib.request, urllib.error
|
||
cell1_wg_ip = wg_setup['cell1_wg_ip']
|
||
url = f'http://{cell1_wg_ip}:3000/health'
|
||
try:
|
||
with urllib.request.urlopen(url, timeout=8) as resp:
|
||
import json as _json
|
||
body = _json.loads(resp.read())
|
||
assert body.get('status') == 'healthy', (
|
||
f'Cell1 API returned unexpected health: {body}'
|
||
)
|
||
except urllib.error.URLError as e:
|
||
pytest.fail(
|
||
f'Cell1 API at {url} not reachable via cell-to-cell tunnel: {e}. '
|
||
f'\nNote: if test_cross_cell_wg_ip_reachable passed but this fails, '
|
||
f'the tunnel is up but port 3000 may be blocked by cell1\'s firewall.'
|
||
)
|
||
|
||
def test_cross_cell_web_reachable(self, wg_setup):
|
||
"""Cell1's web service (port 80 via Caddy) is reachable through the tunnel."""
|
||
import urllib.request, urllib.error
|
||
cell1_wg_ip = wg_setup['cell1_wg_ip']
|
||
url = f'http://{cell1_wg_ip}/'
|
||
try:
|
||
with urllib.request.urlopen(url, timeout=8) as resp:
|
||
assert resp.status in (200, 301, 302, 307, 308), (
|
||
f'Unexpected HTTP status from cell1 Caddy: {resp.status}'
|
||
)
|
||
except urllib.error.HTTPError as e:
|
||
assert e.code < 500, (
|
||
f'Cell1 Caddy returned server error {e.code} — may indicate a Caddy issue'
|
||
)
|
||
except urllib.error.URLError as e:
|
||
pytest.fail(
|
||
f'Cell1 web (Caddy) at {url} not reachable via tunnel: {e}'
|
||
)
|
||
|
||
def test_tunnel_latency_consistency(self, wg_setup):
|
||
"""WireGuard tunnel has no significant latency spikes on a local wired network.
|
||
|
||
Sends 50 pings at 0.2s intervals to cell2's WG server IP. Pass condition:
|
||
≤ 5% of pings (≤ 2 out of 50) exceed max(3× median RTT, 15ms).
|
||
"""
|
||
cell2_wg_ip = wg_setup['cell2_wg_ip']
|
||
r = _run(['ping', '-c', '50', '-i', '0.2', '-W', '2', cell2_wg_ip], timeout=25)
|
||
assert r.returncode == 0, f'All pings to {cell2_wg_ip} failed: {r.stderr}'
|
||
|
||
rtts = [float(m.group(1)) for m in re.finditer(r'time=([\d.]+) ms', r.stdout)]
|
||
assert len(rtts) >= 40, (
|
||
f'Too few ping replies ({len(rtts)}/50) — packet loss may mask latency issues'
|
||
)
|
||
|
||
sorted_rtts = sorted(rtts)
|
||
median = sorted_rtts[len(sorted_rtts) // 2]
|
||
spike_threshold = max(median * 3.0, 15.0)
|
||
spikes = [rtt for rtt in rtts if rtt > spike_threshold]
|
||
spike_ratio = len(spikes) / len(rtts)
|
||
|
||
assert spike_ratio <= 0.05, (
|
||
f'Latency spikes: {len(spikes)}/{len(rtts)} packets ({spike_ratio:.0%}) '
|
||
f'exceeded {spike_threshold:.1f}ms (3× median {median:.1f}ms). '
|
||
f'Spike values: {[f"{s:.1f}ms" for s in sorted(spikes)]}'
|
||
)
|
||
|
||
def test_cross_cell_domain_accessible(self, wg_setup):
|
||
"""A service domain from cell1 is resolvable via cell2's DNS and HTTP-reachable.
|
||
|
||
DNS chain:
|
||
test-peer → cell2_wg_ip:53 (DNAT → cell-dns on cell2)
|
||
→ cell2 Corefile forwards cell1_domain → cell1_wg_ip:53
|
||
→ cell1 cell-dns returns A record → cell1_wg_ip
|
||
|
||
HTTP:
|
||
test-peer → cell1_wg_ip:80 (Host: calendar.<cell1_domain>)
|
||
→ cell-to-cell tunnel → cell1 Caddy
|
||
|
||
Requires: scoped DNAT (wg0 PREROUTING -d server_ip) on both cells
|
||
and Docker→wg0 routing on cell2 (host route + MASQUERADE).
|
||
"""
|
||
cell1_domain = wg_setup.get('cell1_domain', '')
|
||
cell2_wg_ip = wg_setup['cell2_wg_ip']
|
||
cell1_wg_ip = wg_setup['cell1_wg_ip']
|
||
|
||
if not cell1_domain:
|
||
pytest.skip('cell1_domain not configured — cannot test domain access')
|
||
|
||
calendar_host = f'calendar.{cell1_domain}'
|
||
|
||
# --- DNS resolution via cell2's DNS ---
|
||
r = _run(
|
||
['dig', f'@{cell2_wg_ip}', calendar_host, '+short', '+time=5', '+tries=2'],
|
||
timeout=15
|
||
)
|
||
assert r.returncode == 0, (
|
||
f'dig @{cell2_wg_ip} {calendar_host} failed: {r.stderr.strip()}\n'
|
||
f'DNS chain: test-peer → {cell2_wg_ip}:53 → cell-dns(cell2) '
|
||
f'→ {cell1_wg_ip}:53 (cell1). '
|
||
f'If this fails, check: (1) DNAT on cell2 scoped to -d {cell2_wg_ip}, '
|
||
f'(2) Docker→wg0 routing on cell2 (host route + MASQUERADE).'
|
||
)
|
||
resolved = r.stdout.strip()
|
||
assert resolved == cell1_wg_ip, (
|
||
f'DNS resolved {calendar_host!r} to {resolved!r}, '
|
||
f'expected {cell1_wg_ip!r}. '
|
||
f'cell1 zone: all {cell1_domain} names should point to {cell1_wg_ip}.'
|
||
)
|
||
|
||
# --- HTTP access via domain name (Host header → Caddy routing) ---
|
||
import urllib.request, urllib.error
|
||
url = f'http://{cell1_wg_ip}/'
|
||
req = urllib.request.Request(url, headers={'Host': calendar_host})
|
||
try:
|
||
with urllib.request.urlopen(req, timeout=8) as resp:
|
||
assert resp.status < 500, (
|
||
f'cell1 Caddy returned {resp.status} for Host:{calendar_host}'
|
||
)
|
||
except urllib.error.HTTPError as e:
|
||
assert e.code < 500, (
|
||
f'cell1 Caddy server error {e.code} for Host:{calendar_host}'
|
||
)
|
||
except urllib.error.URLError as e:
|
||
pytest.fail(
|
||
f'HTTP to {url} (Host:{calendar_host}) via tunnel failed: {e}'
|
||
)
|