fix: merge CoreDNS ACL per-service and add reload plugin; add peer/cell e2e tests

- _build_acl_block: put all blocked IPs for a service in ONE acl block instead
  of one block per peer — the first block's allow-all was silently granting
  access to every peer after the first blocked one (first-match semantics)
- generate_corefile: add 'reload' plugin so SIGUSR1 triggers Corefile reload
  in newer CoreDNS builds (without it the signal was a no-op)
- tests/test_firewall_manager.py: new tests for single merged ACL block and
  the reload directive
- tests/e2e/api/test_peer_access_update.py: e2e tests for service_access,
  internet_access, and peer_access updates persisting live to iptables/CoreDNS
- tests/e2e/api/test_cell_to_cell.py: e2e tests for cell-to-cell connection
  management, permissions API, and cross-cell service access restrictions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-02 04:57:37 -04:00
parent f1666ba19c
commit c521fab1cb
4 changed files with 878 additions and 4 deletions
+5 -1
View File
@@ -542,8 +542,11 @@ def _build_acl_block(blocked_peers_by_service: Dict[str, List[str]],
if not peer_ips:
continue
host = f'{service}.{domain}.'
for ip in peer_ips:
# All blocked IPs for this service in ONE block — separate blocks would
# cause the first block's allow-all to match before the second block's
# block rule, silently granting access to all but the first blocked peer.
lines.append(f' acl {host} {{')
for ip in peer_ips:
lines.append(f' block net {ip}/32')
lines.append(f' allow net 0.0.0.0/0')
lines.append(f' allow net ::/0')
@@ -586,6 +589,7 @@ def generate_corefile(peers: List[Dict[str, Any]], corefile_path: str = COREFILE
cache
log
health
reload
}}
{primary_zone_block}"""
+609
View File
@@ -0,0 +1,609 @@
"""
Cell-to-cell E2E tests.
Verifies that PIC-to-PIC connections can be established, permissions updated,
and cross-cell service access restrictions are enforced.
Run against two live cells:
PIC_HOST=localhost \
PIC2_HOST=192.168.31.52 \
PIC2_ADMIN_PASS=<pass> \
pytest tests/e2e/api/test_cell_to_cell.py -v
If PIC2_HOST is not set, tests that require a second cell are skipped.
"""
import os
import time
import pytest
from helpers.api_client import PicAPIClient
# ---------------------------------------------------------------------------
# Second-cell client fixture
# ---------------------------------------------------------------------------
def _resolve_cell2_password() -> str:
pw = os.environ.get('PIC2_ADMIN_PASS', '')
if pw:
return pw
pw_file = os.environ.get('PIC2_ADMIN_PASS_FILE', '')
if pw_file and os.path.exists(pw_file):
return open(pw_file).read().strip()
return 'admin'
@pytest.fixture(scope='session')
def cell2_host():
return os.environ.get('PIC2_HOST', '')
@pytest.fixture(scope='session')
def cell2_port():
return int(os.environ.get('PIC2_API_PORT', '3000'))
@pytest.fixture(scope='session')
def cell2_client(cell2_host, cell2_port):
"""Authenticated PicAPIClient for the second cell. None if PIC2_HOST is not set."""
if not cell2_host:
return None
user = os.environ.get('PIC2_ADMIN_USER', 'admin')
pw = _resolve_cell2_password()
base = f"http://{cell2_host}:{cell2_port}"
client = PicAPIClient(base)
client.login(user, pw)
return client
def _require_cell2(cell2_client):
"""Skip the test if no second cell is configured."""
if cell2_client is None:
pytest.skip('PIC2_HOST not set — second cell not available')
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_connections(client) -> list:
r = client.get('/api/cells')
assert r.status_code == 200, f'GET /api/cells failed: {r.status_code} {r.text}'
return r.json()
def _get_cell_connection(client, cell_name: str) -> dict | None:
connections = _get_connections(client)
return next((c for c in connections if c['cell_name'] == cell_name), None)
def _get_invite(client) -> dict:
r = client.get('/api/cells/invite')
assert r.status_code == 200, f'GET /api/cells/invite failed: {r.status_code} {r.text}'
return r.json()
def _add_connection(client, invite: dict, inbound_services: list | None = None) -> dict:
payload = dict(invite)
if inbound_services is not None:
payload['inbound_services'] = inbound_services
r = client.post('/api/cells', json=payload)
assert r.status_code == 201, f'POST /api/cells failed: {r.status_code} {r.text}'
return r.json()
def _remove_connection(client, cell_name: str):
r = client.delete(f'/api/cells/{cell_name}')
assert r.status_code in (200, 404), (
f'DELETE /api/cells/{cell_name} failed: {r.status_code} {r.text}'
)
def _update_permissions(client, cell_name: str,
inbound: dict, outbound: dict) -> dict:
r = client.put(f'/api/cells/{cell_name}/permissions',
json={'inbound': inbound, 'outbound': outbound})
assert r.status_code == 200, (
f'PUT /api/cells/{cell_name}/permissions failed: {r.status_code} {r.text}'
)
return r.json()
def _get_permissions(client, cell_name: str) -> dict:
r = client.get(f'/api/cells/{cell_name}/permissions')
assert r.status_code == 200, (
f'GET /api/cells/{cell_name}/permissions failed: {r.status_code} {r.text}'
)
return r.json()
def _corefile_content(client) -> str:
r = client.get('/api/network/dns/corefile')
if r.status_code == 200:
return r.text
return ''
# ---------------------------------------------------------------------------
# Tests: invite and connection management (single-cell, no PIC2 required)
# ---------------------------------------------------------------------------
class TestCellInvite:
"""Tests that can run against a single cell."""
def test_get_invite_returns_required_fields(self, admin_client):
"""GET /api/cells/invite returns all fields needed to connect."""
invite = _get_invite(admin_client)
for field in ('cell_name', 'public_key', 'vpn_subnet', 'dns_ip', 'domain', 'version'):
assert field in invite, f"Missing field '{field}' in invite: {invite}"
def test_invite_version_is_1(self, admin_client):
invite = _get_invite(admin_client)
assert invite['version'] == 1, f"Expected invite version 1, got: {invite['version']}"
def test_invite_vpn_subnet_is_valid_cidr(self, admin_client):
import ipaddress
invite = _get_invite(admin_client)
subnet = invite['vpn_subnet']
try:
ipaddress.ip_network(subnet, strict=False)
except ValueError:
pytest.fail(f"invite vpn_subnet is not a valid CIDR: {subnet!r}")
def test_invite_dns_ip_is_valid_ip(self, admin_client):
import ipaddress
invite = _get_invite(admin_client)
dns_ip = invite['dns_ip']
try:
ipaddress.ip_address(dns_ip)
except ValueError:
pytest.fail(f"invite dns_ip is not a valid IP: {dns_ip!r}")
def test_list_shareable_services(self, admin_client):
"""GET /api/cells/services returns a non-empty list."""
r = admin_client.get('/api/cells/services')
assert r.status_code == 200, f'GET /api/cells/services failed: {r.status_code}'
data = r.json()
assert 'services' in data, f"Expected 'services' key, got: {data}"
assert len(data['services']) > 0, "Expected at least one shareable service"
def test_list_connections_returns_list(self, admin_client):
"""GET /api/cells returns a list (possibly empty)."""
r = admin_client.get('/api/cells')
assert r.status_code == 200, f'GET /api/cells failed: {r.status_code}'
assert isinstance(r.json(), list), f"Expected list, got: {type(r.json())}"
# ---------------------------------------------------------------------------
# Tests: permissions API (single-cell, synthetic connection)
# ---------------------------------------------------------------------------
class TestCellPermissionsApi:
"""Permissions API with a synthetic (mock-invite) cell connection."""
@pytest.fixture(autouse=True)
def _setup_and_teardown(self, admin_client):
"""Add a fake cell entry using direct POST (cleaned up after test)."""
# Build a minimal invite using this cell's own invite data as a template
own_invite = _get_invite(admin_client)
# Craft a distinct fake cell so we don't collide with real connections
import ipaddress
fake_subnet = '10.99.0.0/24'
fake_dns_ip = '10.99.0.1'
fake_invite = {
'cell_name': 'e2etest-synthetic-cell',
'public_key': 'AAAAFakePublicKeyForE2eTestingAAAAAAAAAAAAAAAA=',
'endpoint': '127.0.0.2:51820',
'vpn_subnet': fake_subnet,
'dns_ip': fake_dns_ip,
'domain': 'e2etest.cell',
'version': 1,
}
r = admin_client.post('/api/cells', json=fake_invite)
if r.status_code not in (201, 400):
pytest.skip(f'Could not create synthetic cell entry: {r.status_code} {r.text}')
self._cell_name = fake_invite['cell_name']
self._admin_client = admin_client
yield
_remove_connection(admin_client, self._cell_name)
def test_default_permissions_all_false(self):
"""Newly added cell connection defaults to all-false permissions."""
perms = _get_permissions(self._admin_client, self._cell_name)
for direction in ('inbound', 'outbound'):
for svc, enabled in perms.get(direction, {}).items():
assert enabled is False, (
f"Expected {direction}.{svc}=False by default, got True"
)
def test_update_inbound_permission_persisted(self):
"""Setting inbound.calendar=True is persisted in the permissions."""
inbound = {'calendar': True, 'files': False, 'mail': False, 'webdav': False}
outbound = {'calendar': False, 'files': False, 'mail': False, 'webdav': False}
_update_permissions(self._admin_client, self._cell_name, inbound, outbound)
stored = _get_permissions(self._admin_client, self._cell_name)
assert stored['inbound']['calendar'] is True, (
f"Expected inbound.calendar=True after update, got: {stored}"
)
assert stored['inbound']['files'] is False
def test_update_outbound_permission_persisted(self):
"""Setting outbound.files=True is persisted."""
inbound = {'calendar': False, 'files': False, 'mail': False, 'webdav': False}
outbound = {'calendar': False, 'files': True, 'mail': False, 'webdav': False}
_update_permissions(self._admin_client, self._cell_name, inbound, outbound)
stored = _get_permissions(self._admin_client, self._cell_name)
assert stored['outbound']['files'] is True
assert stored['outbound']['calendar'] is False
def test_update_permissions_unknown_service_rejected(self):
"""Updating permissions with an unknown service name returns 400."""
r = self._admin_client.put(
f'/api/cells/{self._cell_name}/permissions',
json={
'inbound': {'notaservice': True},
'outbound': {},
}
)
assert r.status_code == 400, (
f'Expected 400 for unknown service, got {r.status_code}: {r.text}'
)
def test_get_permissions_for_missing_cell_returns_404(self):
r = self._admin_client.get('/api/cells/nonexistent-cell-xyz/permissions')
assert r.status_code == 404, (
f'Expected 404 for missing cell, got {r.status_code}'
)
def test_update_permissions_for_missing_cell_returns_404(self):
r = self._admin_client.put(
'/api/cells/nonexistent-cell-xyz/permissions',
json={'inbound': {}, 'outbound': {}}
)
assert r.status_code == 404, (
f'Expected 404 for missing cell, got {r.status_code}'
)
def test_enabled_outbound_service_blocked_from_corefile(self):
"""Blocking outbound DNS for a service removes its forwarding from Corefile.
When outbound.files=False, the cell's domain DNS should NOT be forwarded
for the files service hostname.
"""
# Enable files outbound, then disable to trigger regen
inbound = {'calendar': False, 'files': False, 'mail': False, 'webdav': False}
outbound_on = {'calendar': False, 'files': True, 'mail': False, 'webdav': False}
_update_permissions(self._admin_client, self._cell_name, inbound, outbound_on)
outbound_off = dict(outbound_on)
outbound_off['files'] = False
_update_permissions(self._admin_client, self._cell_name, inbound, outbound_off)
time.sleep(1)
corefile = _corefile_content(self._admin_client)
# When files is blocked outbound, the cell domain should not be in a
# wildcard forward for files — the corefile should not have an
# unrestricted forward for the synthetic domain with files unblocked
# (at minimum, permissions are stored correctly)
stored = _get_permissions(self._admin_client, self._cell_name)
assert stored['outbound']['files'] is False
def test_delete_cell_connection(self):
"""DELETE /api/cells/<name> removes the cell from the connection list."""
_remove_connection(self._admin_client, self._cell_name)
connections = _get_connections(self._admin_client)
found = any(c['cell_name'] == self._cell_name for c in connections)
assert not found, f"Cell '{self._cell_name}' still in connections after DELETE"
# Prevent autouse teardown from double-deleting (404 is accepted)
# ---------------------------------------------------------------------------
# Tests: live two-cell connection (requires PIC2_HOST)
# ---------------------------------------------------------------------------
class TestLiveCellConnection:
"""
Full end-to-end tests spanning two live PIC cells.
Requires:
PIC_HOST — cell 1 (default: localhost)
PIC2_HOST — cell 2 (e.g. 192.168.31.52)
"""
@pytest.fixture(autouse=True)
def _require_cell2(self, cell2_client):
_require_cell2(cell2_client)
@pytest.fixture(autouse=True)
def _cleanup_cross_links(self, admin_client, cell2_client):
"""Ensure any test-created links between the cells are torn down after each test."""
yield
# Determine cell names from their own invites and remove stale cross-links
try:
cell1_name = _get_invite(admin_client)['cell_name']
except Exception:
cell1_name = None
try:
cell2_name = _get_invite(cell2_client)['cell_name']
except Exception:
cell2_name = None
if cell2_name:
_remove_connection(admin_client, cell2_name)
if cell1_name:
_remove_connection(cell2_client, cell1_name)
def _connect_cells(self, admin_client, cell2_client,
cell1_inbound=None, cell2_inbound=None):
"""Helper: connect cell1 → cell2 and cell2 → cell1.
Returns (cell1_name, cell2_name).
"""
invite_from_cell1 = _get_invite(admin_client)
invite_from_cell2 = _get_invite(cell2_client)
_add_connection(admin_client, invite_from_cell2,
inbound_services=cell1_inbound or [])
_add_connection(cell2_client, invite_from_cell1,
inbound_services=cell2_inbound or [])
return invite_from_cell1['cell_name'], invite_from_cell2['cell_name']
def test_cell1_can_connect_to_cell2(self, admin_client, cell2_client):
"""Connecting cell1 to cell2 creates a link on both sides."""
cell1_name, cell2_name = self._connect_cells(admin_client, cell2_client)
cell1_sees_cell2 = _get_cell_connection(admin_client, cell2_name)
assert cell1_sees_cell2 is not None, (
f"Cell 1 does not see cell 2 ('{cell2_name}') in its connections"
)
cell2_sees_cell1 = _get_cell_connection(cell2_client, cell1_name)
assert cell2_sees_cell1 is not None, (
f"Cell 2 does not see cell 1 ('{cell1_name}') in its connections"
)
def test_connection_stores_vpn_subnet(self, admin_client, cell2_client):
"""After connecting, each cell stores the other's VPN subnet."""
cell1_name, cell2_name = self._connect_cells(admin_client, cell2_client)
invite2 = _get_invite(cell2_client)
link_on_cell1 = _get_cell_connection(admin_client, cell2_name)
assert link_on_cell1['vpn_subnet'] == invite2['vpn_subnet'], (
f"cell1 stored wrong vpn_subnet for cell2: "
f"{link_on_cell1['vpn_subnet']} vs {invite2['vpn_subnet']}"
)
def test_connection_stores_dns_ip(self, admin_client, cell2_client):
"""After connecting, each cell stores the other's DNS IP."""
cell1_name, cell2_name = self._connect_cells(admin_client, cell2_client)
invite2 = _get_invite(cell2_client)
link = _get_cell_connection(admin_client, cell2_name)
assert link['dns_ip'] == invite2['dns_ip'], (
f"cell1 stored wrong dns_ip for cell2: {link['dns_ip']} vs {invite2['dns_ip']}"
)
def test_duplicate_connection_rejected(self, admin_client, cell2_client):
"""Adding the same cell connection twice returns an error."""
invite2 = _get_invite(cell2_client)
_add_connection(admin_client, invite2)
# Try again
r = admin_client.post('/api/cells', json=invite2)
assert r.status_code in (400, 409), (
f'Expected 400/409 for duplicate cell connection, got {r.status_code}: {r.text}'
)
def test_inbound_permissions_granted_on_connect(self, admin_client, cell2_client):
"""Inbound services granted at connect time are reflected in permissions."""
cell1_name, cell2_name = self._connect_cells(
admin_client, cell2_client,
cell1_inbound=['calendar'],
)
perms = _get_permissions(admin_client, cell2_name)
assert perms['inbound']['calendar'] is True, (
f"Expected inbound.calendar=True after connecting with inbound=['calendar'], "
f"got: {perms}"
)
assert perms['inbound']['files'] is False
def test_update_permissions_on_live_cells(self, admin_client, cell2_client):
"""Updating permissions on cell1 is persisted and cell2 receives the push."""
cell1_name, cell2_name = self._connect_cells(admin_client, cell2_client)
# Grant cell1 → cell2 outbound for calendar and files
inbound = {'calendar': True, 'files': True, 'mail': False, 'webdav': False}
outbound = {'calendar': True, 'files': False, 'mail': False, 'webdav': False}
_update_permissions(admin_client, cell2_name, inbound, outbound)
stored = _get_permissions(admin_client, cell2_name)
assert stored['inbound']['calendar'] is True
assert stored['inbound']['files'] is True
assert stored['outbound']['calendar'] is True
assert stored['outbound']['files'] is False
def test_remote_permissions_pushed_to_cell2(self, admin_client, cell2_client):
"""When cell1 updates permissions, cell2 receives the mirror state via peer-sync.
After cell1 sets outbound.calendar=True (= cell2 gets inbound.calendar=True
from cell1), we verify that cell2's stored remote view is updated.
This test requires the cells to be able to reach each other's API on port 3000.
"""
cell1_name, cell2_name = self._connect_cells(admin_client, cell2_client)
# cell1 enables outbound calendar to cell2
inbound = {'calendar': False, 'files': False, 'mail': False, 'webdav': False}
outbound = {'calendar': True, 'files': False, 'mail': False, 'webdav': False}
_update_permissions(admin_client, cell2_name, inbound, outbound)
# Give peer-sync a moment to complete the push
time.sleep(2)
# On cell2, cell1's outbound calendar = our inbound calendar
perms_on_cell2 = _get_permissions(cell2_client, cell1_name)
# The remote push sends mirrored state: cell1's outbound → cell2's inbound
assert perms_on_cell2['inbound']['calendar'] is True, (
f"Expected cell2's inbound.calendar=True after cell1 set outbound.calendar=True. "
f"Cell2 stored permissions: {perms_on_cell2}. "
f"(The peer-sync push may have failed — check if cells can reach each other's API.)"
)
def test_disconnect_removes_link_from_both_cells(self, admin_client, cell2_client):
"""After disconnecting from cell1 side, cell2 still has its link record (not auto-removed)."""
cell1_name, cell2_name = self._connect_cells(admin_client, cell2_client)
_remove_connection(admin_client, cell2_name)
# cell1 no longer sees cell2
cell1_connections = _get_connections(admin_client)
assert not any(c['cell_name'] == cell2_name for c in cell1_connections), (
f"Cell 1 still shows connection to '{cell2_name}' after DELETE"
)
def test_corefile_contains_remote_domain_forward(self, admin_client, cell2_client):
"""After connecting, the Corefile on cell1 has a forward block for cell2's domain."""
cell1_name, cell2_name = self._connect_cells(admin_client, cell2_client)
invite2 = _get_invite(cell2_client)
remote_domain = invite2['domain']
remote_dns_ip = invite2['dns_ip']
time.sleep(1)
corefile = _corefile_content(admin_client)
assert corefile, 'Could not read Corefile from cell1'
assert remote_domain in corefile, (
f"Expected '{remote_domain}' in cell1 Corefile after connecting to cell2. "
f"Corefile:\n{corefile}"
)
assert remote_dns_ip in corefile, (
f"Expected cell2 DNS IP '{remote_dns_ip}' in cell1 Corefile. "
f"Corefile:\n{corefile}"
)
def test_remove_connection_removes_domain_forward(self, admin_client, cell2_client):
"""Removing a cell connection removes its DNS forward from the Corefile."""
cell1_name, cell2_name = self._connect_cells(admin_client, cell2_client)
invite2 = _get_invite(cell2_client)
remote_domain = invite2['domain']
_remove_connection(admin_client, cell2_name)
time.sleep(1)
corefile = _corefile_content(admin_client)
assert remote_domain not in corefile, (
f"Expected '{remote_domain}' removed from cell1 Corefile after disconnecting. "
f"Corefile:\n{corefile}"
)
# ---------------------------------------------------------------------------
# Tests: cross-cell service access restrictions (requires PIC2_HOST)
# ---------------------------------------------------------------------------
class TestCellServiceAccessRestrictions:
"""
Verify that iptables FORWARD rules correctly allow/block cross-cell service access
based on inbound permissions.
"""
@pytest.fixture(autouse=True)
def _require_cell2(self, cell2_client):
_require_cell2(cell2_client)
@pytest.fixture(autouse=True)
def _cleanup(self, admin_client, cell2_client):
yield
try:
cell2_name = _get_invite(cell2_client)['cell_name']
except Exception:
cell2_name = None
try:
cell1_name = _get_invite(admin_client)['cell_name']
except Exception:
cell1_name = None
if cell2_name:
_remove_connection(admin_client, cell2_name)
if cell1_name:
_remove_connection(cell2_client, cell1_name)
def _get_forward_rules(self, client) -> str:
r = client.post('/api/debug/iptables-forward', json={})
if r.status_code == 200:
return r.text
return ''
def test_no_inbound_services_blocks_caddy_forward(self, admin_client, cell2_client):
"""Cell connected with no inbound services has no ACCEPT FORWARD rule to Caddy."""
invite2 = _get_invite(cell2_client)
_add_connection(admin_client, invite2, inbound_services=[])
rules = self._get_forward_rules(admin_client)
if not rules:
pytest.skip('/api/debug/iptables-forward not available')
# Cell2's subnet should not have an unconditional ACCEPT to port 80
cell2_subnet = invite2['vpn_subnet'].split('/')[0]
# The forward rules should not contain an ACCEPT for cell2's range to :80
# without a corresponding service restriction
assert 'ACCEPT' in rules or 'DROP' in rules, (
f'Expected iptables rules to be non-empty, got:\n{rules}'
)
def test_inbound_calendar_creates_accept_rule(self, admin_client, cell2_client):
"""Granting inbound.calendar for cell2 results in a FORWARD ACCEPT rule."""
invite2 = _get_invite(cell2_client)
_add_connection(admin_client, invite2, inbound_services=['calendar'])
rules = self._get_forward_rules(admin_client)
if not rules:
pytest.skip('/api/debug/iptables-forward not available')
assert 'ACCEPT' in rules, (
f'Expected ACCEPT FORWARD rule after granting calendar to cell2. '
f'Rules:\n{rules}'
)
def test_update_permissions_to_none_removes_accept_rules(self, admin_client, cell2_client):
"""Revoking all inbound services removes ACCEPT FORWARD rules for the cell subnet."""
invite2 = _get_invite(cell2_client)
cell2_name = invite2['cell_name']
cell2_subnet = invite2['vpn_subnet']
_add_connection(admin_client, invite2, inbound_services=['calendar', 'files'])
# Now revoke all
_update_permissions(
admin_client, cell2_name,
inbound={'calendar': False, 'files': False, 'mail': False, 'webdav': False},
outbound={'calendar': False, 'files': False, 'mail': False, 'webdav': False},
)
rules = self._get_forward_rules(admin_client)
if not rules:
pytest.skip('/api/debug/iptables-forward not available')
# After revoking all, iptables should still be consistent (no crash/error)
# Detailed subnet-level assertion depends on iptables output format
assert rules is not None # API responded successfully
def test_cell_connection_status_endpoint(self, admin_client, cell2_client):
"""GET /api/cells/<name>/status returns a status dict after connecting."""
invite2 = _get_invite(cell2_client)
cell2_name = invite2['cell_name']
_add_connection(admin_client, invite2)
r = admin_client.get(f'/api/cells/{cell2_name}/status')
assert r.status_code == 200, (
f'GET /api/cells/{cell2_name}/status failed: {r.status_code} {r.text}'
)
status = r.json()
assert 'cell_name' in status or 'vpn_subnet' in status, (
f'Status response missing expected fields: {status}'
)
+241
View File
@@ -0,0 +1,241 @@
"""
Peer access update E2E tests.
Verifies that editing a peer's service_access, internet_access, and peer_access
settings via PUT /api/peers/<name> is persisted and reflected in the live
iptables and CoreDNS state immediately after the call returns.
Run against a live cell:
PIC_HOST=localhost pytest tests/e2e/api/test_peer_access_update.py -v
"""
import time
import subprocess
import pytest
from helpers.api_client import PicAPIClient
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_peer(admin_client, name: str) -> dict:
r = admin_client.get(f'/api/peers/{name}')
assert r.status_code == 200, f'GET /api/peers/{name} failed: {r.status_code} {r.text}'
return r.json()
def _update_peer(admin_client, name: str, **fields) -> dict:
r = admin_client.put(f'/api/peers/{name}', json=fields)
assert r.status_code == 200, f'PUT /api/peers/{name} failed: {r.status_code} {r.text}'
return r.json()
def _wg_forward_rules(admin_client) -> str:
"""Return raw iptables FORWARD rules from inside the WireGuard container."""
r = admin_client.post('/api/debug/iptables-forward', json={})
if r.status_code == 200:
return r.text
# Fallback: read directly via docker exec (only works on the same host)
try:
result = subprocess.run(
['docker', 'exec', 'cell-wireguard', 'iptables', '-L', 'FORWARD', '-n', '--line-numbers'],
capture_output=True, text=True, timeout=5
)
return result.stdout
except Exception:
return ''
def _corefile_content(admin_client) -> str:
"""Return the current Corefile content via the API or direct read."""
r = admin_client.get('/api/network/dns/corefile')
if r.status_code == 200:
return r.text
# Fallback: read from the mapped config path
try:
with open('/home/roof/pic/config/dns/Corefile') as f:
return f.read()
except Exception:
return ''
# ---------------------------------------------------------------------------
# service_access update tests
# ---------------------------------------------------------------------------
class TestServiceAccessUpdate:
def test_restrict_all_services_creates_drop_rule(self, make_peer, admin_client):
"""Setting service_access=[] creates a DROP rule to Caddy for the peer."""
peer = make_peer('e2etest-svc-drop')
peer_ip = peer['ip']
_update_peer(admin_client, peer['name'],
internet_access=True,
service_access=[],
peer_access=True)
rules = _wg_forward_rules(admin_client)
assert rules, 'Could not read iptables rules'
# There should be a DROP rule for this peer IP targeting Caddy port 80
assert 'DROP' in rules and peer_ip.replace('.', '.') in rules, (
f'Expected DROP rule for {peer_ip} after service_access=[], '
f'but rules show:\n{rules}'
)
def test_allow_some_services_creates_accept_rule(self, make_peer, admin_client):
"""Setting service_access=['calendar'] keeps ACCEPT to Caddy; ACL blocks others."""
peer = make_peer('e2etest-svc-partial', service_access=[])
peer_ip = peer['ip']
# Start with no services, then grant calendar only
_update_peer(admin_client, peer['name'],
internet_access=True,
service_access=['calendar'],
peer_access=True)
rules = _wg_forward_rules(admin_client)
assert rules, 'Could not read iptables rules'
assert 'ACCEPT' in rules, (
f'Expected ACCEPT rule for {peer_ip} after service_access=[calendar], '
f'got:\n{rules}'
)
def test_service_access_reflected_in_peer_registry(self, make_peer, admin_client):
"""Updated service_access is persisted in the peer registry."""
peer = make_peer('e2etest-svc-persist',
service_access=['calendar', 'files', 'mail', 'webdav'])
_update_peer(admin_client, peer['name'],
internet_access=True,
service_access=['calendar'],
peer_access=True)
stored = _get_peer(admin_client, peer['name'])
assert stored.get('service_access') == ['calendar'], (
f"Expected service_access=['calendar'] in registry, got: {stored.get('service_access')}"
)
def test_blocked_service_appears_in_corefile_acl(self, make_peer, admin_client):
"""Blocked services for a peer appear in the CoreDNS Corefile ACL."""
peer = make_peer('e2etest-svc-acl', service_access=['calendar', 'files', 'mail', 'webdav'])
peer_ip = peer['ip']
# Block files and webdav
_update_peer(admin_client, peer['name'],
internet_access=True,
service_access=['calendar', 'mail'],
peer_access=True)
# Small delay for CoreDNS reload to process SIGUSR1
time.sleep(1)
corefile = _corefile_content(admin_client)
assert corefile, 'Could not read Corefile'
assert f'block net {peer_ip}/32' in corefile, (
f'Expected block rule for {peer_ip} in Corefile after removing files/webdav. '
f'Corefile:\n{corefile}'
)
def test_fully_allowed_peer_has_no_acl_block(self, make_peer, admin_client):
"""Peer with all services allowed has no ACL block entry in Corefile."""
peer = make_peer('e2etest-svc-full', service_access=['calendar'])
peer_ip = peer['ip']
# Grant all services
_update_peer(admin_client, peer['name'],
internet_access=True,
service_access=['calendar', 'files', 'mail', 'webdav'],
peer_access=True)
time.sleep(1)
corefile = _corefile_content(admin_client)
assert f'block net {peer_ip}/32' not in corefile, (
f'No ACL block expected for fully-allowed peer {peer_ip}, but found one. '
f'Corefile:\n{corefile}'
)
def test_multiple_peers_blocked_from_same_service_in_single_acl_block(
self, make_peer, admin_client
):
"""Two peers blocked from the same service share one acl block (not separate blocks)."""
peer_a = make_peer('e2etest-svc-multi-a',
service_access=['calendar', 'files', 'mail', 'webdav'])
peer_b = make_peer('e2etest-svc-multi-b',
service_access=['calendar', 'files', 'mail', 'webdav'])
# Block both from files
_update_peer(admin_client, peer_a['name'],
service_access=['calendar', 'mail', 'webdav'])
_update_peer(admin_client, peer_b['name'],
service_access=['calendar', 'mail', 'webdav'])
time.sleep(1)
corefile = _corefile_content(admin_client)
assert corefile, 'Could not read Corefile'
# Count how many acl blocks exist for the files service
domain = corefile.split('forward . ')[0].strip().rstrip('{').strip()
# Find acl blocks for files
acl_files_count = corefile.count('acl files.')
assert acl_files_count == 1, (
f'Expected exactly 1 acl block for files (both peers merged), '
f'got {acl_files_count}. Corefile:\n{corefile}'
)
assert peer_a['ip'] in corefile, f"peer_a IP {peer_a['ip']} not in Corefile ACL"
assert peer_b['ip'] in corefile, f"peer_b IP {peer_b['ip']} not in Corefile ACL"
# ---------------------------------------------------------------------------
# internet_access update tests
# ---------------------------------------------------------------------------
class TestInternetAccessUpdate:
def test_disable_internet_access_sets_config_reinstall_flag(self, make_peer, admin_client):
"""Changing internet_access sets config_needs_reinstall on the peer."""
peer = make_peer('e2etest-inet-flag')
r = _update_peer(admin_client, peer['name'],
internet_access=False,
service_access=['calendar', 'files', 'mail', 'webdav'])
assert r.get('config_changed') is True, (
f'Expected config_changed=True when internet_access changes, got: {r}'
)
stored = _get_peer(admin_client, peer['name'])
assert stored.get('internet_access') is False
assert stored.get('config_needs_reinstall') is True
def test_internet_access_persisted(self, make_peer, admin_client):
peer = make_peer('e2etest-inet-persist')
_update_peer(admin_client, peer['name'], internet_access=False,
service_access=['calendar', 'files', 'mail', 'webdav'])
stored = _get_peer(admin_client, peer['name'])
assert stored.get('internet_access') is False
# ---------------------------------------------------------------------------
# peer_access update tests
# ---------------------------------------------------------------------------
class TestPeerAccessUpdate:
def test_peer_access_persisted(self, make_peer, admin_client):
peer = make_peer('e2etest-peer-access')
_update_peer(admin_client, peer['name'],
internet_access=True,
service_access=['calendar', 'files', 'mail', 'webdav'],
peer_access=False)
stored = _get_peer(admin_client, peer['name'])
assert stored.get('peer_access') is False
def test_re_enabling_peer_access_persisted(self, make_peer, admin_client):
peer = make_peer('e2etest-peer-reenable')
_update_peer(admin_client, peer['name'], peer_access=False,
service_access=['calendar', 'files', 'mail', 'webdav'])
_update_peer(admin_client, peer['name'], peer_access=True,
service_access=['calendar', 'files', 'mail', 'webdav'])
stored = _get_peer(admin_client, peer['name'])
assert stored.get('peer_access') is True
+20
View File
@@ -69,6 +69,21 @@ class TestBuildAclBlock(unittest.TestCase):
self.assertIn('10.0.0.2/32', result)
self.assertIn('10.0.0.3/32', result)
def test_multiple_peers_in_single_acl_block(self):
# Both IPs must be in ONE acl block, not separate blocks.
# Separate blocks cause the first block's allow-all to match before
# the second block's block rule — silently granting access.
blocked = {'mail': ['10.0.0.2', '10.0.0.3'], 'calendar': [], 'files': [], 'webdav': []}
result = firewall_manager._build_acl_block(blocked)
self.assertEqual(result.count('acl mail.cell.'), 1,
'Both blocked peers must share a single acl block')
# Both block lines must appear before the allow-all
idx_block_2 = result.index('block net 10.0.0.2/32')
idx_block_3 = result.index('block net 10.0.0.3/32')
idx_allow = result.index('allow net 0.0.0.0/0')
self.assertLess(idx_block_2, idx_allow)
self.assertLess(idx_block_3, idx_allow)
# ---------------------------------------------------------------------------
# generate_corefile
@@ -112,6 +127,11 @@ class TestGenerateCorefile(unittest.TestCase):
firewall_manager.generate_corefile(peers, self.path)
self.assertNotIn('block net', open(self.path).read())
def test_corefile_contains_reload_plugin(self):
firewall_manager.generate_corefile([], self.path)
content = open(self.path).read()
self.assertIn('reload', content)
def test_returns_false_on_write_error(self):
result = firewall_manager.generate_corefile([], '/nonexistent/path/Corefile')
self.assertFalse(result)