d5018c2b34
Sprint 1 — Security & correctness:
- Restore all 10 commented-out is_local_request() checks (vault, containers, images, volumes)
- Fix XFF spoofing: trust only the LAST X-Forwarded-For entry (the one Caddy appends), not every entry in the header
- Require prefix length in wireguard.address (was accepting bare IPs like 10.0.0.1)
- Validate service_access list in add_peer (valid: calendar/files/mail/webdav)
- Fix dhcp/reservations POST/DELETE: unpack mac/ip/hostname from body (was passing dict as positional arg)
- Fix network/test POST: remove spurious data arg (test_connectivity takes no args)
- Fix remove_peer: clear iptables rules and regenerate DNS ACLs on deletion (was leaving stale rules)
- Fix CoreDNS reload: SIGHUP → SIGUSR1 (SIGHUP kills the process; SIGUSR1 triggers reload plugin)
- Remove local.{domain} block from Corefile template (local.zone doesn't exist, caused log spam)
- Fix routing_manager._remove_nat_rule: targeted -D instead of flushing entire POSTROUTING chain
Sprint 2 — State consistency:
- Atomic config writes in config_manager, ip_utils, firewall_manager, network_manager
  (write to .tmp → fsync → os.replace; prevents truncated files if the process is killed mid-write)
- backup_config: now also backs up Caddyfile, Corefile, .env, DNS zone files
- restore_config: restores all of the above so config stays consistent after restore
Sprint 3 — Dead code / documentation:
- Remove CellManager instantiation from app startup (was never called, double-instantiated all managers)
- Document routing_manager scope (targets host, not cell-wireguard; methods not called by any active route)
Sprint 4 — Test infrastructure:
- Add tests/conftest.py with shared tmp_dir, tmp_config_dir, tmp_data_dir, flask_client fixtures
- Add tests/test_config_validation.py: 400 paths for ip_range, port, wireguard.address validation
- Add tests/test_ip_utils_caddyfile.py: 14 tests for write_caddyfile (was completely untested)
- Expand test_app_misc.py: 7 new is_local_request tests covering XFF spoofing and cell-network IPs
- Add --cov-fail-under=70 to make test-coverage
- Add pre-commit hook that runs pytest before every commit
414 tests pass (was 372).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
174 lines
7.7 KiB
Python
174 lines
7.7 KiB
Python
import sys
|
|
from pathlib import Path
|
|
|
|
# Add api directory to path
|
|
api_dir = Path(__file__).parent.parent / 'api'
|
|
sys.path.insert(0, str(api_dir))
|
|
import unittest
|
|
from unittest.mock import patch, MagicMock
|
|
import threading
|
|
import time
|
|
import os
|
|
import sys
|
|
import types
|
|
import builtins
|
|
|
|
# Patch LOG_LEVEL and LOG_FILE in environment before importing app_module
|
|
# os.environ['LOG_LEVEL'] = 'INFO'
|
|
# os.environ['LOG_FILE'] = 'test.log'
|
|
|
|
# Publish the MagicMock class under every manager class name through
# ``builtins`` so the names resolve before the app module is imported below.
manager_names = [
    'NetworkManager',
    'WireGuardManager',
    'PeerRegistry',
    'EmailManager',
    'CalendarManager',
    'FileManager',
    'RoutingManager',
    'CellManager',
    'VaultManager',
    'ContainerManager',
]
for name in manager_names:
    setattr(builtins, name, MagicMock)

# Logging settings are exposed the same way; setattr() is used so the
# attribute writes need no per-line type-checker suppression.
setattr(builtins, 'LOG_LEVEL', 'INFO')
setattr(builtins, 'LOG_FILE', 'test.log')
|
|
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), '../api'))
|
|
import app as app_module
|
|
|
|
# LOG_LEVEL = 'INFO'
|
|
# LOG_FILE = 'test.log'
|
|
|
|
class TestAppMisc(unittest.TestCase):
    """Miscellaneous tests for ``app_module``.

    Covers ``perform_health_check``, ``enrich_log_context``,
    ``is_local_request`` and the ``/health``, ``/api/status`` and
    ``/api/config`` routes. Every test runs with the manager objects on
    ``app_module`` (and ``vault_manager`` on ``app_module.app``) replaced by
    fresh ``MagicMock`` instances.
    """

    # Attributes of ``app_module`` that setUp swaps for mocks.
    _MOCKED_MANAGERS = (
        'network_manager',
        'wireguard_manager',
        'peer_registry',
        'email_manager',
        'calendar_manager',
        'file_manager',
        'routing_manager',
        'container_manager',
    )

    def setUp(self):
        """Replace the managers with mocks and register their restoration."""
        # Patch managers to avoid side effects
        self.patches = [
            patch.object(app_module, attr, MagicMock())
            for attr in self._MOCKED_MANAGERS
        ]
        # create=True lets the patch install vault_manager even when the app
        # object has no such attribute, and removes it again on stop; an
        # existing value (including None) is restored as-is.
        self.patches.append(
            patch.object(app_module.app, 'vault_manager', MagicMock(), create=True)
        )
        for patcher in self.patches:
            patcher.start()
            # Registered right after start() so that a failure later in
            # setUp still undoes the patches that are already active
            # (tearDown is not run when setUp raises, cleanups are).
            self.addCleanup(patcher.stop)

    def _req(self, remote_addr, xff=''):
        """Return a stand-in request object for patching ``app.request``.

        The object has ``remote_addr`` set to *remote_addr* and a ``headers``
        dict that contains ``X-Forwarded-For`` only when *xff* is non-empty.
        """
        headers = {'X-Forwarded-For': xff} if xff else {}
        return types.SimpleNamespace(remote_addr=remote_addr, headers=headers)

    def _is_local(self, remote_addr, xff=''):
        """Call ``is_local_request()`` with ``app.request`` replaced by a fake."""
        with patch('app.request', new=self._req(remote_addr, xff)):
            return app_module.is_local_request()

    def _assert_tolerates_utcnow_failure(self, path):
        """GET *path* while ``app.datetime.utcnow`` raises.

        Accepts a 200 or 500 response; when a 500 carries a JSON body, that
        body must contain an ``'error'`` key.
        """
        with patch('app.datetime') as mock_dt, app_module.app.app_context():
            mock_dt.utcnow.side_effect = Exception('fail')
            client = app_module.app.test_client()
            response = client.get(path)
            self.assertIn(response.status_code, (200, 500))
            data = response.get_json(silent=True)
            # A non-JSON body (data is None) is tolerated.
            if data is not None and response.status_code == 500:
                self.assertIn('error', data)

    def test_health_monitor_thread_runs(self):
        """A single health check reports 'network' and 'alerts' entries."""
        # Patch health_history and service_alert_counters
        with patch.object(app_module, 'health_history', new=[]), \
                patch.object(app_module, 'service_alert_counters', new={}), \
                app_module.app.app_context():
            # Patch managers to return healthy
            for attr in ('network_manager', 'wireguard_manager', 'email_manager',
                         'calendar_manager', 'file_manager', 'routing_manager'):
                getattr(app_module, attr).get_status.return_value = {'ok': True}
            app_module.app.vault_manager.get_status.return_value = {'ok': True}  # type: ignore[attr-defined]
            # Run one health check
            result = app_module.perform_health_check()
            self.assertIn('network', result)
            self.assertIn('alerts', result)

    def test_enrich_log_context_sets_context(self):
        """enrich_log_context() copies request details into request_context."""
        # Simulate Flask request context
        class DummyRequest:
            remote_addr = '127.0.0.1'
            method = 'GET'
            path = '/test'
            headers = {}
            user = type('User', (), {'id': 'user1'})()

        with patch('app.request', new=DummyRequest()):
            app_module.enrich_log_context()
            ctx = app_module.request_context.get()
            self.assertEqual(ctx['client_ip'], '127.0.0.1')
            self.assertEqual(ctx['method'], 'GET')
            self.assertEqual(ctx['path'], '/test')
            self.assertEqual(ctx['user'], 'user1')

    def test_is_local_request_loopback(self):
        """A loopback remote address is local."""
        self.assertTrue(self._is_local('127.0.0.1'))

    def test_is_local_request_public_ip(self):
        """A public remote address is not local."""
        self.assertFalse(self._is_local('8.8.8.8'))

    def test_is_local_request_private_ip(self):
        """A 192.168.x.x remote address is local."""
        self.assertTrue(self._is_local('192.168.1.5'))

    def test_is_local_request_xff_spoof_rejected(self):
        """A local-looking first XFF entry does not make the request local."""
        # Client sends X-Forwarded-For: 127.0.0.1 but actual IP is public
        # Old code would trust the first XFF entry — fixed to trust only last
        self.assertFalse(self._is_local('8.8.8.8', xff='127.0.0.1, 8.8.8.8'))

    def test_is_local_request_xff_last_entry_local(self):
        """A local last XFF entry makes the request local."""
        # Caddy appends the real client IP; last entry is local → allow
        self.assertTrue(self._is_local('8.8.8.8', xff='8.8.8.8, 192.168.1.10'))

    def test_is_local_request_xff_single_public_rejected(self):
        """A single public XFF entry is rejected."""
        self.assertFalse(self._is_local('8.8.8.8', xff='1.2.3.4'))

    def test_is_local_request_cell_network_ip(self):
        """An address on the cell network is local."""
        # 172.20.0.10 is the API container's IP — should be allowed
        self.assertTrue(self._is_local('172.20.0.10'))

    def test_health_check_exception(self):
        """/health answers 200 or 500 when utcnow() raises."""
        self._assert_tolerates_utcnow_failure('/health')

    def test_get_cell_status_exception(self):
        """/api/status still returns a JSON body when a manager raises."""
        with app_module.app.app_context():
            app_module.network_manager.get_status.side_effect = Exception('fail')
            client = app_module.app.test_client()
            response = client.get('/api/status')
            # The route handles per-service exceptions internally and returns 200
            # with per-service error info; only outer failures yield 500
            self.assertIn(response.status_code, (200, 500))
            data = response.get_json(silent=True)
            self.assertIsNotNone(data)

    def test_get_config_exception(self):
        """/api/config answers 200 or 500 when utcnow() raises."""
        self._assert_tolerates_utcnow_failure('/api/config')
|
|
# Run the suite with the unittest runner when this file is executed directly.
if __name__ == "__main__":
    unittest.main()