Files
pic/tests/test_log_manager_extra.py
T
roof 13074f56cb
Unit Tests / test (push) Successful in 12m34s
fix: logging verbosity now actually applies + per-service log levels
Root causes fixed:
- Dead LOG_LEVEL globals() lookup pinned root logger at INFO regardless of
  PIC_LOG_LEVEL env or config; replaced with _resolve_root_log_level() +
  apply_root_log_level() which sets both root logger and all attached handlers
  at startup and on runtime re-apply.
- set_service_level() only set the named 'pic.<service>' logger; bare module
  loggers (e.g. 'caddy_manager') were never reached, so per-service log files
  stayed 0 bytes. Fixed via _SERVICE_MODULE_LOGGERS map covering all managers.
- Log viewer GET /api/logs had no level filter; added ?level= query param.
- Per-service log levels lived in an out-of-band config/api/log_levels.json
  side-file with no validation; migrated into ConfigManager under a new
  'logging' section ({python:{root,services}, containers:{caddy,coredns,
  wireguard,mailserver,api}}) with get/set helpers, invalid-level rejection,
  and one-time migration from the old file on first load.

New capabilities:
- Container log levels: Caddy (injects global log { level X } + hot reload),
  CoreDNS (DEBUG enables log plugin, else errors-only), WireGuard/mailserver
  via pending_restart path.
- PUT /api/logs/verbosity accepts {python, containers} dict; returns per-entry
  applied:hot|pending_restart status.
- Webui Logs page gains two-section Verbosity tab (Python services + Container
  services) with needs-restart badges.
- managers.py wires per-service loggers before manager instantiation and
  re-applies persisted levels from ConfigManager; legacy log_levels.json read
  removed.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 19:14:01 -04:00

456 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Additional tests for LogManager covering uncovered paths:
- get_service_logs_parsed (JSON + non-JSON lines, level filtering)
- search_logs (time_range filter, level filter, non-JSON lines)
- _matches_search_criteria (query/time_range/level checks)
- _is_log_level (JSON and text fallback)
- export_logs (json/csv/text/unknown format)
- _logs_to_csv / _logs_to_text
- get_log_statistics (single service and all services)
- get_log_file_info (missing file → error key)
- set_service_level (known and unknown service)
- get_service_levels
- get_all_log_file_infos (active + rotated files)
- compress_old_logs
- stop
- _start_rotation_monitor creates a running thread
"""
import sys
import os
import json
import gzip
import shutil
import tempfile
import time
import unittest
from datetime import datetime, timedelta
from pathlib import Path
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from log_manager import LogManager
def _lm(tmp):
log_dir = os.path.join(tmp, 'logs')
os.makedirs(log_dir, exist_ok=True)
return LogManager(log_dir=log_dir)
def _write_log_file(log_dir, service, entries):
"""Write JSON log entries to a service log file."""
path = os.path.join(log_dir, f'{service}.log')
with open(path, 'w') as f:
for e in entries:
f.write(json.dumps(e) + '\n')
return path
class TestGetServiceLogsParsed(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.lm = _lm(self.tmp)
def tearDown(self):
self.lm.stop()
shutil.rmtree(self.tmp)
def test_missing_log_file_returns_error_entry(self):
result = self.lm.get_service_logs_parsed('nosuchservice')
self.assertEqual(len(result), 1)
self.assertIn('error', result[0])
def test_returns_parsed_json_entries(self):
_write_log_file(self.lm.log_dir, 'svc', [
{'timestamp': '2026-01-01T00:00:00', 'level': 'INFO', 'message': 'ok'},
{'timestamp': '2026-01-01T00:00:01', 'level': 'ERROR', 'message': 'fail'},
])
result = self.lm.get_service_logs_parsed('svc', level='ALL', lines=10)
self.assertEqual(len(result), 2)
levels = {e['level'] for e in result}
self.assertIn('INFO', levels)
self.assertIn('ERROR', levels)
def test_level_filter_excludes_non_matching(self):
_write_log_file(self.lm.log_dir, 'svc', [
{'timestamp': '2026-01-01T00:00:00', 'level': 'INFO', 'message': 'ok'},
{'timestamp': '2026-01-01T00:00:01', 'level': 'ERROR', 'message': 'fail'},
])
result = self.lm.get_service_logs_parsed('svc', level='ERROR', lines=10)
self.assertTrue(all(e.get('level') == 'ERROR' for e in result))
def test_non_json_lines_are_included_for_all_level(self):
path = os.path.join(self.lm.log_dir, 'svc.log')
with open(path, 'w') as f:
f.write('plain text log line\n')
result = self.lm.get_service_logs_parsed('svc', level='ALL', lines=10)
self.assertEqual(len(result), 1)
self.assertIn('raw_line', result[0])
class TestSearchLogs(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.lm = _lm(self.tmp)
def tearDown(self):
self.lm.stop()
shutil.rmtree(self.tmp)
def test_search_finds_matching_message(self):
_write_log_file(self.lm.log_dir, 'svc', [
{'timestamp': '2026-01-01T00:00:00', 'level': 'INFO', 'message': 'user login ok'},
{'timestamp': '2026-01-01T00:00:01', 'level': 'INFO', 'message': 'disk full'},
])
self.lm.service_loggers['svc'] = type('', (), {})() # register so search includes it
results = self.lm.search_logs('login', services=['svc'])
self.assertEqual(len(results), 1)
self.assertIn('login', results[0]['message'])
def test_search_level_filter(self):
_write_log_file(self.lm.log_dir, 'svc', [
{'timestamp': '2026-01-01T00:00:00', 'level': 'INFO', 'message': 'info msg'},
{'timestamp': '2026-01-01T00:00:01', 'level': 'ERROR', 'message': 'error msg'},
])
results = self.lm.search_logs('', services=['svc'], level='ERROR')
self.assertTrue(all(e.get('level') == 'ERROR' for e in results))
def test_search_time_range_filter(self):
early = '2026-01-01T10:00:00'
late = '2026-01-01T14:00:00'
_write_log_file(self.lm.log_dir, 'svc', [
{'timestamp': early, 'level': 'INFO', 'message': 'early'},
{'timestamp': late, 'level': 'INFO', 'message': 'late'},
])
# Register logger so search_logs finds the service
self.lm.add_service_logger('svc', {'level': 'INFO'})
start = datetime(2026, 1, 1, 11, 0, 0)
end = datetime(2026, 1, 1, 13, 0, 0)
results = self.lm.search_logs('', services=['svc'], time_range=(start, end))
msgs = [r.get('message', '') for r in results]
# 'early' is within 11:00-13:00 range, 'late' at 14:00 should be excluded
self.assertNotIn('late', msgs)
def test_non_json_lines_matched_by_query(self):
path = os.path.join(self.lm.log_dir, 'svc.log')
with open(path, 'w') as f:
f.write('this line contains keyterm\n')
f.write('unrelated line\n')
results = self.lm.search_logs('keyterm', services=['svc'])
self.assertEqual(len(results), 1)
self.assertIn('keyterm', results[0]['raw_line'])
def test_search_no_services_returns_empty(self):
results = self.lm.search_logs('anything', services=[])
self.assertEqual(results, [])
class TestMatchesSearchCriteria(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.lm = _lm(self.tmp)
def tearDown(self):
self.lm.stop()
shutil.rmtree(self.tmp)
def test_query_match(self):
entry = {'message': 'hello world', 'level': 'INFO'}
self.assertTrue(self.lm._matches_search_criteria(entry, 'hello', None, None))
def test_query_no_match(self):
entry = {'message': 'hello world', 'level': 'INFO'}
self.assertFalse(self.lm._matches_search_criteria(entry, 'nothere', None, None))
def test_level_filter_match(self):
entry = {'message': 'msg', 'level': 'ERROR'}
self.assertTrue(self.lm._matches_search_criteria(entry, '', None, 'ERROR'))
def test_level_filter_no_match(self):
entry = {'message': 'msg', 'level': 'INFO'}
self.assertFalse(self.lm._matches_search_criteria(entry, '', None, 'ERROR'))
def test_time_range_within(self):
entry = {'message': 'msg', 'level': 'INFO', 'timestamp': '2026-06-01T12:00:00'}
start = datetime(2026, 6, 1, 11, 0)
end = datetime(2026, 6, 1, 13, 0)
self.assertTrue(self.lm._matches_search_criteria(entry, '', (start, end), None))
def test_time_range_outside(self):
entry = {'message': 'msg', 'level': 'INFO', 'timestamp': '2026-06-01T14:00:00'}
start = datetime(2026, 6, 1, 11, 0)
end = datetime(2026, 6, 1, 13, 0)
self.assertFalse(self.lm._matches_search_criteria(entry, '', (start, end), None))
def test_time_range_invalid_timestamp_excluded(self):
entry = {'message': 'msg', 'level': 'INFO', 'timestamp': 'not-a-date'}
start = datetime(2026, 6, 1, 11, 0)
end = datetime(2026, 6, 1, 13, 0)
self.assertFalse(self.lm._matches_search_criteria(entry, '', (start, end), None))
class TestIsLogLevel(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.lm = _lm(self.tmp)
def tearDown(self):
self.lm.stop()
shutil.rmtree(self.tmp)
def test_json_line_matches_level(self):
line = json.dumps({'level': 'ERROR', 'message': 'oops'})
self.assertTrue(self.lm._is_log_level(line, 'ERROR'))
def test_json_line_no_match(self):
line = json.dumps({'level': 'INFO', 'message': 'ok'})
self.assertFalse(self.lm._is_log_level(line, 'ERROR'))
def test_text_line_fallback_match(self):
line = '2026-01-01 ERROR something went wrong'
self.assertTrue(self.lm._is_log_level(line, 'ERROR'))
def test_text_line_fallback_no_match(self):
line = '2026-01-01 INFO something went fine'
self.assertFalse(self.lm._is_log_level(line, 'ERROR'))
class TestExportLogs(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.lm = _lm(self.tmp)
def tearDown(self):
self.lm.stop()
shutil.rmtree(self.tmp)
def test_export_json_format(self):
_write_log_file(self.lm.log_dir, 'svc', [
{'timestamp': '2026-01-01T00:00:00', 'level': 'INFO', 'message': 'hi'},
])
result = self.lm.export_logs('json', filters={'services': ['svc']})
parsed = json.loads(result)
self.assertIsInstance(parsed, list)
def test_export_csv_format(self):
_write_log_file(self.lm.log_dir, 'svc', [
{'timestamp': '2026-01-01T00:00:00', 'level': 'INFO', 'message': 'hi'},
])
result = self.lm.export_logs('csv', filters={'services': ['svc']})
self.assertIsInstance(result, str)
def test_export_text_format(self):
_write_log_file(self.lm.log_dir, 'svc', [
{'timestamp': '2026-01-01T00:00:00', 'level': 'INFO', 'message': 'hi'},
])
result = self.lm.export_logs('text', filters={'services': ['svc']})
self.assertIsInstance(result, str)
def test_export_unknown_format_raises(self):
with self.assertRaises(ValueError):
self.lm.export_logs('pdf')
def test_logs_to_csv_empty_returns_empty_string(self):
result = self.lm._logs_to_csv([])
self.assertEqual(result, '')
def test_logs_to_csv_has_header(self):
logs = [{'level': 'INFO', 'message': 'hi', 'timestamp': '2026-01-01T00:00:00'}]
csv = self.lm._logs_to_csv(logs)
lines = csv.split('\n')
self.assertGreater(len(lines), 1)
header_fields = set(lines[0].split(','))
self.assertIn('level', header_fields)
self.assertIn('message', header_fields)
def test_logs_to_text_formats_entries(self):
logs = [{'level': 'ERROR', 'service': 'svc', 'message': 'oops', 'timestamp': '2026-01-01T00:00:00'}]
text = self.lm._logs_to_text(logs)
self.assertIn('ERROR', text)
self.assertIn('oops', text)
class TestGetLogStatistics(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.lm = _lm(self.tmp)
def tearDown(self):
self.lm.stop()
shutil.rmtree(self.tmp)
def test_stats_for_missing_service_log(self):
self.lm.add_service_logger('svc', {'level': 'INFO'})
stats = self.lm.get_log_statistics('svc')
# Log file may not have been written yet; should still return dict
self.assertIsInstance(stats, dict)
def test_stats_counts_levels(self):
self.lm.add_service_logger('svc', {'level': 'INFO'})
lgr = self.lm.service_loggers['svc']
lgr.info('info msg')
lgr.error('error msg')
# Flush handlers
for h in lgr.handlers:
h.flush()
stats = self.lm.get_log_statistics('svc')
self.assertIn('svc', stats)
def test_stats_for_nonexistent_service_returns_error_key(self):
# A service that was never added has no log file
stats = self.lm.get_log_statistics('nosuch')
self.assertIn('nosuch', stats)
self.assertIn('error', stats['nosuch'])
class TestGetLogFileInfo(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.lm = _lm(self.tmp)
def tearDown(self):
self.lm.stop()
shutil.rmtree(self.tmp)
def test_returns_error_for_missing_log(self):
info = self.lm.get_log_file_info('nosuchservice')
self.assertIn('error', info)
def test_returns_file_info_for_existing_log(self):
self.lm.add_service_logger('svc', {'level': 'INFO'})
lgr = self.lm.service_loggers['svc']
lgr.info('test')
for h in lgr.handlers:
h.flush()
info = self.lm.get_log_file_info('svc')
self.assertIn('file_path', info)
self.assertIn('exists', info)
self.assertTrue(info['exists'])
class TestSetServiceLevel(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.lm = _lm(self.tmp)
def tearDown(self):
self.lm.stop()
shutil.rmtree(self.tmp)
def test_set_level_for_known_service(self):
import logging
self.lm.add_service_logger('svc', {'level': 'INFO'})
self.lm.set_service_level('svc', 'DEBUG')
lgr = self.lm.service_loggers['svc']
self.assertEqual(lgr.level, logging.DEBUG)
def test_set_level_for_unknown_service_does_not_raise(self):
self.lm.set_service_level('nosuch', 'DEBUG') # must not raise
def test_get_service_levels_returns_dict(self):
self.lm.add_service_logger('svc', {'level': 'INFO'})
levels = self.lm.get_service_levels()
self.assertIsInstance(levels, dict)
self.assertIn('svc', levels)
def test_set_level_also_sets_bare_module_logger(self):
"""A verbosity change for 'network' must reach BOTH picell.network and
the network_manager module logger so per-service files capture every
record (the core bug)."""
import logging
self.lm.add_service_logger('network', {'level': 'INFO'})
self.lm.set_service_level('network', 'DEBUG')
self.assertEqual(self.lm.service_loggers['network'].level, logging.DEBUG)
self.assertEqual(logging.getLogger('network_manager').level, logging.DEBUG)
def test_set_level_for_routing_covers_firewall_module_logger(self):
import logging
self.lm.add_service_logger('routing', {'level': 'INFO'})
self.lm.set_service_level('routing', 'DEBUG')
self.assertEqual(logging.getLogger('routing_manager').level, logging.DEBUG)
self.assertEqual(logging.getLogger('firewall_manager').level, logging.DEBUG)
class TestGetAllLogFileInfos(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.lm = _lm(self.tmp)
def tearDown(self):
self.lm.stop()
shutil.rmtree(self.tmp)
def test_returns_list(self):
result = self.lm.get_all_log_file_infos()
self.assertIsInstance(result, list)
def test_includes_active_log_file(self):
self.lm.add_service_logger('svc', {'level': 'INFO'})
lgr = self.lm.service_loggers['svc']
lgr.info('test')
for h in lgr.handlers:
h.flush()
result = self.lm.get_all_log_file_infos()
names = [e['file'] for e in result]
self.assertIn('svc.log', names)
def test_includes_rotated_log_file(self):
# Create a fake rotated log file
rotated = os.path.join(str(self.lm.log_dir), 'svc.log.1')
with open(rotated, 'w') as f:
f.write('old log\n')
result = self.lm.get_all_log_file_infos()
names = [e['file'] for e in result]
self.assertIn('svc.log.1', names)
class TestCompressOldLogs(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.lm = _lm(self.tmp)
def tearDown(self):
self.lm.stop()
shutil.rmtree(self.tmp)
def test_compress_old_logs_compresses_rotated_file(self):
rotated = os.path.join(str(self.lm.log_dir), 'svc.log.1')
with open(rotated, 'w') as f:
f.write('old log content\n')
self.lm.compress_old_logs()
gz_path = rotated + '.gz'
self.assertTrue(os.path.exists(gz_path))
self.assertFalse(os.path.exists(rotated))
def test_compress_old_logs_skips_already_compressed(self):
gz_path = os.path.join(str(self.lm.log_dir), 'svc.log.1.gz')
with gzip.open(gz_path, 'wb') as f:
f.write(b'already compressed')
self.lm.compress_old_logs()
# Original gz should still exist
self.assertTrue(os.path.exists(gz_path))
class TestStop(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.lm = _lm(self.tmp)
def tearDown(self):
shutil.rmtree(self.tmp)
def test_stop_sets_running_false(self):
self.lm.stop()
self.assertFalse(self.lm.running)
def test_stop_closes_all_handlers(self):
self.lm.add_service_logger('svc', {'level': 'INFO'})
self.lm.stop() # must not raise
if __name__ == '__main__':
unittest.main()