615448b875
When ip_range changes in Settings, the new subnet is now applied to: - DNS zone records (network_manager.apply_ip_range) - Caddy virtual IPs (firewall_manager.ensure_caddy_virtual_ips) - iptables per-service rules (firewall_manager.update_service_ips) - docker-compose.yml static IPs if writable (ip_utils.update_docker_compose_ips) New module ip_utils.py derives all container IPs from the subnet using fixed offsets so the entire stack stays consistent from one setting. 321 tests pass (72 new tests added for ip_utils, apply_ip_range, update_service_ips). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
328 lines
14 KiB
Python
328 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tests for firewall_manager — per-peer iptables rule generation and DNS ACL logic.
|
|
All docker exec calls are mocked so tests run without a live Docker environment.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import tempfile
|
|
import shutil
|
|
import unittest
|
|
from unittest.mock import patch, call, MagicMock
|
|
from pathlib import Path
|
|
|
|
api_dir = Path(__file__).parent.parent / 'api'
|
|
sys.path.insert(0, str(api_dir))
|
|
|
|
import firewall_manager
|
|
|
|
|
|
def _make_peer(ip, internet=True, services=None, peers=True):
    """Build a peer settings dict for use as a test fixture.

    When *services* is omitted, the peer is granted every service currently
    listed in ``firewall_manager.SERVICE_IPS``; otherwise the given object is
    stored unchanged.
    """
    granted = list(firewall_manager.SERVICE_IPS.keys()) if services is None else services
    return {
        'ip': ip,
        'internet_access': internet,
        'service_access': granted,
        'peer_access': peers,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _peer_comment
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPeerComment(unittest.TestCase):
    """Checks the comment tag that firewall_manager._peer_comment builds from a peer IP."""

    def test_dots_replaced_with_dashes(self):
        tag = firewall_manager._peer_comment('10.0.0.2')
        self.assertEqual(tag, 'pic-peer-10-0-0-2')

    def test_different_ip(self):
        tag = firewall_manager._peer_comment('192.168.1.100')
        self.assertEqual(tag, 'pic-peer-192-168-1-100')
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _build_acl_block
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestBuildAclBlock(unittest.TestCase):
    """Checks the text returned by firewall_manager._build_acl_block.

    The input maps a service name to the list of peer IPs blocked from it.
    """

    def test_empty_returns_empty_string(self):
        acl = firewall_manager._build_acl_block({})
        self.assertEqual(acl, '')

    def test_no_blocked_peers_returns_empty(self):
        # Every known service is present, but nobody is blocked from any of them.
        nothing_blocked = {name: [] for name in firewall_manager.SERVICE_IPS}
        acl = firewall_manager._build_acl_block(nothing_blocked)
        self.assertEqual(acl, '')

    def test_blocked_peer_appears_in_acl(self):
        blocked = {'calendar': ['10.0.0.5'], 'files': [], 'mail': [], 'webdav': []}
        acl = firewall_manager._build_acl_block(blocked)
        expected_fragments = (
            'acl calendar.cell.',
            'block net 10.0.0.5/32',
            'allow net 0.0.0.0/0',
        )
        for fragment in expected_fragments:
            self.assertIn(fragment, acl)

    def test_unknown_service_skipped(self):
        acl = firewall_manager._build_acl_block({'nonexistent': ['10.0.0.2']})
        self.assertEqual(acl, '')

    def test_multiple_peers_blocked_from_same_service(self):
        blocked = {'mail': ['10.0.0.2', '10.0.0.3'], 'calendar': [], 'files': [], 'webdav': []}
        acl = firewall_manager._build_acl_block(blocked)
        # One "block net" line per blocked peer.
        self.assertEqual(acl.count('block net'), 2)
        for cidr in ('10.0.0.2/32', '10.0.0.3/32'):
            self.assertIn(cidr, acl)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# generate_corefile
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGenerateCorefile(unittest.TestCase):
    """Exercise firewall_manager.generate_corefile against a throwaway directory.

    Each test gets its own temporary directory (``self.tmp``) and a Corefile
    path inside it (``self.path``); the directory is removed after the test.
    """

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.path = os.path.join(self.tmp, 'Corefile')

    def tearDown(self):
        shutil.rmtree(self.tmp)

    def _read_corefile(self):
        # Read through a context manager so the file handle is closed right
        # away rather than being left to the garbage collector, which leaks
        # the descriptor and emits ResourceWarning during the test run.
        with open(self.path) as fh:
            return fh.read()

    def test_creates_corefile(self):
        firewall_manager.generate_corefile([], self.path)
        self.assertTrue(os.path.exists(self.path))

    def test_contains_forward_and_cache(self):
        firewall_manager.generate_corefile([], self.path)
        content = self._read_corefile()
        self.assertIn('forward . 8.8.8.8', content)
        self.assertIn('cache', content)
        self.assertIn('cell {', content)

    def test_no_blocked_services_no_acl_block(self):
        peers = [_make_peer('10.0.0.2', internet=True,
                            services=list(firewall_manager.SERVICE_IPS.keys()))]
        firewall_manager.generate_corefile(peers, self.path)
        content = self._read_corefile()
        self.assertNotIn('block net', content)

    def test_blocked_service_generates_acl(self):
        peers = [_make_peer('10.0.0.3', internet=False, services=['calendar'])]
        firewall_manager.generate_corefile(peers, self.path)
        content = self._read_corefile()
        # files/mail/webdav are blocked for this peer
        self.assertIn('block net 10.0.0.3/32', content)

    def test_peer_with_all_services_allowed_no_acl(self):
        peers = [_make_peer('10.0.0.2', services=list(firewall_manager.SERVICE_IPS.keys()))]
        firewall_manager.generate_corefile(peers, self.path)
        self.assertNotIn('block net', self._read_corefile())

    def test_returns_false_on_write_error(self):
        # The parent directory does not exist, so the write is expected to fail.
        result = firewall_manager.generate_corefile([], '/nonexistent/path/Corefile')
        self.assertFalse(result)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# apply_peer_rules — iptables call verification
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestApplyPeerRules(unittest.TestCase):
    """Verify correct iptables calls for full-internet vs split-tunnel peers."""

    def _run_apply(self, peer_ip, settings):
        # Record every argument list handed to _wg_exec while reporting each
        # command as successful with empty output.
        recorded = []

        def fake_wg_exec(args):
            recorded.append(args)
            return MagicMock(returncode=0, stdout='')

        with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec):
            firewall_manager.apply_peer_rules(peer_ip, settings)

        return recorded

    @staticmethod
    def _iptables_only(calls):
        # Keep only the recorded commands that mention iptables.
        return [argv for argv in calls if 'iptables' in argv]

    @classmethod
    def _jump_targets(cls, calls):
        # The argument following "-j" for each iptables command that has one.
        return [argv[argv.index('-j') + 1]
                for argv in cls._iptables_only(calls)
                if '-j' in argv]

    def test_full_internet_peer_gets_accept_rule(self):
        settings = {
            'internet_access': True,
            'service_access': list(firewall_manager.SERVICE_IPS.keys()),
            'peer_access': True,
        }
        targets = self._jump_targets(self._run_apply('10.0.0.2', settings))
        # Full-internet peer: only ACCEPT rules (no DROP except iptables-restore clears)
        self.assertNotIn('DROP', targets)
        self.assertIn('ACCEPT', targets)

    def test_no_internet_peer_gets_drop_rule(self):
        settings = {
            'internet_access': False,
            'service_access': list(firewall_manager.SERVICE_IPS.keys()),
            'peer_access': True,
        }
        targets = self._jump_targets(self._run_apply('10.0.0.3', settings))
        self.assertIn('DROP', targets)
        self.assertIn('ACCEPT', targets)

    def test_service_access_restriction_generates_drop(self):
        settings = {
            'internet_access': False,
            'service_access': ['calendar'],
            'peer_access': True,
        }
        recorded = self._run_apply('10.0.0.4', settings)
        # files/mail/webdav should be DROPped, calendar ACCEPTed
        verdict_by_dest = {}
        for argv in self._iptables_only(recorded):
            if '-d' not in argv or '-j' not in argv:
                continue
            dest = argv[argv.index('-d') + 1]
            verdict = argv[argv.index('-j') + 1]
            # Only rules aimed at a service IP matter; a later rule overrides an earlier one.
            if dest in firewall_manager.SERVICE_IPS.values():
                verdict_by_dest[dest] = verdict
        calendar_ip = firewall_manager.SERVICE_IPS['calendar']
        files_ip = firewall_manager.SERVICE_IPS['files']
        self.assertEqual(verdict_by_dest.get(calendar_ip), 'ACCEPT')
        self.assertEqual(verdict_by_dest.get(files_ip), 'DROP')

    def test_all_rules_tagged_with_peer_comment(self):
        settings = {
            'internet_access': True,
            'service_access': list(firewall_manager.SERVICE_IPS.keys()),
            'peer_access': True,
        }
        recorded = self._run_apply('10.0.0.2', settings)
        comment = firewall_manager._peer_comment('10.0.0.2')
        # only insertion rules need the comment
        inserted = [argv for argv in self._iptables_only(recorded) if '-I' in argv]
        for argv in inserted:
            self.assertIn(comment, argv, f"Rule missing comment tag: {argv}")

    def test_peer_with_no_peer_access_gets_drop_for_vpn_subnet(self):
        settings = {
            'internet_access': True,
            'service_access': list(firewall_manager.SERVICE_IPS.keys()),
            'peer_access': False,
        }
        recorded = self._run_apply('10.0.0.5', settings)
        vpn_rules = [argv for argv in self._iptables_only(recorded)
                     if '-d' in argv and '10.0.0.0/24' in argv]
        self.assertTrue(vpn_rules, "Expected a rule for 10.0.0.0/24")
        for argv in vpn_rules:
            self.assertIn('DROP', argv)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# apply_all_peer_rules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestApplyAllPeerRules(unittest.TestCase):
    """Checks how apply_all_peer_rules fans out to apply_peer_rules."""

    def _apply_with_mocks(self, peers):
        # Run apply_all_peer_rules with ensure_caddy_virtual_ips and
        # apply_peer_rules replaced by mocks; hand back the apply_peer_rules mock.
        with patch.object(firewall_manager, 'ensure_caddy_virtual_ips', return_value=True), \
                patch.object(firewall_manager, 'apply_peer_rules', return_value=True) as mock_apply:
            firewall_manager.apply_all_peer_rules(peers)
        return mock_apply

    def test_calls_apply_per_peer(self):
        peers = [_make_peer('10.0.0.2'), _make_peer('10.0.0.3', internet=False)]
        mock_apply = self._apply_with_mocks(peers)
        self.assertEqual(mock_apply.call_count, 2)
        # The first positional argument of each call is the peer IP.
        seen_ips = {invocation.args[0] for invocation in mock_apply.call_args_list}
        self.assertEqual(seen_ips, {'10.0.0.2', '10.0.0.3'})

    def test_peer_without_ip_is_skipped(self):
        peers = [{'internet_access': True}, _make_peer('10.0.0.2')]
        mock_apply = self._apply_with_mocks(peers)
        self.assertEqual(mock_apply.call_count, 1)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# clear_peer_rules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestClearPeerRules(unittest.TestCase):
    """Checks that clear_peer_rules rewrites the saved ruleset for one peer only."""

    def test_removes_only_matching_comment_lines(self):
        save_output = (
            '*filter\n'
            ':INPUT ACCEPT [0:0]\n'
            ':FORWARD ACCEPT [0:0]\n'
            '-A FORWARD -s 10.0.0.2 -m comment --comment pic-peer-10-0-0-2 -j ACCEPT\n'
            '-A FORWARD -s 10.0.0.3 -m comment --comment pic-peer-10-0-0-3 -j DROP\n'
            'COMMIT\n'
        )
        restored = []

        def fake_wg_exec(args):
            # Only the iptables-save call gets the canned ruleset as stdout.
            outcome = MagicMock(returncode=0)
            if args == ['iptables-save']:
                outcome.stdout = save_output
            return outcome

        def fake_restore(cmd, input, **kwargs):
            # Capture whatever is fed to the restore command on stdin.
            restored.append(input)
            return MagicMock(returncode=0)

        with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
                patch('subprocess.run', side_effect=fake_restore):
            firewall_manager.clear_peer_rules('10.0.0.2')

        self.assertEqual(len(restored), 1)
        ruleset = restored[0]
        self.assertNotIn('pic-peer-10-0-0-2', ruleset)
        self.assertIn('pic-peer-10-0-0-3', ruleset)

    def test_no_op_when_no_matching_rules(self):
        save_output = '*filter\n:FORWARD ACCEPT [0:0]\nCOMMIT\n'

        def fake_wg_exec(args):
            return MagicMock(returncode=0, stdout=save_output)

        with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
                patch('subprocess.run') as mock_restore:
            firewall_manager.clear_peer_rules('10.0.0.99')

        # Nothing matched, so no restore should have been attempted.
        mock_restore.assert_not_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# update_service_ips
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUpdateServiceIps(unittest.TestCase):
    """Checks that update_service_ips re-derives SERVICE_IPS from a subnet."""

    def tearDown(self):
        # Restore default SERVICE_IPS after each test
        firewall_manager.update_service_ips('172.20.0.0/16')

    def test_default_ips_are_172_20(self):
        service_ips = firewall_manager.SERVICE_IPS
        self.assertEqual(service_ips['calendar'], '172.20.0.21')
        self.assertEqual(service_ips['webdav'], '172.20.0.24')

    def test_update_changes_all_virtual_ips(self):
        firewall_manager.update_service_ips('10.0.0.0/24')
        expected = {
            'calendar': '10.0.0.21',
            'files': '10.0.0.22',
            'mail': '10.0.0.23',
            'webdav': '10.0.0.24',
        }
        for name, address in expected.items():
            self.assertEqual(firewall_manager.SERVICE_IPS[name], address)

    def test_update_replaces_not_extends(self):
        firewall_manager.update_service_ips('10.0.0.0/24')
        # Should only have the four virtual-IP keys
        present = set(firewall_manager.SERVICE_IPS.keys())
        self.assertEqual(present, {'calendar', 'files', 'mail', 'webdav'})

    def test_apply_peer_rules_uses_updated_ips(self):
        firewall_manager.update_service_ips('10.0.0.0/24')
        recorded = []

        def fake_wg_exec(args):
            recorded.append(args)
            # simulate rule-doesn't-exist → _ensure_rule inserts
            return MagicMock(returncode=1)

        peer_settings = {
            'internet_access': True,
            'service_access': ['calendar'],
            'peer_access': True,
        }
        with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
                patch.object(firewall_manager, 'clear_peer_rules'):
            firewall_manager.apply_peer_rules('10.0.0.5', peer_settings)

        dest_ips = [
            argv[argv.index('-d') + 1]
            for argv in recorded
            if argv and argv[0] == 'iptables' and '-d' in argv
        ]
        # calendar vIP should now be 10.0.0.21
        self.assertIn('10.0.0.21', dest_ips)
        # old IP must not appear
        self.assertNotIn('172.20.0.21', dest_ips)
|
|
|
|
|
|
if __name__ == '__main__':
    # Run the test suite when this file is executed directly.
    unittest.main()
|