#!/usr/bin/env python3
"""
Tests for POST /api/peers (peer provisioning) and DELETE /api/peers/{name}.

The new provisioning flow (added in the auth system) requires:
  - POST /api/peers body includes 'password'
  - On success: auth_manager, email_manager, calendar_manager, file_manager
    each have their create methods called once
  - On failure of any downstream service: earlier steps are rolled back
  - DELETE /api/peers/{name} must also tear down all four service accounts

All external managers are mocked so Docker and real services are never touched.
admin_client is an authenticated admin session.
"""

import os
import sys
import json
from contextlib import ExitStack
from pathlib import Path
from unittest.mock import patch, MagicMock, call

import pytest

# The API package is not installed; make ../api importable before importing it.
sys.path.insert(0, str(Path(__file__).parent.parent / 'api'))

from app import app
from auth_manager import AuthManager


# Public key used for the default test peer in both the request payload and
# the mocked registry record.
_ALICE_PUBLIC_KEY = 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA='


# ── helpers ───────────────────────────────────────────────────────────────────

def _make_auth_manager(tmp_path):
    """Return a real AuthManager rooted under *tmp_path* with one admin user.

    The data and config directories are created under *tmp_path*, and an
    'admin' account matching the defaults of ``_login`` is created.
    """
    data_dir = str(tmp_path / 'data')
    config_dir = str(tmp_path / 'config')
    os.makedirs(data_dir, exist_ok=True)
    os.makedirs(config_dir, exist_ok=True)
    mgr = AuthManager(data_dir=data_dir, config_dir=config_dir)
    mgr.create_user('admin', 'AdminPass123!', 'admin')
    return mgr


def _login(client, username='admin', password='AdminPass123!'):
    """POST JSON credentials to /api/auth/login and return the response."""
    return client.post(
        '/api/auth/login',
        data=json.dumps({'username': username, 'password': password}),
        content_type='application/json',
    )


def _peer_payload(**overrides):
    """Return a valid peer-creation payload, with *overrides* applied on top."""
    base = {
        'name': 'alice',
        'public_key': _ALICE_PUBLIC_KEY,
        'password': 'AlicePass123!',
    }
    base.update(overrides)
    return base


def _post_peer(client, payload=None):
    """POST *payload* (default: ``_peer_payload()``) to /api/peers as JSON."""
    if payload is None:
        payload = _peer_payload()
    return client.post(
        '/api/peers',
        data=json.dumps(payload),
        content_type='application/json',
    )


def _delete_peer(client, name='alice'):
    """Send DELETE /api/peers/{name} and return the response."""
    return client.delete(f'/api/peers/{name}')


# ── fixtures ──────────────────────────────────────────────────────────────────

@pytest.fixture
def auth_mgr(tmp_path):
    """Real AuthManager backed by a per-test temporary directory."""
    return _make_auth_manager(tmp_path)


@pytest.fixture
def mock_email_mgr():
    """Mock email manager whose create/delete calls report success."""
    m = MagicMock()
    m.create_email_user.return_value = True
    m.delete_email_user.return_value = True
    return m


@pytest.fixture
def mock_calendar_mgr():
    """Mock calendar manager whose create/delete calls report success."""
    m = MagicMock()
    m.create_calendar_user.return_value = True
    m.delete_calendar_user.return_value = True
    return m


@pytest.fixture
def mock_file_mgr():
    """Mock file manager whose create/delete calls report success."""
    m = MagicMock()
    m.create_user.return_value = True
    m.delete_user.return_value = True
    return m


@pytest.fixture
def mock_wg_mgr():
    """Mock WireGuard manager with successful add/remove and a fixed address."""
    m = MagicMock()
    m.add_peer.return_value = {'success': True, 'ip': '10.0.0.5'}
    m.remove_peer.return_value = True
    m._get_configured_address.return_value = '10.0.0.1/24'
    return m


@pytest.fixture
def mock_peer_registry():
    """Mock peer registry that knows a peer 'alice' and lists no peers."""
    m = MagicMock()
    m.add_peer.return_value = True
    m.remove_peer.return_value = True
    m.get_peer.return_value = {
        'peer': 'alice',
        'ip': '10.0.0.5',
        'public_key': _ALICE_PUBLIC_KEY,
        'service_access': ['mail', 'calendar', 'files', 'webdav'],
    }
    m.list_peers.return_value = []
    return m


@pytest.fixture
def admin_client(auth_mgr, mock_email_mgr, mock_calendar_mgr,
                 mock_file_mgr, mock_wg_mgr, mock_peer_registry):
    """Authenticated admin client with all service managers mocked.

    The mocks patched in are the same fixture instances a test receives, so a
    test can reconfigure them (e.g. set a ``side_effect``) after requesting
    this fixture and before issuing its request.
    """
    app.config['TESTING'] = True
    app.config['SECRET_KEY'] = 'test-secret'

    patches = [
        patch('app.auth_manager', auth_mgr),
        patch('app.email_manager', mock_email_mgr),
        patch('app.calendar_manager', mock_calendar_mgr),
        patch('app.file_manager', mock_file_mgr),
        patch('app.wireguard_manager', mock_wg_mgr),
        patch('app.peer_registry', mock_peer_registry),
        # Prevent firewall_manager from running real iptables commands
        patch('app.firewall_manager'),
    ]
    try:
        import auth_routes
    except (ImportError, AttributeError):
        # Best effort: if auth_routes cannot be imported, only the `app`
        # module attributes are patched.
        pass
    else:
        patches.append(
            patch.object(auth_routes, 'auth_manager', auth_mgr, create=True)
        )

    # ExitStack guarantees that every patch that was started is stopped again,
    # even when a later patch fails to start or the login assertion fails.
    with ExitStack() as stack:
        for p in patches:
            stack.enter_context(p)
        with app.test_client() as client:
            r = _login(client)
            assert r.status_code == 200, (
                f'admin login failed: {r.status_code} {r.data}'
            )
            yield client


# ── POST /api/peers — happy path ──────────────────────────────────────────────

def test_create_peer_returns_201(admin_client):
    """A valid peer payload is answered with 201."""
    r = _post_peer(admin_client)
    assert r.status_code == 201


def test_create_peer_provisions_all_services(
        admin_client, auth_mgr, mock_email_mgr, mock_calendar_mgr, mock_file_mgr):
    """All four service create methods must be called exactly once."""
    _post_peer(admin_client)

    # auth provisioning — check user was created in the real auth_mgr
    # (we use the real auth_mgr so we can inspect the result directly)
    alice = auth_mgr.get_user('alice')
    assert alice is not None, 'auth_manager.create_user was not called for alice'

    mock_email_mgr.create_email_user.assert_called_once()
    mock_calendar_mgr.create_calendar_user.assert_called_once()
    mock_file_mgr.create_user.assert_called_once()


def test_create_peer_response_has_ip(admin_client):
    """The creation response body carries an 'ip' or a 'message' key."""
    r = _post_peer(admin_client)
    data = json.loads(r.data)
    assert 'ip' in data or 'message' in data  # either shape is acceptable


# ── POST /api/peers — validation ──────────────────────────────────────────────

def test_create_peer_requires_password(admin_client):
    """A payload without 'password' is rejected with 400."""
    payload = _peer_payload()
    del payload['password']
    r = _post_peer(admin_client, payload)
    assert r.status_code == 400


def test_create_peer_password_too_short(admin_client):
    """A three-character password is rejected with 400."""
    r = _post_peer(admin_client, _peer_payload(password='abc'))
    assert r.status_code == 400


def test_create_peer_requires_name(admin_client):
    """A payload without 'name' is rejected with 400."""
    payload = _peer_payload()
    del payload['name']
    r = _post_peer(admin_client, payload)
    assert r.status_code == 400


def test_create_peer_requires_public_key(admin_client):
    """A payload without 'public_key' is rejected with 400."""
    payload = _peer_payload()
    del payload['public_key']
    r = _post_peer(admin_client, payload)
    assert r.status_code == 400


# ── POST /api/peers — rollback on failure ─────────────────────────────────────

def test_create_peer_rollback_on_email_failure(
        admin_client, auth_mgr, mock_email_mgr):
    """If email_manager.create_email_user raises, auth user must be deleted (rollback)."""
    mock_email_mgr.create_email_user.side_effect = RuntimeError('SMTP server down')

    _post_peer(admin_client)

    # alice must not remain in the auth store (rolled back)
    alice = auth_mgr.get_user('alice')
    assert alice is None, (
        'auth user alice was not rolled back after email_manager failure'
    )


def test_create_peer_rollback_on_wireguard_failure(
        admin_client, auth_mgr, mock_email_mgr, mock_calendar_mgr,
        mock_file_mgr, mock_peer_registry):
    """If peer_registry.add_peer (WireGuard side) fails, all four service accounts must be deleted."""
    mock_peer_registry.add_peer.return_value = False

    _post_peer(admin_client)

    # All service delete methods should have been called for cleanup
    mock_email_mgr.delete_email_user.assert_called()
    mock_calendar_mgr.delete_calendar_user.assert_called()
    mock_file_mgr.delete_user.assert_called()

    # The fourth account lives in the real auth store; it must be gone too.
    alice = auth_mgr.get_user('alice')
    assert alice is None, (
        'auth user alice was not rolled back after peer_registry failure'
    )


# ── DELETE /api/peers/{name} ──────────────────────────────────────────────────

def test_delete_peer_returns_200(admin_client):
    """Deleting the peer known to the mocked registry returns 200."""
    r = _delete_peer(admin_client, 'alice')
    assert r.status_code == 200


def test_delete_peer_cleans_all_services(
        admin_client, auth_mgr, mock_email_mgr, mock_calendar_mgr, mock_file_mgr):
    """DELETE /api/peers/{name} must call delete on all four service managers."""
    # Seed the auth store so auth_manager.delete_user has something to delete
    auth_mgr.create_user('alice', 'AlicePass123!', 'peer')

    _delete_peer(admin_client, 'alice')

    # All four service delete methods must have been invoked
    mock_email_mgr.delete_email_user.assert_called()
    mock_calendar_mgr.delete_calendar_user.assert_called()
    mock_file_mgr.delete_user.assert_called()
    alice = auth_mgr.get_user('alice')
    assert alice is None, 'auth user alice was not removed on peer delete'


def test_delete_nonexistent_peer_returns_gracefully(admin_client, mock_peer_registry):
    """Deleting an unknown peer answers 200 or 404, never a server error."""
    mock_peer_registry.get_peer.return_value = None
    mock_peer_registry.remove_peer.return_value = False
    r = _delete_peer(admin_client, 'nobody')
    # Route must not 500 when the peer simply doesn't exist
    assert r.status_code in (200, 404)