test: add E2E coverage for peer dashboard/services, DNS records, and WG domain access

- test_peer_dashboard_services.py (63 tests): unit tests for all API fixes
  * peer_dashboard field names (name/transfer_rx/transfer_tx vs old stale names)
  * peer_dashboard service_urls dict with correct domain-keyed URLs
  * peer_services email structure (nested smtp/imap, address not username)
  * peer_services files key (not webdav), caldav URL (calendar.dev not radicale.dev:5232)
  * peer_services wireguard DNS (not 10.0.0.1), config text with DNS line
  * DNS zone records (api/webui → Caddy, VIPs for calendar/files/mail/webdav)
  * Caddyfile generation (all service blocks including webui.dev)
  * Access control (401 anon, 403 admin on peer-only routes, 404 missing peer)
- e2e/api/test_peer_endpoints.py: fix stale field assertions, add structure checks
- e2e/wg/test_wg_domain_access.py: E2E WG tests for DNS resolution via VPN tunnel
  * All *.dev domains resolve to correct IPs via CoreDNS
  * api.dev/webui.dev must resolve to Caddy, not container direct IPs
  * CoreDNS reachability through VPN tunnel
  * Peer config DNS field correctness
- e2e/ui/test_peer_dashboard.py: UI checks for service icon links, CalDAV URL, email

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-26 17:41:21 -04:00
parent 3690c6d955
commit 32272420cb
4 changed files with 956 additions and 9 deletions
+65 -4
View File
@@ -4,8 +4,8 @@ Scenarios 20, 21: Peer role access scoping.
Tests cover:
- Peer is blocked from admin-only routes (config, wireguard, peer list)
- Peer can access /api/peer/dashboard and /api/peer/services
- Dashboard response shape (peer_name, online, rx_bytes, tx_bytes, allowed_ips)
- Services response shape (wireguard, email, caldav, webdav sections)
- Dashboard response shape (name, online, transfer_rx, transfer_tx, service_urls)
- Services response shape (wireguard, email, caldav, files sections)
- Peer can change their own password and use the new credential
- Peer cannot call admin/reset-password
"""
@@ -54,12 +54,24 @@ def test_peer_dashboard_has_expected_fields(peer_client):
r = peer_client.get('/api/peer/dashboard')
assert r.status_code == 200
data = r.json()
missing = [f for f in ('peer_name', 'online', 'rx_bytes', 'tx_bytes', 'allowed_ips') if f not in data]
missing = [f for f in ('name', 'online', 'transfer_rx', 'transfer_tx', 'allowed_ips', 'service_urls') if f not in data]
assert not missing, (
f"Dashboard response missing fields {missing}. Got keys: {list(data.keys())}"
)
def test_peer_dashboard_no_stale_field_names(peer_client):
"""Verify renamed fields are gone — old names cause silent UI blanks."""
r = peer_client.get('/api/peer/dashboard')
assert r.status_code == 200
data = r.json()
stale = [f for f in ('peer_name', 'rx_bytes', 'tx_bytes') if f in data]
assert not stale, (
f"Dashboard response still has stale fields {stale}"
"PeerDashboard.jsx reads name/transfer_rx/transfer_tx"
)
def test_peer_can_access_own_services(peer_client):
r = peer_client.get('/api/peer/services')
assert r.status_code == 200, (
@@ -71,12 +83,61 @@ def test_peer_services_has_expected_sections(peer_client):
r = peer_client.get('/api/peer/services')
assert r.status_code == 200
data = r.json()
missing = [k for k in ('wireguard', 'email', 'caldav', 'webdav') if k not in data]
missing = [k for k in ('wireguard', 'email', 'caldav', 'files') if k not in data]
assert not missing, (
f"Services response missing sections {missing}. Got keys: {list(data.keys())}"
)
def test_peer_services_no_stale_keys(peer_client):
"""Verify renamed keys are gone — old names cause silent UI blanks."""
r = peer_client.get('/api/peer/services')
assert r.status_code == 200
data = r.json()
assert 'webdav' not in data, (
"'webdav' still present at top level — MyServices.jsx reads 'files'"
)
def test_peer_services_email_structure(peer_client):
"""Email section must use nested smtp/imap objects and email.address."""
r = peer_client.get('/api/peer/services')
assert r.status_code == 200
email = r.json().get('email', {})
assert 'address' in email, f"email.address missing; email keys: {list(email)}"
assert 'smtp' in email and isinstance(email['smtp'], dict), \
f"email.smtp must be a dict; got: {email.get('smtp')}"
assert 'imap' in email and isinstance(email['imap'], dict), \
f"email.imap must be a dict; got: {email.get('imap')}"
assert 'host' in email['smtp'], "email.smtp.host missing"
assert 'host' in email['imap'], "email.imap.host missing"
assert 'imap_host' not in email, "'imap_host' still flat — should be email.imap.host"
assert 'smtp_host' not in email, "'smtp_host' still flat — should be email.smtp.host"
def test_peer_services_caldav_url_uses_calendar_domain(peer_client):
"""CalDAV URL must be calendar.dev, not radicale.dev:5232."""
r = peer_client.get('/api/peer/services')
assert r.status_code == 200
url = r.json().get('caldav', {}).get('url', '')
assert 'radicale' not in url, \
f"CalDAV URL must not contain 'radicale' — no radicale.dev DNS record; got: {url}"
assert ':5232' not in url, \
f"CalDAV URL exposes port 5232 — use Caddy-proxied URL; got: {url}"
def test_peer_services_wireguard_dns_not_vpn_gateway(peer_client):
"""WireGuard DNS must be the CoreDNS IP, not the VPN gateway 10.0.0.1."""
r = peer_client.get('/api/peer/services')
assert r.status_code == 200
dns = r.json().get('wireguard', {}).get('dns', '')
assert dns != '10.0.0.1', (
"wireguard.dns is 10.0.0.1 (WireGuard VPN gateway) — "
"DNS queries to 10.0.0.1 fail because the VPN server doesn't run a DNS resolver; "
"must be the CoreDNS container IP"
)
# ---------------------------------------------------------------------------
# Auth management — scoping
# ---------------------------------------------------------------------------
+86 -5
View File
@@ -3,16 +3,22 @@ Peer dashboard and My Services page tests.
Scenarios:
12. Peer sees their own dashboard (PeerDashboard.jsx renders peer.name as <h1>)
13. Peer's My Services page loads and shows the WireGuard VPN section
13. Peer's My Services page loads and shows all service sections
14. Peer dashboard shows service icon links (calendar, files, mail, webdav)
15. My Services shows correct CalDAV URL (calendar.dev not radicale.dev:5232)
16. My Services shows email address field (not username)
Key selectors from PeerDashboard.jsx:
- h1 shows peer.name (line 61: `{peer.name || 'My Dashboard'}`)
- "VPN Address" stat card label (line 76)
- "Quick Access" "My Services" link (line 117-119)
- h1 shows peer.name (peer.name from /api/peer/dashboard)
- "VPN Address" stat card label
- "Quick Access" section with service icon links from service_urls
- "My Services" link
Key selectors from MyServices.jsx:
- h2 "WireGuard VPN" (line 93)
- h2 "WireGuard VPN"
- h2 "Email", h2 "Calendar & Contacts", h2 "Files"
- "Address" label for email (not "Username")
- "CalDAV URL" label with calendar.dev value
"""
import pytest
@@ -131,3 +137,78 @@ def test_peer_my_services_shows_files_section(peer_page, webui_base):
pytest.xfail(
"Files section heading not found on /my-services"
)
# ── 14. Service icon links ────────────────────────────────────────────────────
def test_peer_dashboard_has_calendar_link(peer_page, webui_base):
"""PeerDashboard Quick Access section renders a Calendar icon link."""
page, _ = peer_page
page.wait_for_load_state('networkidle')
try:
page.wait_for_selector('a:has-text("Calendar")', timeout=5000)
except Exception:
pytest.xfail(
"Calendar link not found on peer dashboard Quick Access — "
"check that service_urls.calendar is populated and PeerDashboard.jsx renders it"
)
def test_peer_dashboard_has_files_link(peer_page, webui_base):
"""PeerDashboard Quick Access section renders a Files icon link."""
page, _ = peer_page
page.wait_for_load_state('networkidle')
try:
page.wait_for_selector('a:has-text("Files")', timeout=5000)
except Exception:
pytest.xfail(
"Files link not found on peer dashboard Quick Access"
)
def test_peer_dashboard_has_mail_link(peer_page, webui_base):
"""PeerDashboard Quick Access section renders a Mail icon link."""
page, _ = peer_page
page.wait_for_load_state('networkidle')
try:
page.wait_for_selector('a:has-text("Mail")', timeout=5000)
except Exception:
pytest.xfail(
"Mail link not found on peer dashboard Quick Access"
)
# ── 15. CalDAV URL correctness ────────────────────────────────────────────────
def test_peer_my_services_caldav_url_no_radicale(peer_page, webui_base):
"""CalDAV URL shown in My Services must not contain 'radicale' (no DNS record)."""
page, _ = peer_page
page.goto(f"{webui_base}/my-services")
page.wait_for_load_state('networkidle')
try:
# If radicale.dev appears as CalDAV URL it means the bug is back
radicale_url = page.query_selector('text=radicale')
assert radicale_url is None, (
"Found 'radicale' text on My Services page — "
"CalDAV URL should be calendar.dev, not radicale.dev:5232"
)
except AssertionError:
raise
except Exception:
pass # page didn't load — other tests cover that
# ── 16. Email address display ─────────────────────────────────────────────────
def test_peer_my_services_shows_address_label(peer_page, webui_base):
"""MyServices.jsx renders 'Address' label for email (reads email.address)."""
page, _ = peer_page
page.goto(f"{webui_base}/my-services")
page.wait_for_load_state('networkidle')
try:
page.wait_for_selector('text=Address', timeout=5000)
except Exception:
pytest.xfail(
"'Address' label not found on My Services email section — "
"check that email.address is populated in /api/peer/services"
)
+189
View File
@@ -0,0 +1,189 @@
"""
WireGuard E2E: domain name resolution and HTTP access through the VPN tunnel.
Scenarios covered:
30. All *.dev domains resolve to the expected IPs via the CoreDNS server
31. Direct HTTP access to each service IP works through the VPN
32. HTTP access via domain names works through the VPN (DNS + routing)
33. WireGuard config downloaded via /api/peer/services has correct DNS field
34. Peer config DNS points to CoreDNS, not the WireGuard VPN gateway
These tests require a live PIC stack with WireGuard and are marked `wg`.
They run via `make test-e2e-wg` or `pytest tests/e2e/wg/ -m wg`.
"""
import subprocess
import pytest
pytestmark = pytest.mark.wg
# Expected domain→IP mapping for the current default config
DOMAIN_IPS = {
'pic0': '172.20.0.2', # Caddy (main cell domain)
'api': '172.20.0.2', # Caddy reverse-proxy for API
'webui': '172.20.0.2', # Caddy reverse-proxy for WebUI
'calendar': '172.20.0.21', # Caddy VIP for CalDAV
'files': '172.20.0.22', # Caddy VIP for Filegator
'mail': '172.20.0.23', # Caddy VIP for Rainloop
'webmail': '172.20.0.23', # alias for mail VIP
'webdav': '172.20.0.24', # Caddy VIP for WebDAV
}
def _dns_ip(admin_client) -> str:
r = admin_client.get('/api/config')
if r.status_code == 200:
ips = r.json().get('service_ips', {})
if ips.get('dns'):
return ips['dns']
return '172.20.0.3'
def _domain(admin_client) -> str:
r = admin_client.get('/api/config')
if r.status_code == 200:
return r.json().get('domain', 'dev')
return 'dev'
# ── Scenario 30: DNS resolution ───────────────────────────────────────────────
@pytest.mark.parametrize('subdomain,expected_ip', list(DOMAIN_IPS.items()))
def test_dev_domain_resolves_to_expected_ip(connected_peer, admin_client, subdomain, expected_ip):
"""Every .dev subdomain resolves to the correct IP via CoreDNS."""
dns_ip = _dns_ip(admin_client)
dom = _domain(admin_client)
fqdn = f'{subdomain}.{dom}'
result = subprocess.run(
['dig', f'@{dns_ip}', fqdn, 'A', '+short', '+time=5'],
capture_output=True, text=True, timeout=10,
)
assert result.returncode == 0, f"dig failed for {fqdn}: {result.stderr}"
resolved = result.stdout.strip()
assert resolved == expected_ip, (
f"{fqdn} resolved to {resolved!r}, expected {expected_ip}. "
f"DNS server: {dns_ip}"
)
def test_api_domain_does_not_resolve_to_api_container(connected_peer, admin_client):
"""api.dev must route through Caddy (172.20.0.2) — API container listens on :3000, not :80."""
dns_ip = _dns_ip(admin_client)
dom = _domain(admin_client)
result = subprocess.run(
['dig', f'@{dns_ip}', f'api.{dom}', 'A', '+short', '+time=5'],
capture_output=True, text=True, timeout=10,
)
resolved = result.stdout.strip()
assert resolved != '172.20.0.10', (
f"api.{dom} resolves to 172.20.0.10 (API container direct) — "
"this bypasses Caddy so HTTP requests to api.{dom}:80 return nothing; "
"must resolve to Caddy 172.20.0.2"
)
assert resolved == '172.20.0.2', (
f"api.{dom} should resolve to Caddy 172.20.0.2; got {resolved}"
)
def test_webui_domain_does_not_resolve_to_webui_container(connected_peer, admin_client):
"""webui.dev must route through Caddy — WebUI container also doesn't listen on :80 directly."""
dns_ip = _dns_ip(admin_client)
dom = _domain(admin_client)
result = subprocess.run(
['dig', f'@{dns_ip}', f'webui.{dom}', 'A', '+short', '+time=5'],
capture_output=True, text=True, timeout=10,
)
resolved = result.stdout.strip()
assert resolved == '172.20.0.2', (
f"webui.{dom} should resolve to Caddy 172.20.0.2; got {resolved}"
)
# ── Scenario 31: HTTP via IP ───────────────────────────────────────────────────
def test_caddy_ip_serves_http(connected_peer):
"""Caddy IP 172.20.0.2 returns an HTTP response (not connection refused)."""
result = subprocess.run(
['curl', '-s', '-o', '/dev/null', '-w', '%{http_code}', '--connect-timeout', '5',
'http://172.20.0.2/'],
capture_output=True, text=True, timeout=10,
)
code = result.stdout.strip()
assert code not in ('000', ''), f"No HTTP response from 172.20.0.2; curl exit {result.returncode}"
# ── Scenario 32: HTTP via domain ──────────────────────────────────────────────
def test_http_api_domain_reaches_api(connected_peer, admin_client):
"""curl http://api.dev/api/status returns a JSON response via Caddy."""
dom = _domain(admin_client)
dns_ip = _dns_ip(admin_client)
result = subprocess.run(
['curl', '-s', '--connect-timeout', '5',
f'--dns-servers', dns_ip,
f'http://api.{dom}/api/status'],
capture_output=True, text=True, timeout=10,
)
# Any valid JSON response is acceptable (200, 401, etc.)
assert result.stdout.strip(), (
f"curl http://api.{dom}/api/status returned no output via DNS {dns_ip}. "
f"stderr: {result.stderr[:200]}"
)
# ── Scenario 33: Config DNS field ─────────────────────────────────────────────
def test_peer_services_config_has_coredns(admin_client, make_peer):
"""Config returned by /api/peer/services must use CoreDNS IP, not WireGuard VPN gateway."""
from helpers.api_client import PicAPIClient
import os
peer = make_peer('e2etest-dns-config', password='DnsTest123!')
peer_client = PicAPIClient(os.environ.get('PIC_API_BASE', 'http://192.168.31.51:3000'))
peer_client.login(peer['name'], 'DnsTest123!')
r = peer_client.get('/api/peer/services')
assert r.status_code == 200, f"peer services returned {r.status_code}: {r.text}"
data = r.json()
dns = data.get('wireguard', {}).get('dns', '')
assert dns != '10.0.0.1', (
"wireguard.dns is 10.0.0.1 — this is the WireGuard VPN gateway, NOT a DNS server; "
"VPN clients using this as DNS will fail to resolve any domain"
)
config = data.get('wireguard', {}).get('config', '')
if config:
assert 'DNS = 10.0.0.1' not in config, (
"WireGuard config has DNS = 10.0.0.1 — VPN clients will fail to resolve domains"
)
# DNS should be reachable from VPN (must be on the Docker network, not VPN subnet)
dns_from_config = None
for line in config.splitlines():
if line.strip().startswith('DNS ='):
dns_from_config = line.split('=', 1)[1].strip()
break
if dns_from_config:
assert dns_from_config.startswith('172.'), (
f"DNS in config is {dns_from_config} — expected a 172.x.x.x Docker network IP "
f"(CoreDNS is on the Docker bridge, not the WireGuard VPN subnet)"
)
# ── Scenario 34: DNS reachability from VPN ────────────────────────────────────
def test_coredns_reachable_via_vpn(connected_peer, admin_client):
"""CoreDNS at 172.20.0.3 is reachable through the VPN tunnel."""
dns_ip = _dns_ip(admin_client)
result = subprocess.run(
['dig', f'@{dns_ip}', 'health.check', '+time=3', '+tries=1'],
capture_output=True, text=True, timeout=8,
)
# NXDOMAIN means DNS responded — that's a success (connectivity is what we're testing)
responded = 'status:' in result.stdout or result.returncode in (0, 9)
assert responded, (
f"CoreDNS at {dns_ip} did not respond via VPN tunnel. "
f"Check that AllowedIPs in peer config covers 172.20.0.0/16 or 0.0.0.0/0. "
f"stdout: {result.stdout[:200]}"
)
+616
View File
@@ -0,0 +1,616 @@
#!/usr/bin/env python3
"""
Unit tests for /api/peer/dashboard and /api/peer/services.
These tests verify the exact JSON field names and structure returned by
both endpoints so UI/API mismatches surface here before reaching users.
Coverage:
- peer_dashboard returns name/transfer_rx/transfer_tx (not peer_name/rx_bytes/tx_bytes)
- peer_dashboard includes service_urls dict keyed by service name
- peer_services uses files (not webdav) as the file storage key
- peer_services email block uses nested smtp/imap objects with host/port
- peer_services email.address is the full email address (not username)
- peer_services caldav URL uses calendar.{domain}, not radicale.{domain}:5232
- peer_services wireguard block includes a config text field with DNS = <coredns-ip>
- peer_services wireguard DNS is not 10.0.0.1 (WireGuard VPN IP, not CoreDNS)
- Unauthenticated requests return 401; admin sessions return 403 (peer-only zone)
- 404 when session has peer_name but peer not in registry
"""
import os
import sys
import json
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / 'api'))
from app import app
from auth_manager import AuthManager
# ─────────────────────────── helpers ──────────────────────────────────────────
def _make_auth(tmp_path):
data_dir = str(tmp_path / 'data')
cfg_dir = str(tmp_path / 'cfg')
os.makedirs(data_dir, exist_ok=True)
os.makedirs(cfg_dir, exist_ok=True)
mgr = AuthManager(data_dir=data_dir, config_dir=cfg_dir)
mgr.create_user('admin', 'AdminPass123!', 'admin')
mgr.create_user('alice', 'AlicePass123!', 'peer')
return mgr
def _login(client, username, password):
return client.post('/api/auth/login',
data=json.dumps({'username': username, 'password': password}),
content_type='application/json')
FAKE_PEER = {
'peer': 'alice',
'ip': '10.0.0.5',
'public_key': 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=',
'private_key': 'BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB=',
'allowed_ips': '10.0.0.5/32',
'internet_access': True,
'service_access': ['calendar', 'files', 'mail', 'webdav'],
'active': True,
'config_needs_reinstall': False,
}
FAKE_WG_STATS = {
'online': True,
'transfer_rx': 1048576, # 1 MiB
'transfer_tx': 524288, # 512 KiB
'last_handshake': '2026-04-26T18:00:00',
}
DOMAIN = 'dev'
_REGISTRY_SENTINEL = object()
def _mock_registry(peer=_REGISTRY_SENTINEL):
reg = MagicMock()
reg.get_peer.return_value = FAKE_PEER if peer is _REGISTRY_SENTINEL else peer
return reg
def _mock_wg(dns='172.20.0.3'):
wg = MagicMock()
wg.get_peer_status.return_value = FAKE_WG_STATS
wg.get_keys.return_value = {'public_key': 'SERVERPUBKEY=='}
wg.get_server_config.return_value = {'endpoint': '1.2.3.4:51820'}
wg.FULL_TUNNEL_IPS = '0.0.0.0/0, ::/0'
wg.get_split_tunnel_ips.return_value = '10.0.0.0/24, 172.20.0.0/16'
wg.get_peer_config.return_value = (
'[Interface]\n'
f'PrivateKey = BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB=\n'
f'Address = 10.0.0.5/32\n'
f'DNS = {dns}\n'
'\n'
'[Peer]\n'
'PublicKey = SERVERPUBKEY==\n'
'AllowedIPs = 0.0.0.0/0, ::/0\n'
'Endpoint = 1.2.3.4:51820\n'
'PersistentKeepalive = 25\n'
)
return wg
def _mock_config(domain=DOMAIN):
cfg = MagicMock()
cfg.configs = {
'_identity': {'domain': domain, 'cell_name': 'pic0', 'ip_range': '172.20.0.0/16'}
}
return cfg
# ─────────────────────────── fixtures ─────────────────────────────────────────
@pytest.fixture
def auth_mgr(tmp_path):
return _make_auth(tmp_path)
@pytest.fixture
def peer_client(auth_mgr):
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'test-secret'
with patch('app.auth_manager', auth_mgr):
try:
import auth_routes
with patch.object(auth_routes, 'auth_manager', auth_mgr, create=True):
with app.test_client() as c:
r = _login(c, 'alice', 'AlicePass123!')
assert r.status_code == 200, f'peer login failed: {r.data}'
yield c
except (ImportError, AttributeError):
with app.test_client() as c:
r = _login(c, 'alice', 'AlicePass123!')
assert r.status_code == 200, f'peer login failed: {r.data}'
yield c
@pytest.fixture
def admin_client(auth_mgr):
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'test-secret'
with patch('app.auth_manager', auth_mgr):
try:
import auth_routes
with patch.object(auth_routes, 'auth_manager', auth_mgr, create=True):
with app.test_client() as c:
r = _login(c, 'admin', 'AdminPass123!')
assert r.status_code == 200, f'admin login failed: {r.data}'
yield c
except (ImportError, AttributeError):
with app.test_client() as c:
r = _login(c, 'admin', 'AdminPass123!')
assert r.status_code == 200, f'admin login failed: {r.data}'
yield c
# ─────────────────── peer_dashboard field names ────────────────────────────────
class TestPeerDashboardFieldNames:
"""
peer_dashboard() must return the field names PeerDashboard.jsx reads.
A mismatch causes silent zeros/blanks in the UI without any error.
"""
def _get(self, peer_client):
wg = _mock_wg()
reg = _mock_registry()
cfg = _mock_config()
with patch('app.peer_registry', reg), \
patch('app.wireguard_manager', wg), \
patch('app.config_manager', cfg), \
patch('app._resolve_peer_dns', return_value='172.20.0.3'):
return peer_client.get('/api/peer/dashboard')
def test_returns_200(self, peer_client):
r = self._get(peer_client)
assert r.status_code == 200, r.data
def test_has_name_not_peer_name(self, peer_client):
"""PeerDashboard.jsx reads peer.name — must NOT be peer_name."""
r = self._get(peer_client)
data = r.get_json()
assert 'name' in data, f"'name' missing from dashboard; keys: {list(data)}"
assert 'peer_name' not in data, \
"'peer_name' still present — UI reads 'name', not 'peer_name'"
def test_name_value_is_peer_username(self, peer_client):
r = self._get(peer_client)
assert r.get_json()['name'] == 'alice'
def test_has_transfer_rx_not_rx_bytes(self, peer_client):
"""PeerDashboard.jsx reads peer.transfer_rx — must NOT be rx_bytes."""
r = self._get(peer_client)
data = r.get_json()
assert 'transfer_rx' in data, f"'transfer_rx' missing; keys: {list(data)}"
assert 'rx_bytes' not in data, \
"'rx_bytes' still present — UI reads 'transfer_rx'"
def test_has_transfer_tx_not_tx_bytes(self, peer_client):
"""PeerDashboard.jsx reads peer.transfer_tx — must NOT be tx_bytes."""
r = self._get(peer_client)
data = r.get_json()
assert 'transfer_tx' in data, f"'transfer_tx' missing; keys: {list(data)}"
assert 'tx_bytes' not in data, \
"'tx_bytes' still present — UI reads 'transfer_tx'"
def test_transfer_rx_value_from_wg_stats(self, peer_client):
r = self._get(peer_client)
assert r.get_json()['transfer_rx'] == 1048576
def test_transfer_tx_value_from_wg_stats(self, peer_client):
r = self._get(peer_client)
assert r.get_json()['transfer_tx'] == 524288
def test_has_service_urls(self, peer_client):
"""Dashboard must include service_urls so UI can render direct service links."""
r = self._get(peer_client)
data = r.get_json()
assert 'service_urls' in data, f"'service_urls' missing; keys: {list(data)}"
assert isinstance(data['service_urls'], dict)
def test_service_urls_keyed_by_service(self, peer_client):
r = self._get(peer_client)
urls = r.get_json()['service_urls']
for svc in ('calendar', 'files', 'mail', 'webdav'):
assert svc in urls, f"service_urls missing '{svc}'; got: {list(urls)}"
def test_service_urls_use_configured_domain(self, peer_client):
r = self._get(peer_client)
urls = r.get_json()['service_urls']
assert urls['calendar'] == 'http://calendar.dev'
assert urls['files'] == 'http://files.dev'
assert urls['mail'] == 'http://mail.dev'
assert urls['webdav'] == 'http://webdav.dev'
def test_online_and_last_handshake_present(self, peer_client):
r = self._get(peer_client)
data = r.get_json()
assert 'online' in data
assert 'last_handshake' in data
def test_peer_not_in_registry_returns_404(self, peer_client):
reg = _mock_registry(peer=None)
cfg = _mock_config()
with patch('app.peer_registry', reg), patch('app.config_manager', cfg):
r = peer_client.get('/api/peer/dashboard')
assert r.status_code == 404
# ─────────────────── peer_services structure ──────────────────────────────────
class TestPeerServicesStructure:
"""
peer_services() must return the exact structure MyServices.jsx reads.
All field-name mismatches cause silent blanks in the UI.
"""
def _get(self, peer_client, dns='172.20.0.3'):
wg = _mock_wg(dns)
reg = _mock_registry()
cfg = _mock_config()
with patch('app.peer_registry', reg), \
patch('app.wireguard_manager', wg), \
patch('app.config_manager', cfg), \
patch('app._resolve_peer_dns', return_value=dns):
return peer_client.get('/api/peer/services')
def test_returns_200(self, peer_client):
r = self._get(peer_client)
assert r.status_code == 200, r.data
# -- top-level keys --------------------------------------------------------
def test_has_username_at_top_level(self, peer_client):
"""MyServices.jsx uses data?.username for the WireGuard config download filename."""
r = self._get(peer_client)
data = r.get_json()
assert 'username' in data, f"'username' missing from top level; keys: {list(data)}"
assert data['username'] == 'alice'
def test_has_files_not_webdav(self, peer_client):
"""MyServices.jsx reads data?.files — key must be 'files', not 'webdav'."""
r = self._get(peer_client)
data = r.get_json()
assert 'files' in data, f"'files' key missing; keys: {list(data)}"
assert 'webdav' not in data, \
"'webdav' key still present — MyServices.jsx reads 'files'"
def test_has_wireguard_email_caldav_files(self, peer_client):
r = self._get(peer_client)
data = r.get_json()
for section in ('wireguard', 'email', 'caldav', 'files'):
assert section in data, f"'{section}' section missing; keys: {list(data)}"
# -- email section ---------------------------------------------------------
def test_email_address_not_username(self, peer_client):
"""MyServices.jsx reads email.address — must NOT be email.username."""
r = self._get(peer_client)
email = r.get_json()['email']
assert 'address' in email, f"'address' missing from email; keys: {list(email)}"
assert 'username' not in email, \
"'username' still in email section — MyServices.jsx reads 'address'"
def test_email_address_is_full_address(self, peer_client):
r = self._get(peer_client)
assert r.get_json()['email']['address'] == 'alice@dev'
def test_email_has_nested_smtp(self, peer_client):
"""MyServices.jsx reads email.smtp.host and email.smtp.port as nested objects."""
r = self._get(peer_client)
email = r.get_json()['email']
assert 'smtp' in email, f"'smtp' missing from email; keys: {list(email)}"
smtp = email['smtp']
assert 'host' in smtp, f"'host' missing from email.smtp; keys: {list(smtp)}"
assert 'port' in smtp, f"'port' missing from email.smtp; keys: {list(smtp)}"
def test_email_has_nested_imap(self, peer_client):
"""MyServices.jsx reads email.imap.host and email.imap.port as nested objects."""
r = self._get(peer_client)
email = r.get_json()['email']
assert 'imap' in email, f"'imap' missing from email; keys: {list(email)}"
imap = email['imap']
assert 'host' in imap, f"'host' missing from email.imap; keys: {list(imap)}"
assert 'port' in imap, f"'port' missing from email.imap; keys: {list(imap)}"
def test_email_no_flat_host_fields(self, peer_client):
"""Flat imap_host/smtp_host fields must not be present."""
r = self._get(peer_client)
email = r.get_json()['email']
assert 'imap_host' not in email, \
"'imap_host' still flat — MyServices.jsx reads email.imap.host"
assert 'smtp_host' not in email, \
"'smtp_host' still flat — MyServices.jsx reads email.smtp.host"
def test_email_smtp_host_is_mail_domain(self, peer_client):
r = self._get(peer_client)
assert r.get_json()['email']['smtp']['host'] == 'mail.dev'
def test_email_imap_host_is_mail_domain(self, peer_client):
r = self._get(peer_client)
assert r.get_json()['email']['imap']['host'] == 'mail.dev'
def test_email_smtp_port(self, peer_client):
r = self._get(peer_client)
assert r.get_json()['email']['smtp']['port'] == 587
def test_email_imap_port(self, peer_client):
r = self._get(peer_client)
assert r.get_json()['email']['imap']['port'] == 993
# -- caldav section --------------------------------------------------------
def test_caldav_url_uses_calendar_subdomain(self, peer_client):
"""CalDAV URL must be http://calendar.{domain}, not radicale.{domain}:5232.
radicale.dev has no DNS record; calendar.dev is the Caddy-proxied entry."""
r = self._get(peer_client)
url = r.get_json()['caldav']['url']
assert 'radicale' not in url, \
f"CalDAV URL contains 'radicale' — no DNS record exists for radicale.dev; got: {url}"
assert ':5232' not in url, \
f"CalDAV URL exposes internal port 5232 — should use Caddy-proxied URL; got: {url}"
assert url == f'http://calendar.dev', f"CalDAV URL wrong: {url}"
def test_caldav_username_is_peer_name(self, peer_client):
r = self._get(peer_client)
assert r.get_json()['caldav']['username'] == 'alice'
# -- files section ---------------------------------------------------------
def test_files_url_uses_files_subdomain(self, peer_client):
r = self._get(peer_client)
assert r.get_json()['files']['url'] == 'http://files.dev'
def test_files_username_is_peer_name(self, peer_client):
r = self._get(peer_client)
assert r.get_json()['files']['username'] == 'alice'
# -- wireguard section -----------------------------------------------------
def test_wireguard_dns_is_not_vpn_gateway(self, peer_client):
"""DNS must be the CoreDNS container IP, not the WireGuard VPN gateway 10.0.0.1.
10.0.0.1 is the WireGuard server-side VPN IP which does NOT serve DNS."""
r = self._get(peer_client)
dns = r.get_json()['wireguard'].get('dns', '')
assert dns != '10.0.0.1', \
"wireguard.dns is 10.0.0.1 (WireGuard VPN gateway) — should be CoreDNS IP"
def test_wireguard_dns_is_coredns_ip(self, peer_client):
"""DNS must be 172.20.0.3 (the CoreDNS container on the Docker bridge)."""
r = self._get(peer_client)
assert r.get_json()['wireguard']['dns'] == '172.20.0.3'
def test_wireguard_has_config_field(self, peer_client):
"""wg.config field allows peer to download/copy their WireGuard config."""
r = self._get(peer_client)
wg = r.get_json()['wireguard']
assert 'config' in wg, f"'config' missing from wireguard section; keys: {list(wg)}"
def test_wireguard_config_has_dns_line(self, peer_client):
"""The config text must contain a DNS = line pointing to CoreDNS."""
r = self._get(peer_client)
config = r.get_json()['wireguard'].get('config', '')
assert 'DNS = 172.20.0.3' in config, \
f"Config missing 'DNS = 172.20.0.3'; config:\n{config}"
def test_wireguard_config_has_interface_section(self, peer_client):
r = self._get(peer_client)
config = r.get_json()['wireguard'].get('config', '')
assert '[Interface]' in config and '[Peer]' in config, \
f"Config missing [Interface] or [Peer] section; config:\n{config}"
def test_wireguard_config_has_full_tunnel_allowed_ips(self, peer_client):
"""Full-tunnel peers must have AllowedIPs = 0.0.0.0/0 so all traffic goes via VPN."""
r = self._get(peer_client)
config = r.get_json()['wireguard'].get('config', '')
assert '0.0.0.0/0' in config, \
f"Config missing 0.0.0.0/0 AllowedIPs for full-tunnel peer; config:\n{config}"
def test_wireguard_has_ip_field(self, peer_client):
r = self._get(peer_client)
assert 'ip' in r.get_json()['wireguard']
def test_wireguard_ip_is_peer_vpn_ip(self, peer_client):
r = self._get(peer_client)
assert r.get_json()['wireguard']['ip'] == '10.0.0.5'
# ─────────────────── auth / access control ────────────────────────────────────
class TestPeerEndpointAccessControl:
"""Peer-only routes must block unauthenticated and admin sessions."""
def test_unauthenticated_dashboard_returns_401(self, auth_mgr):
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'test-secret'
with patch('app.auth_manager', auth_mgr):
with app.test_client() as c:
r = c.get('/api/peer/dashboard')
assert r.status_code == 401
def test_unauthenticated_services_returns_401(self, auth_mgr):
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'test-secret'
with patch('app.auth_manager', auth_mgr):
with app.test_client() as c:
r = c.get('/api/peer/services')
assert r.status_code == 401
def test_admin_dashboard_returns_403(self, admin_client):
r = admin_client.get('/api/peer/dashboard')
assert r.status_code == 403, \
f"Admin accessing peer-only /api/peer/dashboard should get 403, got {r.status_code}"
def test_admin_services_returns_403(self, admin_client):
r = admin_client.get('/api/peer/services')
assert r.status_code == 403, \
f"Admin accessing peer-only /api/peer/services should get 403, got {r.status_code}"
# ─────────────────── DNS zone records ─────────────────────────────────────────
class TestDNSZoneRecords:
"""
Verify that network_manager._build_dns_records() generates the correct IPs.
api and webui must point to Caddy (not their container IPs) so Caddy can
reverse-proxy them — their containers don't listen on port 80.
"""
def setUp(self):
pass
def _records(self, ip_range='172.20.0.0/16', cell_name='pic0'):
import network_manager as nm
mgr = nm.NetworkManager.__new__(nm.NetworkManager)
return mgr._build_dns_records(cell_name, ip_range)
def test_api_resolves_to_caddy_not_api_container(self):
records = self._records()
api_rec = next((r for r in records if r['name'] == 'api'), None)
assert api_rec is not None, "No DNS record for 'api'"
assert api_rec['value'] == '172.20.0.2', (
f"api.dev should resolve to Caddy (172.20.0.2), not the API container "
f"(172.20.0.10); got {api_rec['value']}"
)
def test_webui_resolves_to_caddy_not_webui_container(self):
records = self._records()
rec = next((r for r in records if r['name'] == 'webui'), None)
assert rec is not None, "No DNS record for 'webui'"
assert rec['value'] == '172.20.0.2', (
f"webui.dev should resolve to Caddy (172.20.0.2), not the WebUI container "
f"(172.20.0.11); got {rec['value']}"
)
def test_calendar_uses_vip(self):
records = self._records()
rec = next((r for r in records if r['name'] == 'calendar'), None)
assert rec and rec['value'] == '172.20.0.21', \
f"calendar.dev VIP should be 172.20.0.21; got {rec}"
def test_files_uses_vip(self):
records = self._records()
rec = next((r for r in records if r['name'] == 'files'), None)
assert rec and rec['value'] == '172.20.0.22', \
f"files.dev VIP should be 172.20.0.22; got {rec}"
def test_mail_uses_vip(self):
records = self._records()
rec = next((r for r in records if r['name'] == 'mail'), None)
assert rec and rec['value'] == '172.20.0.23', \
f"mail.dev VIP should be 172.20.0.23; got {rec}"
def test_webmail_uses_mail_vip(self):
records = self._records()
rec = next((r for r in records if r['name'] == 'webmail'), None)
assert rec and rec['value'] == '172.20.0.23', \
f"webmail.dev should share the mail VIP 172.20.0.23; got {rec}"
def test_webdav_uses_vip(self):
records = self._records()
rec = next((r for r in records if r['name'] == 'webdav'), None)
assert rec and rec['value'] == '172.20.0.24', \
f"webdav.dev VIP should be 172.20.0.24; got {rec}"
def test_cell_name_resolves_to_caddy(self):
records = self._records(cell_name='mypic')
rec = next((r for r in records if r['name'] == 'mypic'), None)
assert rec and rec['value'] == '172.20.0.2', \
f"mypic.dev should resolve to Caddy (172.20.0.2); got {rec}"
def test_all_records_are_type_a(self):
records = self._records()
for rec in records:
assert rec.get('type') == 'A', f"Record {rec} is not type A"
class TestDNSZoneRecordsWithPytest:
"""Same as above but using pytest-style (no setUp/tearDown)."""
@pytest.fixture
def records(self):
import network_manager as nm
mgr = nm.NetworkManager.__new__(nm.NetworkManager)
return mgr._build_dns_records('pic0', '172.20.0.0/16')
def test_api_resolves_to_caddy(self, records):
rec = next((r for r in records if r['name'] == 'api'), None)
assert rec and rec['value'] == '172.20.0.2', \
f"api.dev should point to Caddy (172.20.0.2); got {rec}"
def test_webui_resolves_to_caddy(self, records):
rec = next((r for r in records if r['name'] == 'webui'), None)
assert rec and rec['value'] == '172.20.0.2', \
f"webui.dev should point to Caddy (172.20.0.2); got {rec}"
# ─────────────────── Caddyfile generation ─────────────────────────────────────
class TestCaddyfileGeneration:
"""
write_caddyfile() must produce a Caddyfile that Caddy can use to route
all service domains including webui.dev.
"""
@pytest.fixture
def caddyfile(self, tmp_path):
from ip_utils import write_caddyfile
path = str(tmp_path / 'Caddyfile')
write_caddyfile('172.20.0.0/16', 'pic0', 'dev', path)
with open(path) as f:
return f.read()
def test_main_domain_block_present(self, caddyfile):
assert 'http://pic0.dev' in caddyfile
def test_api_block_present(self, caddyfile):
assert 'http://api.dev' in caddyfile
def test_webui_block_present(self, caddyfile):
assert 'http://webui.dev' in caddyfile, \
"Missing webui.dev Caddy block — webui is unreachable by domain name"
def test_calendar_block_present(self, caddyfile):
assert 'http://calendar.dev' in caddyfile
def test_files_block_present(self, caddyfile):
assert 'http://files.dev' in caddyfile
def test_mail_block_present(self, caddyfile):
assert 'http://mail.dev' in caddyfile
def test_webdav_block_present(self, caddyfile):
assert 'http://webdav.dev' in caddyfile
def test_caddy_vips_present(self, caddyfile):
assert '172.20.0.21' in caddyfile, "calendar VIP missing from Caddyfile"
assert '172.20.0.22' in caddyfile, "files VIP missing from Caddyfile"
assert '172.20.0.23' in caddyfile, "mail VIP missing from Caddyfile"
assert '172.20.0.24' in caddyfile, "webdav VIP missing from Caddyfile"
def test_no_radicale_subdomain_in_caddyfile(self, caddyfile):
"""radicale.dev has no DNS record; CalDAV should go via calendar.dev."""
assert 'radicale.dev' not in caddyfile, \
"radicale.dev should not appear in Caddyfile — no DNS record for it"
def test_auto_https_off(self, caddyfile):
assert 'auto_https off' in caddyfile
def test_reverse_proxy_targets_use_container_names(self, caddyfile):
"""Container-internal routing must use service names not IPs."""
assert 'cell-api:3000' in caddyfile
assert 'cell-radicale:5232' in caddyfile
assert 'cell-webui:80' in caddyfile