#!/usr/bin/env python3
"""Tests for the audit after_request hook, auth-route audit calls, and audit API authz."""
import contextlib
import json
import os
import sys
from pathlib import Path
from unittest.mock import patch

import pytest

# Make the sibling ``api`` directory importable before the project imports below.
sys.path.insert(0, str(Path(__file__).parent.parent / 'api'))

from app import app  # noqa: E402
from auth_manager import AuthManager  # noqa: E402
from audit_manager import AuditManager  # noqa: E402


def _make_auth_manager(tmp_path):
    """Build an AuthManager rooted under *tmp_path* and seed two accounts.

    Creates ``data`` and ``config`` directories below *tmp_path*, then adds
    the ``admin`` and ``alice`` users. The third ``create_user`` argument
    ('admin' / 'peer') is presumably the role -- the tests below log in with
    these accounts as the admin and the peer respectively.
    """
    data_dir, config_dir = (str(tmp_path / name) for name in ('data', 'config'))
    for directory in (data_dir, config_dir):
        os.makedirs(directory, exist_ok=True)

    manager = AuthManager(data_dir=data_dir, config_dir=config_dir)
    seed_users = (
        ('admin', 'AdminPass123!', 'admin'),
        ('alice', 'AlicePass123!', 'peer'),
    )
    for username, password, role in seed_users:
        manager.create_user(username, password, role)
    return manager


def _login(client, username, password):
    """POST the credentials as a JSON body to the login route; return the response."""
    payload = json.dumps({'username': username, 'password': password})
    return client.post(
        '/api/auth/login',
        data=payload,
        content_type='application/json',
    )


@contextlib.contextmanager
def _client(auth_mgr, audit_mgr, login_as=None):
    """Yield a Flask test client with the managers patched into the app.

    *auth_mgr* replaces ``app.auth_manager`` and ``auth_routes.auth_manager``;
    *audit_mgr* replaces ``app.audit_manager``. When *login_as* is ``'admin'``
    or ``'peer'`` the matching seeded account is logged in first and the login
    is asserted to return 200; any other value yields an anonymous client.
    """
    app.config['TESTING'] = True
    app.config['SECRET_KEY'] = 'test-secret'

    with contextlib.ExitStack() as stack:
        stack.enter_context(patch('app.auth_manager', auth_mgr))
        stack.enter_context(patch('app.audit_manager', audit_mgr))

        # Imported only after the app-level patches are active, as in the
        # original ordering.
        import auth_routes
        stack.enter_context(
            patch.object(auth_routes, 'auth_manager', auth_mgr, create=True)
        )

        client = stack.enter_context(app.test_client())

        if login_as == 'admin':
            credentials = ('admin', 'AdminPass123!')
        elif login_as == 'peer':
            credentials = ('alice', 'AlicePass123!')
        else:
            credentials = None

        if credentials is not None:
            assert _login(client, *credentials).status_code == 200

        yield client


def _record_peer_create(audit_mgr):
    """Write one 'peer.create' record directly through ``audit_mgr.record``."""
    audit_mgr.record('admin', 'admin', '', 'peer.create', 'peer', 'bob', '',
                     'success', 201, 'POST', '/api/peers', '')


@pytest.fixture
def auth_mgr(tmp_path):
    """AuthManager seeded with the admin and peer test accounts."""
    return _make_auth_manager(tmp_path)


@pytest.fixture
def audit_mgr(tmp_path):
    """AuditManager with its data and config directories under *tmp_path*."""
    return AuditManager(
        data_dir=str(tmp_path / 'auditdata'),
        config_dir=str(tmp_path / 'auditcfg'),
    )


# ── after_request capture ─────────────────────────────────────────────────────

def test_post_peers_records_peer_create(auth_mgr, audit_mgr):
    """An admin POST to /api/peers must leave a 'peer.create' audit entry."""
    with _client(auth_mgr, audit_mgr, login_as='admin') as client:
        with patch('app.peer_registry') as registry:
            registry.add_peer.return_value = {'success': True, 'peer': {'name': 'bob'}}
            client.post('/api/peers', json={'name': 'bob'})

        result = audit_mgr.query({'action': 'peer.create'})
        assert result['total'] >= 1
        entry = result['entries'][0]
        assert entry['target_type'] == 'peer'
        assert entry['method'] == 'POST'
        assert entry['actor'] == 'admin'
        assert entry['role'] == 'admin'


def test_4xx_records_failure(auth_mgr, audit_mgr):
    """A rejected request is still audited, with result 'failure'."""
    with _client(auth_mgr, audit_mgr, login_as='admin') as client:
        # missing body -> handler returns 400
        client.post('/api/peers', json={})

        result = audit_mgr.query({'action': 'peer.create'})
        assert result['total'] >= 1
        assert result['entries'][0]['result'] == 'failure'


def test_config_update_summary_lists_key_names_only(auth_mgr, audit_mgr):
    """The config.update summary names the changed keys but never their values."""
    # The summary is built from request-body key names regardless of the
    # handler outcome, so we assert only on the recorded audit entry.
    body = {'email': {'smtp_password': 'hunter2supersecret', 'smtp_port': 25}}
    with _client(auth_mgr, audit_mgr, login_as='admin') as client:
        client.put('/api/config', json=body)

        result = audit_mgr.query({'action': 'config.update'})
        assert result['total'] >= 1
        summary = result['entries'][0]['summary']
        assert 'smtp_port' in summary
        assert 'smtp_password' in summary           # key NAME is allowed
        assert 'hunter2supersecret' not in summary  # value never recorded


def test_unmapped_mutating_endpoint_gets_generic_action(auth_mgr, audit_mgr):
    """A mutating route without a mapped action is audited under a generic one."""
    # email.send_email is NOT in ROUTE_ACTION_MAP — it must still be recorded
    # via the generic "<method>.<path>" fallback so nothing is invisible.
    with _client(auth_mgr, audit_mgr, login_as='admin') as client:
        client.post('/api/email/send', json={})

        all_entries = audit_mgr.query({})['entries']
        matching = [entry for entry in all_entries if entry['path'] == '/api/email/send']
        assert matching, 'unmapped mutating endpoint was not audited'
        assert matching[0]['action'] == 'post./api/email/send'
        assert matching[0]['target_type'] == 'unknown'


# ── auth routes: never write password ─────────────────────────────────────────

def test_change_password_audited_without_value(auth_mgr, audit_mgr):
    """A password change is audited exactly once and contains neither password."""
    body = {'old_password': 'AdminPass123!', 'new_password': 'BrandNewPass456!'}
    with _client(auth_mgr, audit_mgr, login_as='admin') as client:
        client.post('/api/auth/change-password', json=body)

        result = audit_mgr.query({'action': 'user.password_change'})
        assert result['total'] == 1
        serialized = json.dumps(result['entries'][0])
        assert 'AdminPass123!' not in serialized
        assert 'BrandNewPass456!' not in serialized
        assert result['entries'][0]['summary'] == 'password changed'


def test_admin_reset_password_audited_without_value(auth_mgr, audit_mgr):
    """An admin reset is audited once, names the target user, omits the password."""
    body = {'username': 'alice', 'new_password': 'ResetPass789!'}
    with _client(auth_mgr, audit_mgr, login_as='admin') as client:
        client.post('/api/auth/admin/reset-password', json=body)

        result = audit_mgr.query({'action': 'user.password_reset'})
        assert result['total'] == 1
        serialized = json.dumps(result['entries'][0])
        assert 'ResetPass789!' not in serialized
        assert 'alice' in result['entries'][0]['summary']


def test_auth_login_does_not_write_password(auth_mgr, audit_mgr):
    """No audit entry produced by a login request may contain the password."""
    with _client(auth_mgr, audit_mgr) as client:
        _login(client, 'admin', 'AdminPass123!')

        for entry in audit_mgr.query({})['entries']:
            assert 'AdminPass123!' not in json.dumps(entry)


# ── audit API authz ───────────────────────────────────────────────────────────

def test_peer_forbidden_on_audit_list(auth_mgr, audit_mgr):
    """A peer-role session gets 403 from the audit listing."""
    with _client(auth_mgr, audit_mgr, login_as='peer') as client:
        response = client.get('/api/audit')
        assert response.status_code == 403


def test_admin_allowed_on_audit_list(auth_mgr, audit_mgr):
    """An admin session can list audit entries."""
    _record_peer_create(audit_mgr)
    with _client(auth_mgr, audit_mgr, login_as='admin') as client:
        response = client.get('/api/audit')
        assert response.status_code == 200
        body = response.get_json()
        assert body['total'] >= 1
        assert 'entries' in body


def test_audit_verify_endpoint(auth_mgr, audit_mgr):
    """The verify endpoint answers 200 with ``ok`` true after one recorded entry."""
    audit_mgr.record('admin', 'admin', '', 'x', '', '', '',
                     'success', 200, 'POST', '/api/x', '')
    with _client(auth_mgr, audit_mgr, login_as='admin') as client:
        response = client.get('/api/audit/verify')
        assert response.status_code == 200
        assert response.get_json()['ok'] is True


def test_audit_export_csv(auth_mgr, audit_mgr):
    """The CSV export is served as text/csv and includes the recorded action."""
    _record_peer_create(audit_mgr)
    with _client(auth_mgr, audit_mgr, login_as='admin') as client:
        response = client.get('/api/audit/export?format=csv')
        assert response.status_code == 200
        assert 'text/csv' in response.content_type
        assert b'peer.create' in response.data