Files
pic/tests/integration/test_peer_lifecycle.py
roof fc3cfc9741 Fix post-deploy auth issues: best-effort service provisioning, integration test auth, test mock corrections
- api/app.py: email/calendar/files provisioning now best-effort (non-fatal); fixed email_manager.create_email_user call to include domain argument
- tests/integration: added module-level auth sessions to all integration test files; added admin auth to api fixture and _resolve_admin_pass() helper; added TEST_PEER_PASSWORD constant; added password to peer creation calls
- tests/test_peer_provisioning.py: renamed rollback test to reflect new best-effort semantics (email failure no longer causes rollback)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-25 15:42:03 -04:00

348 lines
13 KiB
Python

"""
Peer lifecycle integration tests.
Covers:
- Key generation via API
- Peer creation with various service_access configs
- Iptables rule verification (enforcement layer)
- Peer update → rules re-applied
- Peer deletion → rules cleaned up
- Duplicate name rejection
- DNS ACL file updated on peer changes
Run with: pytest tests/integration/test_peer_lifecycle.py -v
"""
import pytest
import requests
import sys, os
sys.path.insert(0, os.path.dirname(__file__))
from conftest import API_BASE, peer_rules, iptables_forward, get_live_service_vips, TEST_PEER_PASSWORD, _resolve_admin_pass
# Service → virtual IP mapping (mirrors firewall_manager.SERVICE_IPS)
ALL_SERVICES = {'calendar', 'files', 'mail', 'webdav'}
ALL_PEERS = ('integration-test-full', 'integration-test-restricted', 'integration-test-none')
# Module-level authenticated session — set once by the autouse fixture below
_S: requests.Session = None
@pytest.fixture(scope='module', autouse=True)
def _auth_session():
global _S
_S = requests.Session()
_S.headers['Content-Type'] = 'application/json'
r = _S.post(f"{API_BASE}/api/auth/login",
json={'username': 'admin', 'password': _resolve_admin_pass()})
assert r.status_code == 200, f"Login failed: {r.text}"
def api_post(path, **kw):
return _S.post(f"{API_BASE}{path}", **kw)
def api_get(path, **kw):
return _S.get(f"{API_BASE}{path}", **kw)
def api_put(path, **kw):
return _S.put(f"{API_BASE}{path}", **kw)
def api_delete(path, **kw):
return _S.delete(f"{API_BASE}{path}", **kw)
# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------
def generate_keys(name: str) -> dict:
r = api_post('/api/wireguard/keys/peer', json={'name': name})
assert r.status_code == 200, f"Key generation failed: {r.text}"
keys = r.json()
assert 'public_key' in keys and 'private_key' in keys
return keys
def get_peer(name: str) -> dict | None:
peers = api_get('/api/peers').json()
return next((p for p in peers if p['peer'] == name), None)
def assert_iptables_accept(peer_ip: str, service: str, vips: dict):
"""Assert the peer has an ACCEPT rule for the given service VIP."""
vip = vips[service]
rules = peer_rules(peer_ip)
matching = [r for r in rules if vip in r and 'ACCEPT' in r]
assert matching, (
f"Expected ACCEPT rule for {service} ({vip}) on peer {peer_ip}.\n"
f"Current rules:\n" + "\n".join(rules)
)
def assert_iptables_drop(peer_ip: str, service: str, vips: dict):
"""Assert the peer has a DROP rule for the given service VIP."""
vip = vips[service]
rules = peer_rules(peer_ip)
matching = [r for r in rules if vip in r and 'DROP' in r]
assert matching, (
f"Expected DROP rule for {service} ({vip}) on peer {peer_ip}.\n"
f"Current rules:\n" + "\n".join(rules)
)
def get_service_vips() -> dict:
"""Return the actual SERVICE_IPS used by the running firewall_manager."""
return get_live_service_vips()
# ---------------------------------------------------------------------------
# Key generation
# ---------------------------------------------------------------------------
class TestKeyGeneration:
def test_generate_keys_returns_key_pair(self):
keys = generate_keys('integration-test-keygen')
assert len(keys['public_key']) > 20
assert len(keys['private_key']) > 20
def test_generated_keys_are_different(self):
k1 = generate_keys('integration-test-keygen-a')
k2 = generate_keys('integration-test-keygen-b')
assert k1['public_key'] != k2['public_key']
# ---------------------------------------------------------------------------
# Peer with FULL service access
# ---------------------------------------------------------------------------
class TestPeerFullAccess:
PEER_NAME = 'integration-test-full'
def test_create_peer_full_access(self):
keys = generate_keys(self.PEER_NAME)
r = api_post('/api/peers', json={
'name': self.PEER_NAME,
'public_key': keys['public_key'],
'service_access': list(ALL_SERVICES),
'password': TEST_PEER_PASSWORD,
})
assert r.status_code == 201, f"Peer creation failed: {r.text}"
data = r.json()
assert 'ip' in data
assert self.PEER_NAME in data.get('message', '')
def test_peer_appears_in_list(self):
peer = get_peer(self.PEER_NAME)
assert peer is not None, f"Peer {self.PEER_NAME} not found in /api/peers"
assert set(peer['service_access']) == ALL_SERVICES
def test_iptables_accept_all_services(self):
peer = get_peer(self.PEER_NAME)
assert peer, "Peer not found"
vips = get_service_vips()
for svc in ALL_SERVICES:
assert_iptables_accept(peer['ip'], svc, vips)
def test_iptables_has_internet_accept(self):
peer = get_peer(self.PEER_NAME)
rules = peer_rules(peer['ip'])
# The catch-all internet ACCEPT rule has no -d destination in iptables-save format.
# Service rules always have '-d VIP/32'; the internet rule omits -d entirely.
catch_all = [r for r in rules if '-j ACCEPT' in r and '-d ' not in r]
assert catch_all, (
f"No catch-all ACCEPT rule (internet access) found for {self.PEER_NAME}.\n"
f"Rules:\n" + "\n".join(rules)
)
def test_duplicate_peer_name_rejected(self):
keys = generate_keys(self.PEER_NAME + '-dup')
r = api_post('/api/peers', json={
'name': self.PEER_NAME,
'public_key': keys['public_key'],
'password': TEST_PEER_PASSWORD,
})
assert r.status_code in (400, 409), "Duplicate peer should be rejected"
def test_delete_peer_full_access(self):
r = api_delete(f'/api/peers/{self.PEER_NAME}')
assert r.status_code == 200
assert get_peer(self.PEER_NAME) is None
def test_iptables_rules_removed_after_delete(self):
# Peer was deleted in the previous test — rules must be gone
# We don't have the IP cached here, so verify no test-full comment exists
fw = iptables_forward()
comment = f'pic-peer-'
# Build expected comment from peer name (we need the IP — check all lines)
# If the peer is gone, no rules with this peer's typical IP should mention the test name
# We verify by checking no 'integration-test-full' style comment exists
# (Comments use IPs, not names — so just verify the previous peer IP is gone)
# Since we can't get the IP after deletion, we verify the list is clean
remaining = api_get('/api/peers').json()
names = [p['peer'] for p in remaining]
assert self.PEER_NAME not in names
# ---------------------------------------------------------------------------
# Peer with RESTRICTED service access (calendar only)
# ---------------------------------------------------------------------------
class TestPeerRestrictedAccess:
PEER_NAME = 'integration-test-restricted'
def test_create_peer_restricted_access(self):
keys = generate_keys(self.PEER_NAME)
r = api_post('/api/peers', json={
'name': self.PEER_NAME,
'public_key': keys['public_key'],
'service_access': ['calendar'],
'internet_access': False,
'password': TEST_PEER_PASSWORD,
})
assert r.status_code == 201, f"Peer creation failed: {r.text}"
def test_peer_service_access_stored_correctly(self):
peer = get_peer(self.PEER_NAME)
assert peer is not None
assert peer['service_access'] == ['calendar']
assert peer.get('internet_access') is False
def test_iptables_calendar_accepted(self):
peer = get_peer(self.PEER_NAME)
vips = get_service_vips()
assert_iptables_accept(peer['ip'], 'calendar', vips)
def test_iptables_other_services_dropped(self):
peer = get_peer(self.PEER_NAME)
vips = get_service_vips()
for svc in ('files', 'mail', 'webdav'):
assert_iptables_drop(peer['ip'], svc, vips)
def test_iptables_no_internet_accept(self):
peer = get_peer(self.PEER_NAME)
rules = peer_rules(peer['ip'])
# internet_access=False → no catch-all ACCEPT (no -d rule that is ACCEPT)
catch_all_accept = [r for r in rules if '-j ACCEPT' in r and '-d ' not in r]
assert not catch_all_accept, (
f"internet_access=False peer should not have catch-all ACCEPT.\nRules:\n"
+ "\n".join(rules)
)
def test_update_peer_add_files_access(self):
r = api_put(f'/api/peers/{self.PEER_NAME}',
json={'service_access': ['calendar', 'files']})
assert r.status_code == 200
def test_iptables_updated_after_service_change(self):
peer = get_peer(self.PEER_NAME)
vips = get_service_vips()
assert_iptables_accept(peer['ip'], 'calendar', vips)
assert_iptables_accept(peer['ip'], 'files', vips)
assert_iptables_drop(peer['ip'], 'mail', vips)
assert_iptables_drop(peer['ip'], 'webdav', vips)
def test_delete_restricted_peer(self):
peer = get_peer(self.PEER_NAME)
assert peer is not None
peer_ip = peer['ip']
r = api_delete(f'/api/peers/{self.PEER_NAME}')
assert r.status_code == 200
assert get_peer(self.PEER_NAME) is None
remaining_rules = peer_rules(peer_ip)
assert not remaining_rules, (
f"Iptables rules remain after deletion of {self.PEER_NAME} ({peer_ip}):\n"
+ "\n".join(remaining_rules)
)
# ---------------------------------------------------------------------------
# Peer with NO service access and NO internet
# ---------------------------------------------------------------------------
class TestPeerNoAccess:
PEER_NAME = 'integration-test-none'
def test_create_peer_no_access(self):
keys = generate_keys(self.PEER_NAME)
r = api_post('/api/peers', json={
'name': self.PEER_NAME,
'public_key': keys['public_key'],
'service_access': [],
'internet_access': False,
'peer_access': False,
'password': TEST_PEER_PASSWORD,
})
assert r.status_code == 201, f"Peer creation failed: {r.text}"
def test_peer_stored_with_empty_service_access(self):
peer = get_peer(self.PEER_NAME)
assert peer is not None
assert peer['service_access'] == []
assert peer.get('internet_access') is False
assert peer.get('peer_access') is False
def test_iptables_all_services_dropped(self):
peer = get_peer(self.PEER_NAME)
vips = get_service_vips()
for svc in ALL_SERVICES:
assert_iptables_drop(peer['ip'], svc, vips)
def test_iptables_peer_to_peer_dropped(self):
peer = get_peer(self.PEER_NAME)
rules = peer_rules(peer['ip'])
# peer_access=False → 10.0.0.0/24 should be DROP
peer_net_drop = [r for r in rules if '10.0.0.0/24' in r and 'DROP' in r]
assert peer_net_drop, (
f"Expected DROP rule for peer-to-peer traffic on {self.PEER_NAME}\n"
+ "\n".join(rules)
)
def test_delete_no_access_peer(self):
peer = get_peer(self.PEER_NAME)
assert peer is not None
peer_ip = peer['ip']
r = api_delete(f'/api/peers/{self.PEER_NAME}')
assert r.status_code == 200
remaining_rules = peer_rules(peer_ip)
assert not remaining_rules, (
f"Iptables rules remain after deletion ({peer_ip}):\n"
+ "\n".join(remaining_rules)
)
# ---------------------------------------------------------------------------
# Concurrent peer registry consistency
# ---------------------------------------------------------------------------
class TestPeerRegistryConsistency:
def test_peer_ips_are_unique(self):
peers = api_get('/api/peers').json()
ips = [p['ip'] for p in peers]
assert len(ips) == len(set(ips)), f"Duplicate IPs in peer registry: {ips}"
def test_all_peer_ips_in_wireguard_subnet(self):
import ipaddress
cfg = api_get('/api/config').json()
wg_addr = cfg.get('service_configs', {}).get('wireguard', {}).get('address', '')
if not wg_addr:
pytest.skip("No WireGuard address configured")
network = ipaddress.ip_network(wg_addr, strict=False)
peers = api_get('/api/peers').json()
for peer in peers:
ip_str = peer['ip'].split('/')[0]
ip = ipaddress.ip_address(ip_str)
assert ip in network, (
f"Peer {peer['peer']} IP {ip_str} is outside WireGuard subnet {network}"
)
def test_each_live_peer_has_iptables_rules(self):
peers = api_get('/api/peers').json()
for peer in peers:
rules = peer_rules(peer['ip'])
assert rules, (
f"Peer {peer['peer']} ({peer['ip']}) has no iptables rules — "
"enforcement is missing"
)