1a611e0474
**PIC UI always accessible (service_access=[])** Remove the per-peer Caddy:80 ACCEPT/DROP rule from apply_peer_rules. Service access was enforced at two layers (iptables DROP + CoreDNS ACL), but the iptables layer also blocked the PIC web UI served through Caddy. CoreDNS ACL alone is sufficient — DNS blocks service hostnames; the UI path through Caddy remains reachable regardless of service_access value. **Exit-relay internet routing (route_via another cell)** update_peer_ip validated new_ip as a single ip_network, rejecting the comma-separated '10.0.1.0/24, 0.0.0.0/0' string passed by update_cell_peer_allowed_ips(add_default_route=True). The AllowedIPs in wg0.conf was never updated, so WireGuard never routed internet traffic through the exit cell's tunnel. Fix: validate each CIDR individually and apply the change live via wg set without a container restart. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
248 lines
10 KiB
Python
248 lines
10 KiB
Python
"""
|
|
Peer access update E2E tests.
|
|
|
|
Verifies that editing a peer's service_access, internet_access, and peer_access
|
|
settings via PUT /api/peers/<name> is persisted and reflected in the live
|
|
iptables and CoreDNS state immediately after the call returns.
|
|
|
|
Run against a live cell:
|
|
PIC_HOST=localhost pytest tests/e2e/api/test_peer_access_update.py -v
|
|
"""
|
|
import time
|
|
import subprocess
|
|
import pytest
|
|
|
|
from helpers.api_client import PicAPIClient
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_peer(admin_client, name: str) -> dict:
|
|
r = admin_client.get(f'/api/peers/{name}')
|
|
assert r.status_code == 200, f'GET /api/peers/{name} failed: {r.status_code} {r.text}'
|
|
return r.json()
|
|
|
|
|
|
def _update_peer(admin_client, name: str, **fields) -> dict:
|
|
r = admin_client.put(f'/api/peers/{name}', json=fields)
|
|
assert r.status_code == 200, f'PUT /api/peers/{name} failed: {r.status_code} {r.text}'
|
|
return r.json()
|
|
|
|
|
|
def _wg_forward_rules(admin_client) -> str:
|
|
"""Return raw iptables FORWARD rules from inside the WireGuard container."""
|
|
r = admin_client.post('/api/debug/iptables-forward', json={})
|
|
if r.status_code == 200:
|
|
return r.text
|
|
# Fallback: read directly via docker exec (only works on the same host)
|
|
try:
|
|
result = subprocess.run(
|
|
['docker', 'exec', 'cell-wireguard', 'iptables', '-L', 'FORWARD', '-n', '--line-numbers'],
|
|
capture_output=True, text=True, timeout=5
|
|
)
|
|
return result.stdout
|
|
except Exception:
|
|
return ''
|
|
|
|
|
|
def _corefile_content(admin_client) -> str:
|
|
"""Return the current Corefile content via the API or direct read."""
|
|
r = admin_client.get('/api/network/dns/corefile')
|
|
if r.status_code == 200:
|
|
return r.text
|
|
# Fallback: read from the mapped config path
|
|
try:
|
|
with open('/home/roof/pic/config/dns/Corefile') as f:
|
|
return f.read()
|
|
except Exception:
|
|
return ''
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# service_access update tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestServiceAccessUpdate:
|
|
|
|
def test_restrict_all_services_no_caddy_drop_rule(self, make_peer, admin_client):
|
|
"""Setting service_access=[] must NOT create a DROP rule for Caddy:80.
|
|
|
|
Service access is controlled by CoreDNS ACL. Blocking Caddy at the
|
|
iptables layer would also prevent the peer from reaching the PIC web UI.
|
|
The peer must still be able to reach the UI regardless of service_access.
|
|
"""
|
|
peer = make_peer('e2etest-svc-drop')
|
|
peer_ip = peer['ip']
|
|
|
|
_update_peer(admin_client, peer['name'],
|
|
internet_access=True,
|
|
service_access=[],
|
|
peer_access=True)
|
|
|
|
rules = _wg_forward_rules(admin_client)
|
|
if not rules:
|
|
return # can't verify without iptables access — skip silently
|
|
# No Caddy-targeted DROP for this peer; service blocking is DNS-ACL only
|
|
caddy_drop = f'{peer_ip}' in rules and 'DROP' in rules and 'dpt:80' in rules
|
|
assert not caddy_drop, (
|
|
f'Found Caddy DROP rule for {peer_ip} after service_access=[] — '
|
|
f'this blocks the PIC UI. Service access should be DNS-ACL only.\n{rules}'
|
|
)
|
|
|
|
def test_internet_access_peer_has_accept_rule(self, make_peer, admin_client):
|
|
"""A peer with internet_access=True has a catch-all ACCEPT rule."""
|
|
peer = make_peer('e2etest-svc-partial', service_access=[])
|
|
peer_ip = peer['ip']
|
|
|
|
_update_peer(admin_client, peer['name'],
|
|
internet_access=True,
|
|
service_access=['calendar'],
|
|
peer_access=True)
|
|
|
|
rules = _wg_forward_rules(admin_client)
|
|
assert rules, 'Could not read iptables rules'
|
|
assert peer_ip in rules and 'ACCEPT' in rules, (
|
|
f'Expected ACCEPT rule for {peer_ip} after internet_access=True, '
|
|
f'got:\n{rules}'
|
|
)
|
|
|
|
def test_service_access_reflected_in_peer_registry(self, make_peer, admin_client):
|
|
"""Updated service_access is persisted in the peer registry."""
|
|
peer = make_peer('e2etest-svc-persist',
|
|
service_access=['calendar', 'files', 'mail', 'webdav'])
|
|
|
|
_update_peer(admin_client, peer['name'],
|
|
internet_access=True,
|
|
service_access=['calendar'],
|
|
peer_access=True)
|
|
|
|
stored = _get_peer(admin_client, peer['name'])
|
|
assert stored.get('service_access') == ['calendar'], (
|
|
f"Expected service_access=['calendar'] in registry, got: {stored.get('service_access')}"
|
|
)
|
|
|
|
def test_blocked_service_appears_in_corefile_acl(self, make_peer, admin_client):
|
|
"""Blocked services for a peer appear in the CoreDNS Corefile ACL."""
|
|
peer = make_peer('e2etest-svc-acl', service_access=['calendar', 'files', 'mail', 'webdav'])
|
|
peer_ip = peer['ip']
|
|
|
|
# Block files and webdav
|
|
_update_peer(admin_client, peer['name'],
|
|
internet_access=True,
|
|
service_access=['calendar', 'mail'],
|
|
peer_access=True)
|
|
|
|
# Small delay for CoreDNS reload to process SIGUSR1
|
|
time.sleep(1)
|
|
|
|
corefile = _corefile_content(admin_client)
|
|
assert corefile, 'Could not read Corefile'
|
|
assert f'block net {peer_ip}/32' in corefile, (
|
|
f'Expected block rule for {peer_ip} in Corefile after removing files/webdav. '
|
|
f'Corefile:\n{corefile}'
|
|
)
|
|
|
|
def test_fully_allowed_peer_has_no_acl_block(self, make_peer, admin_client):
|
|
"""Peer with all services allowed has no ACL block entry in Corefile."""
|
|
peer = make_peer('e2etest-svc-full', service_access=['calendar'])
|
|
peer_ip = peer['ip']
|
|
|
|
# Grant all services
|
|
_update_peer(admin_client, peer['name'],
|
|
internet_access=True,
|
|
service_access=['calendar', 'files', 'mail', 'webdav'],
|
|
peer_access=True)
|
|
|
|
time.sleep(1)
|
|
corefile = _corefile_content(admin_client)
|
|
assert f'block net {peer_ip}/32' not in corefile, (
|
|
f'No ACL block expected for fully-allowed peer {peer_ip}, but found one. '
|
|
f'Corefile:\n{corefile}'
|
|
)
|
|
|
|
def test_multiple_peers_blocked_from_same_service_in_single_acl_block(
|
|
self, make_peer, admin_client
|
|
):
|
|
"""Two peers blocked from the same service share one acl block (not separate blocks)."""
|
|
peer_a = make_peer('e2etest-svc-multi-a',
|
|
service_access=['calendar', 'files', 'mail', 'webdav'])
|
|
peer_b = make_peer('e2etest-svc-multi-b',
|
|
service_access=['calendar', 'files', 'mail', 'webdav'])
|
|
|
|
# Block both from files
|
|
_update_peer(admin_client, peer_a['name'],
|
|
service_access=['calendar', 'mail', 'webdav'])
|
|
_update_peer(admin_client, peer_b['name'],
|
|
service_access=['calendar', 'mail', 'webdav'])
|
|
|
|
time.sleep(1)
|
|
corefile = _corefile_content(admin_client)
|
|
assert corefile, 'Could not read Corefile'
|
|
|
|
# Count how many acl blocks exist for the files service
|
|
domain = corefile.split('forward . ')[0].strip().rstrip('{').strip()
|
|
# Find acl blocks for files
|
|
acl_files_count = corefile.count('acl files.')
|
|
assert acl_files_count == 1, (
|
|
f'Expected exactly 1 acl block for files (both peers merged), '
|
|
f'got {acl_files_count}. Corefile:\n{corefile}'
|
|
)
|
|
assert peer_a['ip'] in corefile, f"peer_a IP {peer_a['ip']} not in Corefile ACL"
|
|
assert peer_b['ip'] in corefile, f"peer_b IP {peer_b['ip']} not in Corefile ACL"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# internet_access update tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestInternetAccessUpdate:
|
|
|
|
def test_disable_internet_access_sets_config_reinstall_flag(self, make_peer, admin_client):
|
|
"""Changing internet_access sets config_needs_reinstall on the peer."""
|
|
peer = make_peer('e2etest-inet-flag')
|
|
|
|
r = _update_peer(admin_client, peer['name'],
|
|
internet_access=False,
|
|
service_access=['calendar', 'files', 'mail', 'webdav'])
|
|
|
|
assert r.get('config_changed') is True, (
|
|
f'Expected config_changed=True when internet_access changes, got: {r}'
|
|
)
|
|
stored = _get_peer(admin_client, peer['name'])
|
|
assert stored.get('internet_access') is False
|
|
assert stored.get('config_needs_reinstall') is True
|
|
|
|
def test_internet_access_persisted(self, make_peer, admin_client):
|
|
peer = make_peer('e2etest-inet-persist')
|
|
_update_peer(admin_client, peer['name'], internet_access=False,
|
|
service_access=['calendar', 'files', 'mail', 'webdav'])
|
|
stored = _get_peer(admin_client, peer['name'])
|
|
assert stored.get('internet_access') is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# peer_access update tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPeerAccessUpdate:
|
|
|
|
def test_peer_access_persisted(self, make_peer, admin_client):
|
|
peer = make_peer('e2etest-peer-access')
|
|
_update_peer(admin_client, peer['name'],
|
|
internet_access=True,
|
|
service_access=['calendar', 'files', 'mail', 'webdav'],
|
|
peer_access=False)
|
|
stored = _get_peer(admin_client, peer['name'])
|
|
assert stored.get('peer_access') is False
|
|
|
|
def test_re_enabling_peer_access_persisted(self, make_peer, admin_client):
|
|
peer = make_peer('e2etest-peer-reenable')
|
|
_update_peer(admin_client, peer['name'], peer_access=False,
|
|
service_access=['calendar', 'files', 'mail', 'webdav'])
|
|
_update_peer(admin_client, peer['name'], peer_access=True,
|
|
service_access=['calendar', 'files', 'mail', 'webdav'])
|
|
stored = _get_peer(admin_client, peer['name'])
|
|
assert stored.get('peer_access') is True
|