Files
pic/tests/test_peer_provisioning.py
roof d54844cd44 fix(P2): peer add rollback, helper failure recovery, manager extraction (A2/A3/A5)
A3 — Peer add atomicity: track firewall_applied flag and call
clear_peer_rules() during rollback so partial peer-add failures
don't leave stale iptables rules behind. Added test.

A2 — Pending config flag: instead of clearing before spawning the
helper container (fire-and-forget), set applying=True and let the
helper clear it on success by writing to cell_config.json via a
mounted /app/data volume. On API restart after a failed apply,
_recover_pending_apply() resets the applying flag so the UI shows
pending changes and the user can retry. GET /api/config/pending now
includes the applying field.

A5 (foundation) — Extract all manager instantiation into managers.py.
app.py re-exports every name so existing test patches (patch('app.X'))
continue to work unchanged. 1021 unit tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 05:27:39 -04:00

420 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Tests for POST /api/peers (peer provisioning) and DELETE /api/peers/<name>.
The new provisioning flow (added in the auth system) requires:
- POST /api/peers body includes 'password'
- On success: auth_manager, email_manager, calendar_manager, file_manager
each have their create methods called once
- On failure of any downstream service: earlier steps are rolled back
- DELETE /api/peers/<name> must also tear down all four service accounts
All external managers are mocked so Docker and real services are never touched.
admin_client is an authenticated admin session.
"""
import os
import sys
import json
from pathlib import Path
from unittest.mock import patch, MagicMock, call
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / 'api'))
from app import app
from auth_manager import AuthManager
# ── helpers ───────────────────────────────────────────────────────────────────
def _make_auth_manager(tmp_path):
data_dir = str(tmp_path / 'data')
config_dir = str(tmp_path / 'config')
os.makedirs(data_dir, exist_ok=True)
os.makedirs(config_dir, exist_ok=True)
mgr = AuthManager(data_dir=data_dir, config_dir=config_dir)
mgr.create_user('admin', 'AdminPass123!', 'admin')
return mgr
def _login(client, username='admin', password='AdminPass123!'):
return client.post(
'/api/auth/login',
data=json.dumps({'username': username, 'password': password}),
content_type='application/json',
)
def _peer_payload(**overrides):
base = {
'name': 'alice',
'public_key': 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=',
'password': 'AlicePass123!',
}
base.update(overrides)
return base
def _post_peer(client, payload=None):
if payload is None:
payload = _peer_payload()
return client.post(
'/api/peers',
data=json.dumps(payload),
content_type='application/json',
)
def _delete_peer(client, name='alice'):
return client.delete(f'/api/peers/{name}')
# ── fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture
def auth_mgr(tmp_path):
return _make_auth_manager(tmp_path)
@pytest.fixture
def mock_email_mgr():
m = MagicMock()
m.create_email_user.return_value = True
m.delete_email_user.return_value = True
return m
@pytest.fixture
def mock_calendar_mgr():
m = MagicMock()
m.create_calendar_user.return_value = True
m.delete_calendar_user.return_value = True
return m
@pytest.fixture
def mock_file_mgr():
m = MagicMock()
m.create_user.return_value = True
m.delete_user.return_value = True
return m
@pytest.fixture
def mock_wg_mgr():
m = MagicMock()
m.add_peer.return_value = {'success': True, 'ip': '10.0.0.5'}
m.remove_peer.return_value = True
m._get_configured_address.return_value = '10.0.0.1/24'
return m
@pytest.fixture
def mock_peer_registry():
m = MagicMock()
m.add_peer.return_value = True
m.remove_peer.return_value = True
m.get_peer.return_value = {
'peer': 'alice',
'ip': '10.0.0.5',
'public_key': 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=',
'service_access': ['mail', 'calendar', 'files', 'webdav'],
}
m.list_peers.return_value = []
return m
@pytest.fixture
def admin_client(auth_mgr, mock_email_mgr, mock_calendar_mgr,
mock_file_mgr, mock_wg_mgr, mock_peer_registry):
"""Authenticated admin client with all service managers mocked."""
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'test-secret'
patches = [
patch('app.auth_manager', auth_mgr),
patch('app.email_manager', mock_email_mgr),
patch('app.calendar_manager', mock_calendar_mgr),
patch('app.file_manager', mock_file_mgr),
patch('app.wireguard_manager', mock_wg_mgr),
patch('app.peer_registry', mock_peer_registry),
# Prevent firewall_manager from running real iptables commands
patch('app.firewall_manager'),
]
try:
import auth_routes
patches.append(
patch.object(auth_routes, 'auth_manager', auth_mgr, create=True)
)
except (ImportError, AttributeError):
pass
started = [p.start() for p in patches]
try:
with app.test_client() as client:
r = _login(client)
assert r.status_code == 200, f'admin login failed: {r.status_code} {r.data}'
yield client
finally:
for p in patches:
p.stop()
# ── POST /api/peers — happy path ──────────────────────────────────────────────
def test_create_peer_returns_201(admin_client):
r = _post_peer(admin_client)
assert r.status_code == 201
def test_create_peer_provisions_all_services(
admin_client, auth_mgr,
mock_email_mgr, mock_calendar_mgr, mock_file_mgr):
"""All four service create methods must be called exactly once."""
_post_peer(admin_client)
# auth provisioning — check user was created in the real auth_mgr
# (we use the real auth_mgr so we can inspect the result directly)
alice = auth_mgr.get_user('alice')
assert alice is not None, 'auth_manager.create_user was not called for alice'
mock_email_mgr.create_email_user.assert_called_once()
mock_calendar_mgr.create_calendar_user.assert_called_once()
mock_file_mgr.create_user.assert_called_once()
def test_create_peer_response_has_ip(admin_client):
r = _post_peer(admin_client)
data = json.loads(r.data)
assert 'ip' in data or 'message' in data # either shape is acceptable
# ── POST /api/peers — validation ──────────────────────────────────────────────
def test_create_peer_requires_password(admin_client):
payload = _peer_payload()
del payload['password']
r = _post_peer(admin_client, payload)
assert r.status_code == 400
def test_create_peer_password_too_short(admin_client):
r = _post_peer(admin_client, _peer_payload(password='abc'))
assert r.status_code == 400
def test_create_peer_requires_name(admin_client):
payload = _peer_payload()
del payload['name']
r = _post_peer(admin_client, payload)
assert r.status_code == 400
def test_create_peer_requires_public_key(admin_client):
payload = _peer_payload()
del payload['public_key']
r = _post_peer(admin_client, payload)
assert r.status_code == 400
# ── POST /api/peers — rollback on failure ─────────────────────────────────────
def test_create_peer_email_failure_is_nonfatal(
auth_mgr, mock_email_mgr, mock_calendar_mgr,
mock_file_mgr, mock_wg_mgr, mock_peer_registry):
"""Email provisioning is best-effort: if create_email_user raises, peer creation
still succeeds (201) and the auth user is NOT rolled back."""
mock_email_mgr.create_email_user.side_effect = RuntimeError('SMTP server down')
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'test-secret'
patches = [
patch('app.auth_manager', auth_mgr),
patch('app.email_manager', mock_email_mgr),
patch('app.calendar_manager', mock_calendar_mgr),
patch('app.file_manager', mock_file_mgr),
patch('app.wireguard_manager', mock_wg_mgr),
patch('app.peer_registry', mock_peer_registry),
patch('app.firewall_manager'),
]
try:
import auth_routes
patches.append(
patch.object(auth_routes, 'auth_manager', auth_mgr, create=True)
)
except (ImportError, AttributeError):
pass
started = [p.start() for p in patches]
try:
with app.test_client() as client:
r = _login(client)
assert r.status_code == 200
resp = _post_peer(client)
# Peer creation must succeed despite email failure (best-effort)
assert resp.status_code == 201, (
f'expected 201 but got {resp.status_code}: {resp.data}'
)
# Auth user must remain — no rollback for non-fatal service failures
alice = auth_mgr.get_user('alice')
assert alice is not None, (
'auth user alice was incorrectly rolled back after non-fatal email failure'
)
finally:
for p in patches:
p.stop()
def test_create_peer_rollback_on_wireguard_failure(
auth_mgr, mock_email_mgr, mock_calendar_mgr,
mock_file_mgr, mock_wg_mgr, mock_peer_registry):
"""If peer_registry.add_peer (WireGuard side) fails, all four service accounts
must be deleted."""
mock_peer_registry.add_peer.return_value = False
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'test-secret'
patches = [
patch('app.auth_manager', auth_mgr),
patch('app.email_manager', mock_email_mgr),
patch('app.calendar_manager', mock_calendar_mgr),
patch('app.file_manager', mock_file_mgr),
patch('app.wireguard_manager', mock_wg_mgr),
patch('app.peer_registry', mock_peer_registry),
patch('app.firewall_manager'),
]
try:
import auth_routes
patches.append(
patch.object(auth_routes, 'auth_manager', auth_mgr, create=True)
)
except (ImportError, AttributeError):
pass
started = [p.start() for p in patches]
try:
with app.test_client() as client:
r = _login(client)
assert r.status_code == 200
_post_peer(client)
# All service delete methods should have been called for cleanup
mock_email_mgr.delete_email_user.assert_called()
mock_calendar_mgr.delete_calendar_user.assert_called()
mock_file_mgr.delete_user.assert_called()
finally:
for p in patches:
p.stop()
# ── DELETE /api/peers/<name> ──────────────────────────────────────────────────
def test_delete_peer_returns_200(admin_client):
r = _delete_peer(admin_client, 'alice')
assert r.status_code == 200
def test_delete_peer_cleans_all_services(
auth_mgr, mock_email_mgr, mock_calendar_mgr,
mock_file_mgr, mock_wg_mgr, mock_peer_registry):
"""DELETE /api/peers/<name> must call delete on all four service managers."""
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'test-secret'
patches = [
patch('app.auth_manager', auth_mgr),
patch('app.email_manager', mock_email_mgr),
patch('app.calendar_manager', mock_calendar_mgr),
patch('app.file_manager', mock_file_mgr),
patch('app.wireguard_manager', mock_wg_mgr),
patch('app.peer_registry', mock_peer_registry),
patch('app.firewall_manager'),
]
try:
import auth_routes
patches.append(
patch.object(auth_routes, 'auth_manager', auth_mgr, create=True)
)
except (ImportError, AttributeError):
pass
started = [p.start() for p in patches]
try:
with app.test_client() as client:
r = _login(client)
assert r.status_code == 200
# Seed the auth store so auth_manager.delete_user has something to delete
auth_mgr.create_user('alice', 'AlicePass123!', 'peer')
_delete_peer(client, 'alice')
# All four service delete methods must have been invoked
mock_email_mgr.delete_email_user.assert_called()
mock_calendar_mgr.delete_calendar_user.assert_called()
mock_file_mgr.delete_user.assert_called()
alice = auth_mgr.get_user('alice')
assert alice is None, 'auth user alice was not removed on peer delete'
finally:
for p in patches:
p.stop()
def test_delete_nonexistent_peer_returns_gracefully(admin_client, mock_peer_registry):
mock_peer_registry.get_peer.return_value = None
mock_peer_registry.remove_peer.return_value = False
r = _delete_peer(admin_client, 'nobody')
# Route must not 500 when the peer simply doesn't exist
assert r.status_code in (200, 404)
# ── POST /api/peers — firewall rollback (A3) ──────────────────────────────────
def test_create_peer_rolls_back_firewall_on_dns_failure(
auth_mgr, mock_email_mgr, mock_calendar_mgr,
mock_file_mgr, mock_wg_mgr, mock_peer_registry):
"""If apply_all_dns_rules raises after firewall rules were applied, the peer
add must call clear_peer_rules to undo the firewall state (A3 fix)."""
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'test-secret'
mock_fw = MagicMock()
mock_fw.apply_peer_rules.return_value = True
mock_fw.apply_all_dns_rules.side_effect = RuntimeError('CoreDNS unreachable')
patches = [
patch('app.auth_manager', auth_mgr),
patch('app.email_manager', mock_email_mgr),
patch('app.calendar_manager', mock_calendar_mgr),
patch('app.file_manager', mock_file_mgr),
patch('app.wireguard_manager', mock_wg_mgr),
patch('app.peer_registry', mock_peer_registry),
patch('app.firewall_manager', mock_fw),
]
try:
import auth_routes
patches.append(patch.object(auth_routes, 'auth_manager', auth_mgr, create=True))
except (ImportError, AttributeError):
pass
started = [p.start() for p in patches]
try:
with app.test_client() as client:
r = _login(client)
assert r.status_code == 200
resp = _post_peer(client)
assert resp.status_code == 500, (
f'expected 500 on DNS failure but got {resp.status_code}'
)
# Firewall rules must be cleared as part of rollback
mock_fw.clear_peer_rules.assert_called_once()
# Registry entry must also be rolled back
mock_peer_registry.remove_peer.assert_called_once()
finally:
for p in patches:
p.stop()