Files
pic/tests/e2e/api/test_peer_access_update.py
roof 1a611e0474 fix: UI always accessible; fix exit-relay AllowedIPs not updating
**PIC UI always accessible (service_access=[])**
Remove the per-peer Caddy:80 ACCEPT/DROP rule from apply_peer_rules.
Service access was enforced at two layers (iptables DROP + CoreDNS ACL),
but the iptables layer also blocked the PIC web UI served through Caddy.
CoreDNS ACL alone is sufficient — DNS blocks service hostnames; the UI
path through Caddy remains reachable regardless of service_access value.

**Exit-relay internet routing (route_via another cell)**
update_peer_ip validated new_ip as a single ip_network, rejecting the
comma-separated '10.0.1.0/24, 0.0.0.0/0' string passed by
update_cell_peer_allowed_ips(add_default_route=True). The AllowedIPs
in wg0.conf was never updated, so WireGuard never routed internet traffic
through the exit cell's tunnel. Fix: validate each CIDR individually and
apply the change live via wg set without a container restart.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 05:41:22 -04:00

248 lines
10 KiB
Python

"""
Peer access update E2E tests.
Verifies that editing a peer's service_access, internet_access, and peer_access
settings via PUT /api/peers/<name> is persisted and reflected in the live
iptables and CoreDNS state immediately after the call returns.
Run against a live cell:
PIC_HOST=localhost pytest tests/e2e/api/test_peer_access_update.py -v
"""
import time
import subprocess
import pytest
from helpers.api_client import PicAPIClient
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_peer(admin_client, name: str) -> dict:
r = admin_client.get(f'/api/peers/{name}')
assert r.status_code == 200, f'GET /api/peers/{name} failed: {r.status_code} {r.text}'
return r.json()
def _update_peer(admin_client, name: str, **fields) -> dict:
r = admin_client.put(f'/api/peers/{name}', json=fields)
assert r.status_code == 200, f'PUT /api/peers/{name} failed: {r.status_code} {r.text}'
return r.json()
def _wg_forward_rules(admin_client) -> str:
"""Return raw iptables FORWARD rules from inside the WireGuard container."""
r = admin_client.post('/api/debug/iptables-forward', json={})
if r.status_code == 200:
return r.text
# Fallback: read directly via docker exec (only works on the same host)
try:
result = subprocess.run(
['docker', 'exec', 'cell-wireguard', 'iptables', '-L', 'FORWARD', '-n', '--line-numbers'],
capture_output=True, text=True, timeout=5
)
return result.stdout
except Exception:
return ''
def _corefile_content(admin_client) -> str:
"""Return the current Corefile content via the API or direct read."""
r = admin_client.get('/api/network/dns/corefile')
if r.status_code == 200:
return r.text
# Fallback: read from the mapped config path
try:
with open('/home/roof/pic/config/dns/Corefile') as f:
return f.read()
except Exception:
return ''
# ---------------------------------------------------------------------------
# service_access update tests
# ---------------------------------------------------------------------------
class TestServiceAccessUpdate:
    """E2E checks that service_access edits reach the registry, iptables and CoreDNS."""

    @staticmethod
    def _rule_lines_for_peer(rules: str, peer_ip: str) -> list:
        """Return the lines of *rules* that mention *peer_ip* as a whole address.

        Matching is done per line so that a verdict (ACCEPT/DROP) or port match
        belonging to a different peer's rule is never attributed to this peer,
        and with digit/dot boundaries so that e.g. ``10.0.0.1`` does not match
        inside ``10.0.0.10``.
        """
        import re  # local import keeps this block independent of the file header

        whole_ip = re.compile(rf'(?<![\d.]){re.escape(peer_ip)}(?!\.?\d)')
        return [line for line in rules.splitlines() if whole_ip.search(line)]

    def test_restrict_all_services_no_caddy_drop_rule(self, make_peer, admin_client):
        """Setting service_access=[] must NOT create a DROP rule for Caddy:80.

        Service access is controlled by CoreDNS ACL. Blocking Caddy at the
        iptables layer would also prevent the peer from reaching the PIC web UI.
        The peer must still be able to reach the UI regardless of service_access.
        """
        peer = make_peer('e2etest-svc-drop')
        peer_ip = peer['ip']
        _update_peer(admin_client, peer['name'],
                     internet_access=True,
                     service_access=[],
                     peer_access=True)
        rules = _wg_forward_rules(admin_client)
        if not rules:
            # Report the gap instead of passing without having verified anything.
            pytest.skip('iptables FORWARD rules unavailable; cannot verify Caddy rule absence')
        # No Caddy-targeted DROP for this peer; service blocking is DNS-ACL only.
        # Only a single rule line carrying all three markers counts.
        caddy_drop = [
            line for line in self._rule_lines_for_peer(rules, peer_ip)
            if 'DROP' in line and 'dpt:80' in line
        ]
        assert not caddy_drop, (
            f'Found Caddy DROP rule for {peer_ip} after service_access=[] — '
            f'this blocks the PIC UI. Service access should be DNS-ACL only.\n{rules}'
        )

    def test_internet_access_peer_has_accept_rule(self, make_peer, admin_client):
        """A peer with internet_access=True has a catch-all ACCEPT rule."""
        peer = make_peer('e2etest-svc-partial', service_access=[])
        peer_ip = peer['ip']
        _update_peer(admin_client, peer['name'],
                     internet_access=True,
                     service_access=['calendar'],
                     peer_access=True)
        rules = _wg_forward_rules(admin_client)
        assert rules, 'Could not read iptables rules'
        # The ACCEPT must sit on a line that names this peer, not just anywhere.
        peer_lines = self._rule_lines_for_peer(rules, peer_ip)
        assert any('ACCEPT' in line for line in peer_lines), (
            f'Expected ACCEPT rule for {peer_ip} after internet_access=True, '
            f'got:\n{rules}'
        )

    def test_service_access_reflected_in_peer_registry(self, make_peer, admin_client):
        """Updated service_access is persisted in the peer registry."""
        peer = make_peer('e2etest-svc-persist',
                         service_access=['calendar', 'files', 'mail', 'webdav'])
        _update_peer(admin_client, peer['name'],
                     internet_access=True,
                     service_access=['calendar'],
                     peer_access=True)
        stored = _get_peer(admin_client, peer['name'])
        assert stored.get('service_access') == ['calendar'], (
            f"Expected service_access=['calendar'] in registry, got: {stored.get('service_access')}"
        )

    def test_blocked_service_appears_in_corefile_acl(self, make_peer, admin_client):
        """Blocked services for a peer appear in the CoreDNS Corefile ACL."""
        peer = make_peer('e2etest-svc-acl', service_access=['calendar', 'files', 'mail', 'webdav'])
        peer_ip = peer['ip']
        # Block files and webdav
        _update_peer(admin_client, peer['name'],
                     internet_access=True,
                     service_access=['calendar', 'mail'],
                     peer_access=True)
        # Small delay for CoreDNS reload to process SIGUSR1
        time.sleep(1)
        corefile = _corefile_content(admin_client)
        assert corefile, 'Could not read Corefile'
        assert f'block net {peer_ip}/32' in corefile, (
            f'Expected block rule for {peer_ip} in Corefile after removing files/webdav. '
            f'Corefile:\n{corefile}'
        )

    def test_fully_allowed_peer_has_no_acl_block(self, make_peer, admin_client):
        """Peer with all services allowed has no ACL block entry in Corefile."""
        peer = make_peer('e2etest-svc-full', service_access=['calendar'])
        peer_ip = peer['ip']
        # Grant all services
        _update_peer(admin_client, peer['name'],
                     internet_access=True,
                     service_access=['calendar', 'files', 'mail', 'webdav'],
                     peer_access=True)
        time.sleep(1)
        corefile = _corefile_content(admin_client)
        # An unreadable Corefile comes back as '' and would make the negative
        # check below pass without verifying anything.
        assert corefile, 'Could not read Corefile'
        assert f'block net {peer_ip}/32' not in corefile, (
            f'No ACL block expected for fully-allowed peer {peer_ip}, but found one. '
            f'Corefile:\n{corefile}'
        )

    def test_multiple_peers_blocked_from_same_service_in_single_acl_block(
        self, make_peer, admin_client
    ):
        """Two peers blocked from the same service share one acl block (not separate blocks)."""
        peer_a = make_peer('e2etest-svc-multi-a',
                           service_access=['calendar', 'files', 'mail', 'webdav'])
        peer_b = make_peer('e2etest-svc-multi-b',
                           service_access=['calendar', 'files', 'mail', 'webdav'])
        # Block both from files
        _update_peer(admin_client, peer_a['name'],
                     service_access=['calendar', 'mail', 'webdav'])
        _update_peer(admin_client, peer_b['name'],
                     service_access=['calendar', 'mail', 'webdav'])
        time.sleep(1)
        corefile = _corefile_content(admin_client)
        assert corefile, 'Could not read Corefile'
        # Count how many acl blocks exist for the files service
        acl_files_count = corefile.count('acl files.')
        assert acl_files_count == 1, (
            f'Expected exactly 1 acl block for files (both peers merged), '
            f'got {acl_files_count}. Corefile:\n{corefile}'
        )
        assert peer_a['ip'] in corefile, f"peer_a IP {peer_a['ip']} not in Corefile ACL"
        assert peer_b['ip'] in corefile, f"peer_b IP {peer_b['ip']} not in Corefile ACL"
# ---------------------------------------------------------------------------
# internet_access update tests
# ---------------------------------------------------------------------------
class TestInternetAccessUpdate:
    """E2E checks for updating a peer's internet_access through the API."""

    def test_disable_internet_access_sets_config_reinstall_flag(self, make_peer, admin_client):
        """Setting internet_access=False reports config_changed and flags a reinstall.

        The PUT response must carry ``config_changed=True`` and the stored peer
        must show ``internet_access=False`` with ``config_needs_reinstall=True``.
        """
        created = make_peer('e2etest-inet-flag')
        peer_name = created['name']
        update_result = _update_peer(
            admin_client,
            peer_name,
            internet_access=False,
            service_access=['calendar', 'files', 'mail', 'webdav'],
        )
        config_changed = update_result.get('config_changed')
        assert config_changed is True, (
            f'Expected config_changed=True when internet_access changes, got: {update_result}'
        )
        persisted = _get_peer(admin_client, peer_name)
        assert persisted.get('internet_access') is False
        assert persisted.get('config_needs_reinstall') is True

    def test_internet_access_persisted(self, make_peer, admin_client):
        """internet_access=False sent via PUT is what a later GET returns."""
        created = make_peer('e2etest-inet-persist')
        peer_name = created['name']
        _update_peer(
            admin_client,
            peer_name,
            internet_access=False,
            service_access=['calendar', 'files', 'mail', 'webdav'],
        )
        persisted = _get_peer(admin_client, peer_name)
        assert persisted.get('internet_access') is False
# ---------------------------------------------------------------------------
# peer_access update tests
# ---------------------------------------------------------------------------
class TestPeerAccessUpdate:
    """E2E checks for updating a peer's peer_access through the API."""

    def test_peer_access_persisted(self, make_peer, admin_client):
        """peer_access=False sent via PUT is what a later GET returns."""
        created = make_peer('e2etest-peer-access')
        peer_name = created['name']
        _update_peer(
            admin_client,
            peer_name,
            internet_access=True,
            service_access=['calendar', 'files', 'mail', 'webdav'],
            peer_access=False,
        )
        persisted = _get_peer(admin_client, peer_name)
        assert persisted.get('peer_access') is False

    def test_re_enabling_peer_access_persisted(self, make_peer, admin_client):
        """Switching peer_access off and then on again leaves it stored as True."""
        created = make_peer('e2etest-peer-reenable')
        peer_name = created['name']
        # Same service list on both calls; only peer_access flips.
        for desired in (False, True):
            _update_peer(
                admin_client,
                peer_name,
                peer_access=desired,
                service_access=['calendar', 'files', 'mail', 'webdav'],
            )
        persisted = _get_peer(admin_client, peer_name)
        assert persisted.get('peer_access') is True