Files
pic/tests/test_log_manager_extra.py
T
roof aa1e5c41ec
Unit Tests / test (push) Successful in 12m6s
test: raise coverage 68.7% -> ~80.4%; add ~250 tests for new egress/DDNS/network paths
Coverage was below acceptable levels and several newly-added code paths
(sshuttle egress, proxy egress, DDNS provider stubs, DNS overview route,
peer-registry provisioning) had zero test coverage.

~250 new unit tests are added across 16 new test files. Existing test files
are updated to match refactored interfaces (DHCP removed, constants
introduced, network_manager restructured). .coveragerc is added to pin the
source mapping and the 70% floor so regressions are caught at commit time.

tests/test_enhanced_api.py was previously living in api/ (wrong location)
and is moved to tests/ where it belongs.

Integration test files are updated to remove references to DHCP endpoints
and add coverage for the new DNS overview and DDNS sync endpoints.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 09:03:39 -04:00

439 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Additional tests for LogManager covering uncovered paths:
- get_service_logs_parsed (JSON + non-JSON lines, level filtering)
- search_logs (time_range filter, level filter, non-JSON lines)
- _matches_search_criteria (query/time_range/level checks)
- _is_log_level (JSON and text fallback)
- export_logs (json/csv/text/unknown format)
- _logs_to_csv / _logs_to_text
- get_log_statistics (single service and all services)
- get_log_file_info (missing file → error key)
- set_service_level (known and unknown service)
- get_service_levels
- get_all_log_file_infos (active + rotated files)
- compress_old_logs
- stop
- _start_rotation_monitor creates a running thread (TODO: no test in this file asserts this yet)
"""
import sys
import os
import json
import gzip
import shutil
import tempfile
import time
import unittest
from datetime import datetime, timedelta
from pathlib import Path
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from log_manager import LogManager
def _lm(tmp):
    """Return a LogManager that logs into a ``logs`` directory under *tmp*.

    The directory is created first if it does not already exist.
    """
    logs_path = os.path.join(tmp, 'logs')
    os.makedirs(logs_path, exist_ok=True)
    manager = LogManager(log_dir=logs_path)
    return manager
def _write_log_file(log_dir, service, entries):
"""Write JSON log entries to a service log file."""
path = os.path.join(log_dir, f'{service}.log')
with open(path, 'w') as f:
for e in entries:
f.write(json.dumps(e) + '\n')
return path
class TestGetServiceLogsParsed(unittest.TestCase):
    """Tests for LogManager.get_service_logs_parsed using files in a temp dir."""

    # One INFO and one ERROR record, shared by the JSON-based tests.
    SAMPLE_ENTRIES = [
        {'timestamp': '2026-01-01T00:00:00', 'level': 'INFO', 'message': 'ok'},
        {'timestamp': '2026-01-01T00:00:01', 'level': 'ERROR', 'message': 'fail'},
    ]

    def setUp(self):
        # Cleanups run in LIFO order and still run when setUp or an earlier
        # cleanup fails: the manager is stopped first, then the directory
        # is removed, so a failing stop() can no longer leak the temp dir.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.lm = _lm(self.tmp)
        self.addCleanup(self.lm.stop)

    def test_missing_log_file_returns_error_entry(self):
        result = self.lm.get_service_logs_parsed('nosuchservice')
        self.assertEqual(len(result), 1)
        self.assertIn('error', result[0])

    def test_returns_parsed_json_entries(self):
        _write_log_file(self.lm.log_dir, 'svc', self.SAMPLE_ENTRIES)
        result = self.lm.get_service_logs_parsed('svc', level='ALL', lines=10)
        self.assertEqual(len(result), 2)
        levels = {e['level'] for e in result}
        self.assertIn('INFO', levels)
        self.assertIn('ERROR', levels)

    def test_level_filter_excludes_non_matching(self):
        _write_log_file(self.lm.log_dir, 'svc', self.SAMPLE_ENTRIES)
        result = self.lm.get_service_logs_parsed('svc', level='ERROR', lines=10)
        # all() is True for an empty list, so pin the size first; otherwise a
        # filter that drops every entry would pass this test.
        self.assertEqual(len(result), 1)
        self.assertTrue(all(e.get('level') == 'ERROR' for e in result))

    def test_non_json_lines_are_included_for_all_level(self):
        path = os.path.join(self.lm.log_dir, 'svc.log')
        with open(path, 'w') as f:
            f.write('plain text log line\n')
        result = self.lm.get_service_logs_parsed('svc', level='ALL', lines=10)
        self.assertEqual(len(result), 1)
        self.assertIn('raw_line', result[0])
class TestSearchLogs(unittest.TestCase):
    """Tests for LogManager.search_logs (query, level and time_range filters)."""

    def setUp(self):
        # Cleanups run in LIFO order and still run when setUp or an earlier
        # cleanup fails: stop the manager first, then remove the directory.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.lm = _lm(self.tmp)
        self.addCleanup(self.lm.stop)

    def test_search_finds_matching_message(self):
        _write_log_file(self.lm.log_dir, 'svc', [
            {'timestamp': '2026-01-01T00:00:00', 'level': 'INFO', 'message': 'user login ok'},
            {'timestamp': '2026-01-01T00:00:01', 'level': 'INFO', 'message': 'disk full'},
        ])
        # Placeholder object registered under 'svc' so search includes it.
        # NOTE(review): other tests register via add_service_logger; consider
        # aligning once it is confirmed that does not alter the log file.
        self.lm.service_loggers['svc'] = type('', (), {})()
        results = self.lm.search_logs('login', services=['svc'])
        self.assertEqual(len(results), 1)
        self.assertIn('login', results[0]['message'])

    def test_search_level_filter(self):
        _write_log_file(self.lm.log_dir, 'svc', [
            {'timestamp': '2026-01-01T00:00:00', 'level': 'INFO', 'message': 'info msg'},
            {'timestamp': '2026-01-01T00:00:01', 'level': 'ERROR', 'message': 'error msg'},
        ])
        results = self.lm.search_logs('', services=['svc'], level='ERROR')
        # all() is True for an empty list; require a hit so the level check
        # below cannot pass vacuously.
        self.assertTrue(results, 'expected the ERROR entry to be returned')
        self.assertTrue(all(e.get('level') == 'ERROR' for e in results))

    def test_search_time_range_filter(self):
        early = '2026-01-01T10:00:00'
        mid = '2026-01-01T12:00:00'
        late = '2026-01-01T14:00:00'
        _write_log_file(self.lm.log_dir, 'svc', [
            {'timestamp': early, 'level': 'INFO', 'message': 'early'},
            {'timestamp': mid, 'level': 'INFO', 'message': 'mid'},
            {'timestamp': late, 'level': 'INFO', 'message': 'late'},
        ])
        # Register logger so search_logs finds the service
        self.lm.add_service_logger('svc', {'level': 'INFO'})
        start = datetime(2026, 1, 1, 11, 0, 0)
        end = datetime(2026, 1, 1, 13, 0, 0)
        results = self.lm.search_logs('', services=['svc'], time_range=(start, end))
        msgs = [r.get('message', '') for r in results]
        # Only 'mid' (12:00) lies inside 11:00-13:00; 'early' (10:00) and
        # 'late' (14:00) are both outside the window and must be dropped.
        self.assertIn('mid', msgs)
        self.assertNotIn('early', msgs)
        self.assertNotIn('late', msgs)

    def test_non_json_lines_matched_by_query(self):
        path = os.path.join(self.lm.log_dir, 'svc.log')
        with open(path, 'w') as f:
            f.write('this line contains keyterm\n')
            f.write('unrelated line\n')
        results = self.lm.search_logs('keyterm', services=['svc'])
        self.assertEqual(len(results), 1)
        self.assertIn('keyterm', results[0]['raw_line'])

    def test_search_no_services_returns_empty(self):
        results = self.lm.search_logs('anything', services=[])
        self.assertEqual(results, [])
class TestMatchesSearchCriteria(unittest.TestCase):
    """Direct tests of LogManager._matches_search_criteria.

    The helper is called positionally as
    ``(entry, query, time_range, level)``.
    """

    # (start, end) window shared by the time-range tests: 11:00-13:00.
    WINDOW = (datetime(2026, 6, 1, 11, 0), datetime(2026, 6, 1, 13, 0))

    def setUp(self):
        # Cleanups run in LIFO order and still run when setUp or an earlier
        # cleanup fails: stop the manager first, then remove the directory.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.lm = _lm(self.tmp)
        self.addCleanup(self.lm.stop)

    def test_query_match(self):
        entry = {'message': 'hello world', 'level': 'INFO'}
        self.assertTrue(self.lm._matches_search_criteria(entry, 'hello', None, None))

    def test_query_no_match(self):
        entry = {'message': 'hello world', 'level': 'INFO'}
        self.assertFalse(self.lm._matches_search_criteria(entry, 'nothere', None, None))

    def test_level_filter_match(self):
        entry = {'message': 'msg', 'level': 'ERROR'}
        self.assertTrue(self.lm._matches_search_criteria(entry, '', None, 'ERROR'))

    def test_level_filter_no_match(self):
        entry = {'message': 'msg', 'level': 'INFO'}
        self.assertFalse(self.lm._matches_search_criteria(entry, '', None, 'ERROR'))

    def test_time_range_within(self):
        entry = {'message': 'msg', 'level': 'INFO', 'timestamp': '2026-06-01T12:00:00'}
        self.assertTrue(self.lm._matches_search_criteria(entry, '', self.WINDOW, None))

    def test_time_range_outside(self):
        entry = {'message': 'msg', 'level': 'INFO', 'timestamp': '2026-06-01T14:00:00'}
        self.assertFalse(self.lm._matches_search_criteria(entry, '', self.WINDOW, None))

    def test_time_range_invalid_timestamp_excluded(self):
        # An unparseable timestamp is expected to be rejected, not raise.
        entry = {'message': 'msg', 'level': 'INFO', 'timestamp': 'not-a-date'}
        self.assertFalse(self.lm._matches_search_criteria(entry, '', self.WINDOW, None))
class TestIsLogLevel(unittest.TestCase):
    """Tests for LogManager._is_log_level on JSON lines and plain-text lines."""

    def setUp(self):
        # Cleanups run in LIFO order and still run when setUp or an earlier
        # cleanup fails: stop the manager first, then remove the directory.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.lm = _lm(self.tmp)
        self.addCleanup(self.lm.stop)

    def test_json_line_matches_level(self):
        line = json.dumps({'level': 'ERROR', 'message': 'oops'})
        self.assertTrue(self.lm._is_log_level(line, 'ERROR'))

    def test_json_line_no_match(self):
        line = json.dumps({'level': 'INFO', 'message': 'ok'})
        self.assertFalse(self.lm._is_log_level(line, 'ERROR'))

    def test_text_line_fallback_match(self):
        # Not valid JSON, so this exercises the text fallback.
        line = '2026-01-01 ERROR something went wrong'
        self.assertTrue(self.lm._is_log_level(line, 'ERROR'))

    def test_text_line_fallback_no_match(self):
        line = '2026-01-01 INFO something went fine'
        self.assertFalse(self.lm._is_log_level(line, 'ERROR'))
class TestExportLogs(unittest.TestCase):
    """Tests for LogManager.export_logs and the _logs_to_csv/_logs_to_text helpers."""

    def setUp(self):
        # Cleanups run in LIFO order and still run when setUp or an earlier
        # cleanup fails: stop the manager first, then remove the directory.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.lm = _lm(self.tmp)
        self.addCleanup(self.lm.stop)

    def _seed_single_entry(self):
        # Write one INFO record to svc.log for the export_logs tests.
        _write_log_file(self.lm.log_dir, 'svc', [
            {'timestamp': '2026-01-01T00:00:00', 'level': 'INFO', 'message': 'hi'},
        ])

    def test_export_json_format(self):
        self._seed_single_entry()
        result = self.lm.export_logs('json', filters={'services': ['svc']})
        parsed = json.loads(result)
        self.assertIsInstance(parsed, list)

    def test_export_csv_format(self):
        self._seed_single_entry()
        result = self.lm.export_logs('csv', filters={'services': ['svc']})
        self.assertIsInstance(result, str)

    def test_export_text_format(self):
        self._seed_single_entry()
        result = self.lm.export_logs('text', filters={'services': ['svc']})
        self.assertIsInstance(result, str)

    def test_export_unknown_format_raises(self):
        with self.assertRaises(ValueError):
            self.lm.export_logs('pdf')

    def test_logs_to_csv_empty_returns_empty_string(self):
        result = self.lm._logs_to_csv([])
        self.assertEqual(result, '')

    def test_logs_to_csv_has_header(self):
        logs = [{'level': 'INFO', 'message': 'hi', 'timestamp': '2026-01-01T00:00:00'}]
        csv_text = self.lm._logs_to_csv(logs)
        # splitlines() handles both '\n' and '\r\n' row terminators, so the
        # last header field never carries a stray '\r'.
        lines = csv_text.splitlines()
        self.assertGreater(len(lines), 1)
        header_fields = set(lines[0].split(','))
        self.assertIn('level', header_fields)
        self.assertIn('message', header_fields)

    def test_logs_to_text_formats_entries(self):
        logs = [{'level': 'ERROR', 'service': 'svc', 'message': 'oops', 'timestamp': '2026-01-01T00:00:00'}]
        text = self.lm._logs_to_text(logs)
        self.assertIn('ERROR', text)
        self.assertIn('oops', text)
class TestGetLogStatistics(unittest.TestCase):
    """Tests for LogManager.get_log_statistics for a single named service."""

    def setUp(self):
        # Cleanups run in LIFO order and still run when setUp or an earlier
        # cleanup fails: stop the manager first, then remove the directory.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.lm = _lm(self.tmp)
        self.addCleanup(self.lm.stop)

    def test_stats_for_missing_service_log(self):
        self.lm.add_service_logger('svc', {'level': 'INFO'})
        stats = self.lm.get_log_statistics('svc')
        # Log file may not have been written yet; should still return dict
        self.assertIsInstance(stats, dict)

    def test_stats_counts_levels(self):
        self.lm.add_service_logger('svc', {'level': 'INFO'})
        lgr = self.lm.service_loggers['svc']
        lgr.info('info msg')
        lgr.error('error msg')
        # Flush handlers so both records are on disk before stats are read
        for h in lgr.handlers:
            h.flush()
        stats = self.lm.get_log_statistics('svc')
        # NOTE(review): only the presence of the service key is checked; the
        # per-level counts are not asserted because the shape of the stats
        # dict is not visible from this file.
        self.assertIn('svc', stats)

    def test_stats_for_nonexistent_service_returns_error_key(self):
        # A service that was never added has no log file
        stats = self.lm.get_log_statistics('nosuch')
        self.assertIn('nosuch', stats)
        self.assertIn('error', stats['nosuch'])
class TestGetLogFileInfo(unittest.TestCase):
    """Tests for LogManager.get_log_file_info on missing and existing logs."""

    def setUp(self):
        # Cleanups run in LIFO order and still run when setUp or an earlier
        # cleanup fails: stop the manager first, then remove the directory.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.lm = _lm(self.tmp)
        self.addCleanup(self.lm.stop)

    def test_returns_error_for_missing_log(self):
        info = self.lm.get_log_file_info('nosuchservice')
        self.assertIn('error', info)

    def test_returns_file_info_for_existing_log(self):
        self.lm.add_service_logger('svc', {'level': 'INFO'})
        lgr = self.lm.service_loggers['svc']
        lgr.info('test')
        # Flush so the record is on disk before the file is inspected
        for h in lgr.handlers:
            h.flush()
        info = self.lm.get_log_file_info('svc')
        self.assertIn('file_path', info)
        self.assertIn('exists', info)
        self.assertTrue(info['exists'])
class TestSetServiceLevel(unittest.TestCase):
    """Tests for LogManager.set_service_level and get_service_levels."""

    def setUp(self):
        # Cleanups run in LIFO order and still run when setUp or an earlier
        # cleanup fails: stop the manager first, then remove the directory.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.lm = _lm(self.tmp)
        self.addCleanup(self.lm.stop)

    def test_set_level_for_known_service(self):
        import logging
        self.lm.add_service_logger('svc', {'level': 'INFO'})
        self.lm.set_service_level('svc', 'DEBUG')
        lgr = self.lm.service_loggers['svc']
        self.assertEqual(lgr.level, logging.DEBUG)

    def test_set_level_for_unknown_service_does_not_raise(self):
        self.lm.set_service_level('nosuch', 'DEBUG')  # must not raise

    def test_get_service_levels_returns_dict(self):
        self.lm.add_service_logger('svc', {'level': 'INFO'})
        levels = self.lm.get_service_levels()
        self.assertIsInstance(levels, dict)
        self.assertIn('svc', levels)
class TestGetAllLogFileInfos(unittest.TestCase):
    """Tests for LogManager.get_all_log_file_infos (active and rotated files)."""

    def setUp(self):
        # Cleanups run in LIFO order and still run when setUp or an earlier
        # cleanup fails: stop the manager first, then remove the directory.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.lm = _lm(self.tmp)
        self.addCleanup(self.lm.stop)

    def _listed_file_names(self):
        # Collect the 'file' value of every entry the manager reports.
        return [e['file'] for e in self.lm.get_all_log_file_infos()]

    def test_returns_list(self):
        result = self.lm.get_all_log_file_infos()
        self.assertIsInstance(result, list)

    def test_includes_active_log_file(self):
        self.lm.add_service_logger('svc', {'level': 'INFO'})
        lgr = self.lm.service_loggers['svc']
        lgr.info('test')
        # Flush so svc.log is on disk before the directory is listed
        for h in lgr.handlers:
            h.flush()
        self.assertIn('svc.log', self._listed_file_names())

    def test_includes_rotated_log_file(self):
        # Create a fake rotated log file
        rotated = os.path.join(str(self.lm.log_dir), 'svc.log.1')
        with open(rotated, 'w') as f:
            f.write('old log\n')
        self.assertIn('svc.log.1', self._listed_file_names())
class TestCompressOldLogs(unittest.TestCase):
    """Tests for LogManager.compress_old_logs on rotated and already-gzipped files."""

    def setUp(self):
        # Cleanups run in LIFO order and still run when setUp or an earlier
        # cleanup fails: stop the manager first, then remove the directory.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.lm = _lm(self.tmp)
        self.addCleanup(self.lm.stop)

    def test_compress_old_logs_compresses_rotated_file(self):
        rotated = os.path.join(str(self.lm.log_dir), 'svc.log.1')
        with open(rotated, 'w') as f:
            f.write('old log content\n')
        self.lm.compress_old_logs()
        gz_path = rotated + '.gz'
        self.assertTrue(os.path.exists(gz_path))
        self.assertFalse(os.path.exists(rotated))
        # The archive must round-trip to the original text, not merely exist.
        with gzip.open(gz_path, 'rt') as f:
            self.assertEqual(f.read(), 'old log content\n')

    def test_compress_old_logs_skips_already_compressed(self):
        gz_path = os.path.join(str(self.lm.log_dir), 'svc.log.1.gz')
        with gzip.open(gz_path, 'wb') as f:
            f.write(b'already compressed')
        self.lm.compress_old_logs()
        # Original gz should still exist
        self.assertTrue(os.path.exists(gz_path))
        # "Skipped" also means the payload was left untouched.
        with gzip.open(gz_path, 'rb') as f:
            self.assertEqual(f.read(), b'already compressed')
class TestStop(unittest.TestCase):
    """Tests for LogManager.stop; each test calls stop() itself."""

    def setUp(self):
        workdir = tempfile.mkdtemp()
        self.tmp = workdir
        self.lm = _lm(workdir)

    def tearDown(self):
        # stop() is not called here: the tests themselves invoke it.
        shutil.rmtree(self.tmp)

    def test_stop_sets_running_false(self):
        manager = self.lm
        manager.stop()
        self.assertFalse(manager.running)

    def test_stop_closes_all_handlers(self):
        manager = self.lm
        manager.add_service_logger('svc', {'level': 'INFO'})
        # Passes as long as stop() does not raise with a logger registered.
        manager.stop()
# Allow running this file directly as a script, in addition to a test runner.
if __name__ == '__main__':
    unittest.main()