Files
pic/tests/test_logs_endpoints.py
T
roof a43f9fbf0d fix: full security audit remediation — P0/P1/P2/P3 fixes + 1020 passing tests
P0 — Broken functionality:
- Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing)
- Fix email_manager.delete_email_user() missing domain arg
- Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now
  accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file)
- Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR)
- Fix _next_peer_ip exhaustion returning 500 instead of 409
- Fix ConfigManager Caddyfile path (/app/config-caddy/)
- Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx
- Remove hardcoded credentials from Dashboard.jsx

P1 — Security:
- CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern)
- enforce_auth: 503 only when users file readable but empty; never bypass on IOError
- WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes
- DNS add_cell_dns_forward: validate IP and domain; reject injection chars
- DNS zone write: realpath containment + record content validation
- iptables comment /32 suffix prevents substring match deleting wrong peer rules
- is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge)
- POST /api/containers: volume allow-list prevents arbitrary host mounts
- file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user
- email/calendar: stop persisting plaintext passwords in user records
- routing_manager: validate IPs, networks, and interface names
- peer_registry: write peers.json at mode 0o600
- vault_manager: Fernet key file at mode 0o600
- CORS: lock down to explicit origin list
- domain/cell_name validation: reject newline, brace, semicolon injection chars

P2 — Architecture:
- Peer add: rollback registry entry if firewall rules fail post-add
- restart_service(): base class now calls _restart_container(); email and calendar
  managers call cell-mail / cell-radicale respectively
- email/calendar managers sync user list (no passwords) to cell_config.json
- Pending-restart flag cleared only after helper subprocess exits with code 0
- docker-compose.yml: add config-caddy volume to API container

P3 — Tests (854 → 1020):
- Fill test_email_endpoints.py, test_calendar_endpoints.py,
  test_network_endpoints.py, test_routing_endpoints.py
- New: test_peer_management_update.py, test_peer_management_edge_cases.py,
  test_input_validation.py, test_enforce_auth_configured.py,
  test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py,
  test_is_local_request_per_endpoint.py, test_caddy_routing.py
- E2E conftest: skip WireGuard suite when wg-quick absent
- Update existing tests to match fixed signatures and comment formats

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:30:21 -04:00

364 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Unit tests for logs Flask endpoints in api/app.py.
Covers:
GET /api/logs — backend log file (reads picell.log)
GET /api/logs/services/<service> — per-service logs via log_manager
POST /api/logs/search — search across services
POST /api/logs/export — export logs
GET /api/logs/statistics — log stats
POST /api/logs/rotate — rotate logs
GET /api/logs/files — list log file info
GET /api/logs/verbosity — get log levels
PUT /api/logs/verbosity — set log levels
"""
import sys
import json
import os
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock, mock_open
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from app import app
class TestGetBackendLogs(unittest.TestCase):
"""GET /api/logs — reads picell.log from api directory."""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
def test_get_logs_returns_404_when_log_file_missing(self):
# Patch os.path.exists so the log file appears absent
with patch('app.os.path.exists', return_value=False):
r = self.client.get('/api/logs')
self.assertEqual(r.status_code, 404)
self.assertIn('error', json.loads(r.data))
def test_get_logs_returns_200_with_log_content(self):
log_content = 'INFO 2026-04-27 server started\nERROR something went wrong\n'
m = mock_open(read_data=log_content)
# Bypass auth enforcement by replacing auth_manager with a non-AuthManager object
with patch('app.auth_manager', MagicMock(spec=object)), \
patch('app.os.path.exists', return_value=True), \
patch('builtins.open', m):
r = self.client.get('/api/logs')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('log', data)
def test_get_logs_respects_lines_query_param(self):
# Produce 10 lines; request only last 3
all_lines = [f'line {i}\n' for i in range(10)]
m = mock_open(read_data=''.join(all_lines))
m.return_value.__iter__ = lambda s: iter(all_lines)
m.return_value.readlines = lambda: all_lines
# Bypass auth enforcement by replacing auth_manager with a non-AuthManager object
with patch('app.auth_manager', MagicMock(spec=object)), \
patch('app.os.path.exists', return_value=True), \
patch('builtins.open', m):
r = self.client.get('/api/logs?lines=3')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
# The tail should contain only the last 3 lines
self.assertIn('line 7', data['log'])
self.assertIn('line 8', data['log'])
self.assertIn('line 9', data['log'])
def test_get_logs_returns_500_on_exception(self):
with patch('app.os.path.exists', return_value=True), \
patch('builtins.open', side_effect=PermissionError('access denied')):
r = self.client.get('/api/logs')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetServiceLogs(unittest.TestCase):
"""GET /api/logs/services/<service>"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_get_service_logs_returns_200_with_log_data(self, mock_lm):
mock_lm.get_service_logs.return_value = [
'[INFO] 2026-04-27 dns started',
'[WARN] 2026-04-27 retry attempt',
]
r = self.client.get('/api/logs/services/dns')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertEqual(data['service'], 'dns')
self.assertIsInstance(data['logs'], list)
self.assertEqual(len(data['logs']), 2)
@patch('app.log_manager')
def test_get_service_logs_passes_level_and_lines_params(self, mock_lm):
mock_lm.get_service_logs.return_value = []
self.client.get('/api/logs/services/email?level=ERROR&lines=25')
mock_lm.get_service_logs.assert_called_once_with('email', 'ERROR', 25)
@patch('app.log_manager')
def test_get_service_logs_uses_defaults_when_params_absent(self, mock_lm):
mock_lm.get_service_logs.return_value = []
self.client.get('/api/logs/services/wireguard')
mock_lm.get_service_logs.assert_called_once_with('wireguard', 'INFO', 50)
@patch('app.log_manager')
def test_get_service_logs_returns_500_on_exception(self, mock_lm):
mock_lm.get_service_logs.side_effect = Exception('log file missing')
r = self.client.get('/api/logs/services/calendar')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestSearchLogs(unittest.TestCase):
"""POST /api/logs/search"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_search_logs_returns_200_with_results_and_count(self, mock_lm):
mock_lm.search_logs.return_value = [
{'service': 'dns', 'line': 'ERROR timeout'},
]
r = self.client.post(
'/api/logs/search',
data=json.dumps({'query': 'ERROR', 'services': ['dns']}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('results', data)
self.assertIn('count', data)
self.assertEqual(data['count'], 1)
@patch('app.log_manager')
def test_search_logs_works_with_empty_body(self, mock_lm):
mock_lm.search_logs.return_value = []
r = self.client.post('/api/logs/search')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertEqual(data['results'], [])
self.assertEqual(data['count'], 0)
@patch('app.log_manager')
def test_search_logs_returns_500_on_exception(self, mock_lm):
mock_lm.search_logs.side_effect = Exception('index unavailable')
r = self.client.post(
'/api/logs/search',
data=json.dumps({'query': 'fail'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestExportLogs(unittest.TestCase):
"""POST /api/logs/export"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_export_logs_returns_200_with_log_data_and_format(self, mock_lm):
mock_lm.export_logs.return_value = '[{"ts":1,"msg":"ok"}]'
r = self.client.post(
'/api/logs/export',
data=json.dumps({'format': 'json', 'filters': {'service': 'dns'}}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('logs', data)
self.assertIn('format', data)
self.assertEqual(data['format'], 'json')
@patch('app.log_manager')
def test_export_logs_defaults_to_json_format(self, mock_lm):
mock_lm.export_logs.return_value = '[]'
self.client.post('/api/logs/export')
mock_lm.export_logs.assert_called_once_with('json', {})
@patch('app.log_manager')
def test_export_logs_returns_500_on_exception(self, mock_lm):
mock_lm.export_logs.side_effect = Exception('export failed')
r = self.client.post(
'/api/logs/export',
data=json.dumps({'format': 'csv'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetLogStatistics(unittest.TestCase):
"""GET /api/logs/statistics"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_get_statistics_returns_200_with_stats_dict(self, mock_lm):
mock_lm.get_log_statistics.return_value = {
'total_lines': 1200,
'error_count': 3,
'warn_count': 17,
}
r = self.client.get('/api/logs/statistics')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('total_lines', data)
@patch('app.log_manager')
def test_get_statistics_passes_service_param(self, mock_lm):
mock_lm.get_log_statistics.return_value = {}
self.client.get('/api/logs/statistics?service=email')
mock_lm.get_log_statistics.assert_called_once_with('email')
@patch('app.log_manager')
def test_get_statistics_passes_none_when_no_service_param(self, mock_lm):
mock_lm.get_log_statistics.return_value = {}
self.client.get('/api/logs/statistics')
mock_lm.get_log_statistics.assert_called_once_with(None)
@patch('app.log_manager')
def test_get_statistics_returns_500_on_exception(self, mock_lm):
mock_lm.get_log_statistics.side_effect = Exception('stats error')
r = self.client.get('/api/logs/statistics')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestRotateLogs(unittest.TestCase):
"""POST /api/logs/rotate"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_rotate_all_logs_returns_200(self, mock_lm):
r = self.client.post('/api/logs/rotate')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('message', data)
mock_lm.rotate_logs.assert_called_once_with(None)
@patch('app.log_manager')
def test_rotate_specific_service_passes_service_name(self, mock_lm):
r = self.client.post(
'/api/logs/rotate',
data=json.dumps({'service': 'dns'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
mock_lm.rotate_logs.assert_called_once_with('dns')
@patch('app.log_manager')
def test_rotate_returns_500_on_exception(self, mock_lm):
mock_lm.rotate_logs.side_effect = Exception('rotate failed')
r = self.client.post('/api/logs/rotate')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetLogFileInfos(unittest.TestCase):
"""GET /api/logs/files"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_get_log_files_returns_200_with_file_list(self, mock_lm):
mock_lm.get_all_log_file_infos.return_value = [
{'service': 'dns', 'path': '/data/logs/dns.log', 'size_bytes': 4096},
{'service': 'email', 'path': '/data/logs/email.log', 'size_bytes': 8192},
]
r = self.client.get('/api/logs/files')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIsInstance(data, list)
self.assertEqual(len(data), 2)
@patch('app.log_manager')
def test_get_log_files_returns_500_on_exception(self, mock_lm):
mock_lm.get_all_log_file_infos.side_effect = Exception('filesystem error')
r = self.client.get('/api/logs/files')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestLogVerbosity(unittest.TestCase):
"""GET /api/logs/verbosity and PUT /api/logs/verbosity"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_get_verbosity_returns_200_with_levels_map(self, mock_lm):
mock_lm.get_service_levels.return_value = {
'dns': 'INFO',
'email': 'DEBUG',
'wireguard': 'WARNING',
}
r = self.client.get('/api/logs/verbosity')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('dns', data)
self.assertEqual(data['email'], 'DEBUG')
@patch('app.log_manager')
def test_get_verbosity_returns_500_on_exception(self, mock_lm):
mock_lm.get_service_levels.side_effect = Exception('config missing')
r = self.client.get('/api/logs/verbosity')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
@patch('app.log_manager')
def test_put_verbosity_returns_200_and_calls_set_level(self, mock_lm):
mock_lm.get_service_levels.return_value = {'dns': 'DEBUG'}
with tempfile.TemporaryDirectory() as tmpdir:
# Endpoint builds: os.path.join(os.path.dirname(__file__), 'config', 'log_levels.json')
# Patch dirname to return tmpdir so the full path becomes tmpdir/config/log_levels.json
config_dir = os.path.join(tmpdir, 'config')
os.makedirs(config_dir)
with patch('app.auth_manager', MagicMock(spec=object)), \
patch('app.os.path.dirname', return_value=tmpdir):
r = self.client.put(
'/api/logs/verbosity',
data=json.dumps({'dns': 'DEBUG'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
mock_lm.set_service_level.assert_called_with('dns', 'DEBUG')
@patch('app.log_manager')
def test_put_verbosity_returns_500_on_exception(self, mock_lm):
mock_lm.set_service_level.side_effect = Exception('unknown service')
r = self.client.put(
'/api/logs/verbosity',
data=json.dumps({'unknown_svc': 'DEBUG'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
if __name__ == '__main__':
unittest.main()