fc3cfc9741
- api/app.py: email/calendar/files provisioning now best-effort (non-fatal); fixed email_manager.create_email_user call to include domain argument - tests/integration: added module-level auth sessions to all integration test files; added admin auth to api fixture and _resolve_admin_pass() helper; added TEST_PEER_PASSWORD constant; added password to peer creation calls - tests/test_peer_provisioning.py: renamed rollback test to reflect new best-effort semantics (email failure no longer causes rollback) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
348 lines
13 KiB
Python
348 lines
13 KiB
Python
"""
|
|
Peer lifecycle integration tests.
|
|
|
|
Covers:
|
|
- Key generation via API
|
|
- Peer creation with various service_access configs
|
|
- Iptables rule verification (enforcement layer)
|
|
- Peer update → rules re-applied
|
|
- Peer deletion → rules cleaned up
|
|
- Duplicate name rejection
|
|
- DNS ACL file updated on peer changes
|
|
|
|
Run with: pytest tests/integration/test_peer_lifecycle.py -v
|
|
"""
|
|
import pytest
|
|
import requests
|
|
import sys, os
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
from conftest import API_BASE, peer_rules, iptables_forward, get_live_service_vips, TEST_PEER_PASSWORD, _resolve_admin_pass
|
|
|
|
# Service → virtual IP mapping (mirrors firewall_manager.SERVICE_IPS)
|
|
ALL_SERVICES = {'calendar', 'files', 'mail', 'webdav'}
|
|
ALL_PEERS = ('integration-test-full', 'integration-test-restricted', 'integration-test-none')
|
|
|
|
# Module-level authenticated session — set once by the autouse fixture below
|
|
_S: requests.Session = None
|
|
|
|
|
|
@pytest.fixture(scope='module', autouse=True)
|
|
def _auth_session():
|
|
global _S
|
|
_S = requests.Session()
|
|
_S.headers['Content-Type'] = 'application/json'
|
|
r = _S.post(f"{API_BASE}/api/auth/login",
|
|
json={'username': 'admin', 'password': _resolve_admin_pass()})
|
|
assert r.status_code == 200, f"Login failed: {r.text}"
|
|
|
|
|
|
def api_post(path, **kw):
|
|
return _S.post(f"{API_BASE}{path}", **kw)
|
|
|
|
def api_get(path, **kw):
|
|
return _S.get(f"{API_BASE}{path}", **kw)
|
|
|
|
def api_put(path, **kw):
|
|
return _S.put(f"{API_BASE}{path}", **kw)
|
|
|
|
def api_delete(path, **kw):
|
|
return _S.delete(f"{API_BASE}{path}", **kw)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def generate_keys(name: str) -> dict:
|
|
r = api_post('/api/wireguard/keys/peer', json={'name': name})
|
|
assert r.status_code == 200, f"Key generation failed: {r.text}"
|
|
keys = r.json()
|
|
assert 'public_key' in keys and 'private_key' in keys
|
|
return keys
|
|
|
|
|
|
def get_peer(name: str) -> dict | None:
|
|
peers = api_get('/api/peers').json()
|
|
return next((p for p in peers if p['peer'] == name), None)
|
|
|
|
|
|
def assert_iptables_accept(peer_ip: str, service: str, vips: dict):
|
|
"""Assert the peer has an ACCEPT rule for the given service VIP."""
|
|
vip = vips[service]
|
|
rules = peer_rules(peer_ip)
|
|
matching = [r for r in rules if vip in r and 'ACCEPT' in r]
|
|
assert matching, (
|
|
f"Expected ACCEPT rule for {service} ({vip}) on peer {peer_ip}.\n"
|
|
f"Current rules:\n" + "\n".join(rules)
|
|
)
|
|
|
|
|
|
def assert_iptables_drop(peer_ip: str, service: str, vips: dict):
|
|
"""Assert the peer has a DROP rule for the given service VIP."""
|
|
vip = vips[service]
|
|
rules = peer_rules(peer_ip)
|
|
matching = [r for r in rules if vip in r and 'DROP' in r]
|
|
assert matching, (
|
|
f"Expected DROP rule for {service} ({vip}) on peer {peer_ip}.\n"
|
|
f"Current rules:\n" + "\n".join(rules)
|
|
)
|
|
|
|
|
|
def get_service_vips() -> dict:
|
|
"""Return the actual SERVICE_IPS used by the running firewall_manager."""
|
|
return get_live_service_vips()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Key generation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestKeyGeneration:
|
|
def test_generate_keys_returns_key_pair(self):
|
|
keys = generate_keys('integration-test-keygen')
|
|
assert len(keys['public_key']) > 20
|
|
assert len(keys['private_key']) > 20
|
|
|
|
def test_generated_keys_are_different(self):
|
|
k1 = generate_keys('integration-test-keygen-a')
|
|
k2 = generate_keys('integration-test-keygen-b')
|
|
assert k1['public_key'] != k2['public_key']
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Peer with FULL service access
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPeerFullAccess:
|
|
PEER_NAME = 'integration-test-full'
|
|
|
|
def test_create_peer_full_access(self):
|
|
keys = generate_keys(self.PEER_NAME)
|
|
r = api_post('/api/peers', json={
|
|
'name': self.PEER_NAME,
|
|
'public_key': keys['public_key'],
|
|
'service_access': list(ALL_SERVICES),
|
|
'password': TEST_PEER_PASSWORD,
|
|
})
|
|
assert r.status_code == 201, f"Peer creation failed: {r.text}"
|
|
data = r.json()
|
|
assert 'ip' in data
|
|
assert self.PEER_NAME in data.get('message', '')
|
|
|
|
def test_peer_appears_in_list(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
assert peer is not None, f"Peer {self.PEER_NAME} not found in /api/peers"
|
|
assert set(peer['service_access']) == ALL_SERVICES
|
|
|
|
def test_iptables_accept_all_services(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
assert peer, "Peer not found"
|
|
vips = get_service_vips()
|
|
for svc in ALL_SERVICES:
|
|
assert_iptables_accept(peer['ip'], svc, vips)
|
|
|
|
def test_iptables_has_internet_accept(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
rules = peer_rules(peer['ip'])
|
|
# The catch-all internet ACCEPT rule has no -d destination in iptables-save format.
|
|
# Service rules always have '-d VIP/32'; the internet rule omits -d entirely.
|
|
catch_all = [r for r in rules if '-j ACCEPT' in r and '-d ' not in r]
|
|
assert catch_all, (
|
|
f"No catch-all ACCEPT rule (internet access) found for {self.PEER_NAME}.\n"
|
|
f"Rules:\n" + "\n".join(rules)
|
|
)
|
|
|
|
def test_duplicate_peer_name_rejected(self):
|
|
keys = generate_keys(self.PEER_NAME + '-dup')
|
|
r = api_post('/api/peers', json={
|
|
'name': self.PEER_NAME,
|
|
'public_key': keys['public_key'],
|
|
'password': TEST_PEER_PASSWORD,
|
|
})
|
|
assert r.status_code in (400, 409), "Duplicate peer should be rejected"
|
|
|
|
def test_delete_peer_full_access(self):
|
|
r = api_delete(f'/api/peers/{self.PEER_NAME}')
|
|
assert r.status_code == 200
|
|
assert get_peer(self.PEER_NAME) is None
|
|
|
|
def test_iptables_rules_removed_after_delete(self):
|
|
# Peer was deleted in the previous test — rules must be gone
|
|
# We don't have the IP cached here, so verify no test-full comment exists
|
|
fw = iptables_forward()
|
|
comment = f'pic-peer-'
|
|
# Build expected comment from peer name (we need the IP — check all lines)
|
|
# If the peer is gone, no rules with this peer's typical IP should mention the test name
|
|
# We verify by checking no 'integration-test-full' style comment exists
|
|
# (Comments use IPs, not names — so just verify the previous peer IP is gone)
|
|
# Since we can't get the IP after deletion, we verify the list is clean
|
|
remaining = api_get('/api/peers').json()
|
|
names = [p['peer'] for p in remaining]
|
|
assert self.PEER_NAME not in names
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Peer with RESTRICTED service access (calendar only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPeerRestrictedAccess:
|
|
PEER_NAME = 'integration-test-restricted'
|
|
|
|
def test_create_peer_restricted_access(self):
|
|
keys = generate_keys(self.PEER_NAME)
|
|
r = api_post('/api/peers', json={
|
|
'name': self.PEER_NAME,
|
|
'public_key': keys['public_key'],
|
|
'service_access': ['calendar'],
|
|
'internet_access': False,
|
|
'password': TEST_PEER_PASSWORD,
|
|
})
|
|
assert r.status_code == 201, f"Peer creation failed: {r.text}"
|
|
|
|
def test_peer_service_access_stored_correctly(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
assert peer is not None
|
|
assert peer['service_access'] == ['calendar']
|
|
assert peer.get('internet_access') is False
|
|
|
|
def test_iptables_calendar_accepted(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
vips = get_service_vips()
|
|
assert_iptables_accept(peer['ip'], 'calendar', vips)
|
|
|
|
def test_iptables_other_services_dropped(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
vips = get_service_vips()
|
|
for svc in ('files', 'mail', 'webdav'):
|
|
assert_iptables_drop(peer['ip'], svc, vips)
|
|
|
|
def test_iptables_no_internet_accept(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
rules = peer_rules(peer['ip'])
|
|
# internet_access=False → no catch-all ACCEPT (no -d rule that is ACCEPT)
|
|
catch_all_accept = [r for r in rules if '-j ACCEPT' in r and '-d ' not in r]
|
|
assert not catch_all_accept, (
|
|
f"internet_access=False peer should not have catch-all ACCEPT.\nRules:\n"
|
|
+ "\n".join(rules)
|
|
)
|
|
|
|
def test_update_peer_add_files_access(self):
|
|
r = api_put(f'/api/peers/{self.PEER_NAME}',
|
|
json={'service_access': ['calendar', 'files']})
|
|
assert r.status_code == 200
|
|
|
|
def test_iptables_updated_after_service_change(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
vips = get_service_vips()
|
|
assert_iptables_accept(peer['ip'], 'calendar', vips)
|
|
assert_iptables_accept(peer['ip'], 'files', vips)
|
|
assert_iptables_drop(peer['ip'], 'mail', vips)
|
|
assert_iptables_drop(peer['ip'], 'webdav', vips)
|
|
|
|
def test_delete_restricted_peer(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
assert peer is not None
|
|
peer_ip = peer['ip']
|
|
|
|
r = api_delete(f'/api/peers/{self.PEER_NAME}')
|
|
assert r.status_code == 200
|
|
assert get_peer(self.PEER_NAME) is None
|
|
|
|
remaining_rules = peer_rules(peer_ip)
|
|
assert not remaining_rules, (
|
|
f"Iptables rules remain after deletion of {self.PEER_NAME} ({peer_ip}):\n"
|
|
+ "\n".join(remaining_rules)
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Peer with NO service access and NO internet
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPeerNoAccess:
|
|
PEER_NAME = 'integration-test-none'
|
|
|
|
def test_create_peer_no_access(self):
|
|
keys = generate_keys(self.PEER_NAME)
|
|
r = api_post('/api/peers', json={
|
|
'name': self.PEER_NAME,
|
|
'public_key': keys['public_key'],
|
|
'service_access': [],
|
|
'internet_access': False,
|
|
'peer_access': False,
|
|
'password': TEST_PEER_PASSWORD,
|
|
})
|
|
assert r.status_code == 201, f"Peer creation failed: {r.text}"
|
|
|
|
def test_peer_stored_with_empty_service_access(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
assert peer is not None
|
|
assert peer['service_access'] == []
|
|
assert peer.get('internet_access') is False
|
|
assert peer.get('peer_access') is False
|
|
|
|
def test_iptables_all_services_dropped(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
vips = get_service_vips()
|
|
for svc in ALL_SERVICES:
|
|
assert_iptables_drop(peer['ip'], svc, vips)
|
|
|
|
def test_iptables_peer_to_peer_dropped(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
rules = peer_rules(peer['ip'])
|
|
# peer_access=False → 10.0.0.0/24 should be DROP
|
|
peer_net_drop = [r for r in rules if '10.0.0.0/24' in r and 'DROP' in r]
|
|
assert peer_net_drop, (
|
|
f"Expected DROP rule for peer-to-peer traffic on {self.PEER_NAME}\n"
|
|
+ "\n".join(rules)
|
|
)
|
|
|
|
def test_delete_no_access_peer(self):
|
|
peer = get_peer(self.PEER_NAME)
|
|
assert peer is not None
|
|
peer_ip = peer['ip']
|
|
|
|
r = api_delete(f'/api/peers/{self.PEER_NAME}')
|
|
assert r.status_code == 200
|
|
|
|
remaining_rules = peer_rules(peer_ip)
|
|
assert not remaining_rules, (
|
|
f"Iptables rules remain after deletion ({peer_ip}):\n"
|
|
+ "\n".join(remaining_rules)
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Concurrent peer registry consistency
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPeerRegistryConsistency:
|
|
def test_peer_ips_are_unique(self):
|
|
peers = api_get('/api/peers').json()
|
|
ips = [p['ip'] for p in peers]
|
|
assert len(ips) == len(set(ips)), f"Duplicate IPs in peer registry: {ips}"
|
|
|
|
def test_all_peer_ips_in_wireguard_subnet(self):
|
|
import ipaddress
|
|
cfg = api_get('/api/config').json()
|
|
wg_addr = cfg.get('service_configs', {}).get('wireguard', {}).get('address', '')
|
|
if not wg_addr:
|
|
pytest.skip("No WireGuard address configured")
|
|
network = ipaddress.ip_network(wg_addr, strict=False)
|
|
peers = api_get('/api/peers').json()
|
|
for peer in peers:
|
|
ip_str = peer['ip'].split('/')[0]
|
|
ip = ipaddress.ip_address(ip_str)
|
|
assert ip in network, (
|
|
f"Peer {peer['peer']} IP {ip_str} is outside WireGuard subnet {network}"
|
|
)
|
|
|
|
def test_each_live_peer_has_iptables_rules(self):
|
|
peers = api_get('/api/peers').json()
|
|
for peer in peers:
|
|
rules = peer_rules(peer['ip'])
|
|
assert rules, (
|
|
f"Peer {peer['peer']} ({peer['ip']}) has no iptables rules — "
|
|
"enforcement is missing"
|
|
)
|