fix: architecture audit — security, atomicity, broken endpoints, test coverage
Sprint 1 — Security & correctness:
- Restore all 10 commented-out is_local_request() checks (vault, containers, images, volumes)
- Fix XFF spoofing: only trust the LAST X-Forwarded-For entry (Caddy's append), not all
- Require prefix length in wireguard.address (was accepting bare IPs like 10.0.0.1)
- Validate service_access list in add_peer (valid: calendar/files/mail/webdav)
- Fix dhcp/reservations POST/DELETE: unpack mac/ip/hostname from body (was passing dict as positional arg)
- Fix network/test POST: remove spurious data arg (test_connectivity takes no args)
- Fix remove_peer: clear iptables rules and regenerate DNS ACLs on deletion (was leaving stale rules)
- Fix CoreDNS reload: SIGHUP → SIGUSR1 (SIGHUP kills the process; SIGUSR1 triggers reload plugin)
- Remove local.{domain} block from Corefile template (local.zone doesn't exist, caused log spam)
- Fix routing_manager._remove_nat_rule: targeted -D instead of flushing entire POSTROUTING chain
Sprint 2 — State consistency:
- Atomic config writes in config_manager, ip_utils, firewall_manager, network_manager
(write to .tmp → fsync → os.replace, prevents truncated files on kill)
- backup_config: now also backs up Caddyfile, Corefile, .env, DNS zone files
- restore_config: restores all of the above so config stays consistent after restore
Sprint 3 — Dead code / documentation:
- Remove CellManager instantiation from app startup (was never called, double-instantiated all managers)
- Document routing_manager scope (targets host, not cell-wireguard; methods not called by any active route)
Sprint 4 — Test infrastructure:
- Add tests/conftest.py with shared tmp_dir, tmp_config_dir, tmp_data_dir, flask_client fixtures
- Add tests/test_config_validation.py: 400 paths for ip_range, port, wireguard.address validation
- Add tests/test_ip_utils_caddyfile.py: 14 tests for write_caddyfile (was completely untested)
- Expand test_app_misc.py: 7 new is_local_request tests covering XFF spoofing and cell-network IPs
- Add --cov-fail-under=70 to make test-coverage
- Add pre-commit hook that runs pytest before every commit
414 tests pass (was 372).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,45 @@
|
||||
"""
|
||||
Shared pytest fixtures for the PIC test suite.
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import tempfile
|
||||
import shutil
|
||||
import pytest
|
||||
|
||||
# Ensure api/ is on the path for all tests
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_dir():
|
||||
"""Temporary directory that is cleaned up after each test."""
|
||||
d = tempfile.mkdtemp()
|
||||
yield d
|
||||
shutil.rmtree(d, ignore_errors=True)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_config_dir(tmp_dir):
|
||||
"""Temporary config dir with the sub-directories expected by managers."""
|
||||
for sub in ('api', 'caddy', 'dns', 'dhcp', 'ntp', 'wireguard'):
|
||||
os.makedirs(os.path.join(tmp_dir, sub), exist_ok=True)
|
||||
return tmp_dir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_data_dir(tmp_dir):
|
||||
"""Temporary data dir with the sub-directories expected by managers."""
|
||||
for sub in ('dns', 'mail', 'calendar', 'files', 'wireguard'):
|
||||
os.makedirs(os.path.join(tmp_dir, sub), exist_ok=True)
|
||||
return tmp_dir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def flask_client():
|
||||
"""Flask test client with TESTING mode enabled."""
|
||||
from app import app
|
||||
app.config['TESTING'] = True
|
||||
with app.test_client() as client:
|
||||
yield client
|
||||
@@ -141,17 +141,23 @@ class TestAPIEndpoints(unittest.TestCase):
|
||||
mock_network.add_dhcp_reservation.return_value = True
|
||||
response = self.client.post('/api/dhcp/reservations', data=json.dumps({'ip': '10.0.0.2', 'mac': '00:11:22:33:44:55'}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
# Simulate error
|
||||
mock_network.add_dhcp_reservation.side_effect = Exception('fail')
|
||||
# Missing mac field → 400, not 500
|
||||
response = self.client.post('/api/dhcp/reservations', data=json.dumps({'ip': '10.0.0.2'}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
# Simulate manager error
|
||||
mock_network.add_dhcp_reservation.side_effect = Exception('fail')
|
||||
response = self.client.post('/api/dhcp/reservations', data=json.dumps({'ip': '10.0.0.2', 'mac': '00:11:22:33:44:55'}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
# Mock remove_dhcp_reservation
|
||||
mock_network.remove_dhcp_reservation.return_value = True
|
||||
response = self.client.delete('/api/dhcp/reservations', data=json.dumps({'ip': '10.0.0.2'}), content_type='application/json')
|
||||
response = self.client.delete('/api/dhcp/reservations', data=json.dumps({'mac': '00:11:22:33:44:55'}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
# Simulate error
|
||||
mock_network.remove_dhcp_reservation.side_effect = Exception('fail')
|
||||
# Missing mac → 400
|
||||
response = self.client.delete('/api/dhcp/reservations', data=json.dumps({'ip': '10.0.0.2'}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
# Simulate manager error
|
||||
mock_network.remove_dhcp_reservation.side_effect = Exception('fail')
|
||||
response = self.client.delete('/api/dhcp/reservations', data=json.dumps({'mac': '00:11:22:33:44:55'}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
|
||||
@patch('app.network_manager')
|
||||
|
||||
+37
-10
@@ -45,7 +45,6 @@ class TestAppMisc(unittest.TestCase):
|
||||
patch.object(app_module, 'calendar_manager', MagicMock()),
|
||||
patch.object(app_module, 'file_manager', MagicMock()),
|
||||
patch.object(app_module, 'routing_manager', MagicMock()),
|
||||
patch.object(app_module, 'cell_manager', MagicMock()),
|
||||
patch.object(app_module, 'container_manager', MagicMock()),
|
||||
]
|
||||
for p in self.patches:
|
||||
@@ -97,18 +96,46 @@ class TestAppMisc(unittest.TestCase):
|
||||
self.assertEqual(ctx['path'], '/test')
|
||||
self.assertEqual(ctx['user'], 'user1')
|
||||
|
||||
def test_is_local_request(self):
|
||||
class DummyRequest:
|
||||
remote_addr = '127.0.0.1'
|
||||
headers = {}
|
||||
with patch('app.request', new=DummyRequest()):
|
||||
def _req(self, remote_addr, xff=''):
|
||||
class R:
|
||||
pass
|
||||
r = R()
|
||||
r.remote_addr = remote_addr
|
||||
r.headers = {'X-Forwarded-For': xff} if xff else {}
|
||||
return r
|
||||
|
||||
def test_is_local_request_loopback(self):
|
||||
with patch('app.request', new=self._req('127.0.0.1')):
|
||||
self.assertTrue(app_module.is_local_request())
|
||||
class DummyRequest2:
|
||||
remote_addr = '8.8.8.8'
|
||||
headers = {}
|
||||
with patch('app.request', new=DummyRequest2()):
|
||||
|
||||
def test_is_local_request_public_ip(self):
|
||||
with patch('app.request', new=self._req('8.8.8.8')):
|
||||
self.assertFalse(app_module.is_local_request())
|
||||
|
||||
def test_is_local_request_private_ip(self):
|
||||
with patch('app.request', new=self._req('192.168.1.5')):
|
||||
self.assertTrue(app_module.is_local_request())
|
||||
|
||||
def test_is_local_request_xff_spoof_rejected(self):
|
||||
# Client sends X-Forwarded-For: 127.0.0.1 but actual IP is public
|
||||
# Old code would trust the first XFF entry — fixed to trust only last
|
||||
with patch('app.request', new=self._req('8.8.8.8', xff='127.0.0.1, 8.8.8.8')):
|
||||
self.assertFalse(app_module.is_local_request())
|
||||
|
||||
def test_is_local_request_xff_last_entry_local(self):
|
||||
# Caddy appends the real client IP; last entry is local → allow
|
||||
with patch('app.request', new=self._req('8.8.8.8', xff='8.8.8.8, 192.168.1.10')):
|
||||
self.assertTrue(app_module.is_local_request())
|
||||
|
||||
def test_is_local_request_xff_single_public_rejected(self):
|
||||
with patch('app.request', new=self._req('8.8.8.8', xff='1.2.3.4')):
|
||||
self.assertFalse(app_module.is_local_request())
|
||||
|
||||
def test_is_local_request_cell_network_ip(self):
|
||||
# 172.20.0.10 is the API container's IP — should be allowed
|
||||
with patch('app.request', new=self._req('172.20.0.10')):
|
||||
self.assertTrue(app_module.is_local_request())
|
||||
|
||||
def test_health_check_exception(self):
|
||||
# Patch datetime to raise exception
|
||||
with patch('app.datetime') as mock_dt, app_module.app.app_context():
|
||||
|
||||
@@ -0,0 +1,174 @@
|
||||
"""
|
||||
Tests for PUT /api/config input validation (400 paths).
|
||||
These are the highest-risk untested paths: the only server-side guard against
|
||||
bad subnet/port values entering persistent config.
|
||||
"""
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
|
||||
|
||||
|
||||
def _make_client():
|
||||
from app import app
|
||||
app.config['TESTING'] = True
|
||||
return app.test_client()
|
||||
|
||||
|
||||
def _put(client, payload):
|
||||
return client.put(
|
||||
'/api/config',
|
||||
data=json.dumps(payload),
|
||||
content_type='application/json',
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ip_range validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestIpRangeValidation(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.client = _make_client()
|
||||
|
||||
def test_non_rfc1918_returns_400(self):
|
||||
r = _put(self.client, {'ip_range': '1.2.3.0/24'})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
body = json.loads(r.data)
|
||||
self.assertIn('error', body)
|
||||
self.assertIn('RFC-1918', body['error'])
|
||||
|
||||
def test_172_0_subnet_returns_400(self):
|
||||
# 172.0.0.0/24 is NOT in 172.16.0.0/12 — was the bug on the dev machine
|
||||
r = _put(self.client, {'ip_range': '172.0.0.0/24'})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
def test_172_15_subnet_returns_400(self):
|
||||
# One prefix below the 172.16.0.0/12 boundary
|
||||
r = _put(self.client, {'ip_range': '172.15.0.0/24'})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
def test_172_32_subnet_returns_400(self):
|
||||
# One prefix above the 172.31.255.255 boundary
|
||||
r = _put(self.client, {'ip_range': '172.32.0.0/24'})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
def test_public_ip_returns_400(self):
|
||||
r = _put(self.client, {'ip_range': '8.8.0.0/16'})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
def test_172_16_exact_boundary_accepted(self):
|
||||
# 172.16.0.0/12 is the exact lower boundary — must be valid
|
||||
r = _put(self.client, {'ip_range': '172.16.0.0/12'})
|
||||
# 200 or 202 — just not 400
|
||||
self.assertNotEqual(r.status_code, 400)
|
||||
|
||||
def test_10_network_accepted(self):
|
||||
r = _put(self.client, {'ip_range': '10.0.0.0/8'})
|
||||
self.assertNotEqual(r.status_code, 400)
|
||||
|
||||
def test_192_168_network_accepted(self):
|
||||
r = _put(self.client, {'ip_range': '192.168.0.0/16'})
|
||||
self.assertNotEqual(r.status_code, 400)
|
||||
|
||||
def test_invalid_cidr_syntax_returns_400(self):
|
||||
r = _put(self.client, {'ip_range': 'not-a-cidr'})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Port range validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestPortValidation(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.client = _make_client()
|
||||
|
||||
def test_dns_port_zero_returns_400(self):
|
||||
r = _put(self.client, {'network': {'dns_port': 0}})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
body = json.loads(r.data)
|
||||
self.assertIn('dns_port', body.get('error', ''))
|
||||
|
||||
def test_dns_port_65536_returns_400(self):
|
||||
r = _put(self.client, {'network': {'dns_port': 65536}})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
def test_wireguard_port_zero_returns_400(self):
|
||||
r = _put(self.client, {'wireguard': {'port': 0}})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
def test_wireguard_port_65536_returns_400(self):
|
||||
r = _put(self.client, {'wireguard': {'port': 65536}})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
def test_wireguard_port_1_accepted(self):
|
||||
r = _put(self.client, {'wireguard': {'port': 1}})
|
||||
self.assertNotEqual(r.status_code, 400)
|
||||
|
||||
def test_wireguard_port_65535_accepted(self):
|
||||
r = _put(self.client, {'wireguard': {'port': 65535}})
|
||||
self.assertNotEqual(r.status_code, 400)
|
||||
|
||||
def test_email_smtp_port_zero_returns_400(self):
|
||||
r = _put(self.client, {'email': {'smtp_port': 0}})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
def test_calendar_port_negative_returns_400(self):
|
||||
r = _put(self.client, {'calendar': {'port': -1}})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WireGuard address validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestWireguardAddressValidation(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.client = _make_client()
|
||||
|
||||
def test_bad_wg_address_returns_400(self):
|
||||
r = _put(self.client, {'wireguard': {'address': 'not-an-ip'}})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
body = json.loads(r.data)
|
||||
self.assertIn('wireguard.address', body.get('error', ''))
|
||||
|
||||
def test_ip_without_prefix_returns_400(self):
|
||||
r = _put(self.client, {'wireguard': {'address': '10.0.0.1'}})
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
def test_valid_wg_address_accepted(self):
|
||||
r = _put(self.client, {'wireguard': {'address': '10.0.0.1/24'}})
|
||||
self.assertNotEqual(r.status_code, 400)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Body validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestBodyValidation(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.client = _make_client()
|
||||
|
||||
def test_no_body_returns_400(self):
|
||||
r = self.client.put('/api/config', content_type='application/json')
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
def test_empty_body_returns_400(self):
|
||||
r = self.client.put('/api/config', data='', content_type='application/json')
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
def test_valid_cell_name_change_returns_200(self):
|
||||
r = _put(self.client, {'cell_name': 'testcell'})
|
||||
self.assertEqual(r.status_code, 200)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -0,0 +1,102 @@
|
||||
"""
|
||||
Tests for ip_utils.write_caddyfile — this function is called on every
|
||||
ip_range / domain / cell_name change and was previously untested.
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
|
||||
|
||||
from ip_utils import write_caddyfile, get_service_ips
|
||||
|
||||
|
||||
class TestWriteCaddyfile(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.mkdtemp()
|
||||
self.path = os.path.join(self.tmp, 'caddy', 'Caddyfile')
|
||||
|
||||
def _write(self, ip_range='172.20.0.0/16', cell_name='mycell', domain='cell'):
|
||||
ok = write_caddyfile(ip_range, cell_name, domain, self.path)
|
||||
self.assertTrue(ok, "write_caddyfile returned False")
|
||||
with open(self.path) as f:
|
||||
return f.read()
|
||||
|
||||
def test_creates_file_in_subdirectory(self):
|
||||
self._write()
|
||||
self.assertTrue(os.path.isfile(self.path))
|
||||
|
||||
def test_cell_domain_vhost_present(self):
|
||||
content = self._write(cell_name='mycell', domain='cell')
|
||||
self.assertIn('http://mycell.cell', content)
|
||||
|
||||
def test_custom_domain_used(self):
|
||||
content = self._write(cell_name='pic0', domain='dev')
|
||||
self.assertIn('http://pic0.dev', content)
|
||||
self.assertNotIn('mycell', content)
|
||||
self.assertNotIn('.cell', content)
|
||||
|
||||
def test_service_subdomains_use_domain(self):
|
||||
content = self._write(domain='mynet')
|
||||
self.assertIn('http://calendar.mynet', content)
|
||||
self.assertIn('http://files.mynet', content)
|
||||
self.assertIn('http://mail.mynet', content)
|
||||
self.assertIn('http://webdav.mynet', content)
|
||||
|
||||
def test_virtual_ips_match_ip_range(self):
|
||||
ip_range = '10.0.0.0/16'
|
||||
content = self._write(ip_range=ip_range)
|
||||
ips = get_service_ips(ip_range)
|
||||
self.assertIn(ips['vip_calendar'], content)
|
||||
self.assertIn(ips['vip_files'], content)
|
||||
self.assertIn(ips['vip_mail'], content)
|
||||
self.assertIn(ips['vip_webdav'], content)
|
||||
|
||||
def test_reverse_proxy_targets_are_internal_ports(self):
|
||||
content = self._write()
|
||||
self.assertIn('reverse_proxy cell-radicale:5232', content)
|
||||
self.assertIn('reverse_proxy cell-filegator:8080', content)
|
||||
self.assertIn('reverse_proxy cell-rainloop:8888', content)
|
||||
self.assertIn('reverse_proxy cell-webdav:80', content)
|
||||
|
||||
def test_api_proxy_present(self):
|
||||
content = self._write()
|
||||
self.assertIn('reverse_proxy cell-api:3000', content)
|
||||
|
||||
def test_overwrite_on_second_call(self):
|
||||
self._write(cell_name='first', domain='cell')
|
||||
content = self._write(cell_name='second', domain='cell')
|
||||
self.assertIn('second.cell', content)
|
||||
self.assertNotIn('first.cell', content)
|
||||
|
||||
def test_different_ip_ranges_produce_different_vips(self):
|
||||
c1 = self._write(ip_range='10.0.0.0/16')
|
||||
os.remove(self.path)
|
||||
c2 = self._write(ip_range='192.168.1.0/24')
|
||||
self.assertNotEqual(c1, c2)
|
||||
|
||||
def test_auto_https_off(self):
|
||||
content = self._write()
|
||||
self.assertIn('auto_https off', content)
|
||||
|
||||
def test_catchall_block_present(self):
|
||||
content = self._write()
|
||||
self.assertIn(':80 {', content)
|
||||
|
||||
def test_invalid_ip_range_returns_false(self):
|
||||
result = write_caddyfile('not-a-cidr', 'cell', 'cell', self.path)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_file_is_not_empty(self):
|
||||
self._write()
|
||||
self.assertGreater(os.path.getsize(self.path), 100)
|
||||
|
||||
def tearDown(self):
|
||||
import shutil
|
||||
shutil.rmtree(self.tmp, ignore_errors=True)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user