#!/usr/bin/env python3
"""
Tests for firewall_manager — per-peer iptables rule generation and DNS ACL logic.

All docker exec calls are mocked so tests run without a live Docker environment.
"""

import sys
import os
import tempfile
import shutil
import unittest
from unittest.mock import patch, call, MagicMock
from pathlib import Path

# Make the sibling ``api`` directory importable before importing the module
# under test; the import below depends on this path entry.
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))

import firewall_manager


def _make_peer(ip, internet=True, services=None, peers=True):
    """Build a peer settings dict in the shape the tests feed to firewall_manager.

    Args:
        ip: Peer IP address string, stored under ``'ip'``.
        internet: Value stored under ``'internet_access'``.
        services: List stored under ``'service_access'``. ``None`` means every
            key of ``firewall_manager.SERVICE_IPS``.
        peers: Value stored under ``'peer_access'``.

    Returns:
        A dict with the keys ``ip``, ``internet_access``, ``service_access``
        and ``peer_access``.
    """
    if services is None:
        services = list(firewall_manager.SERVICE_IPS.keys())
    return {
        'ip': ip,
        'internet_access': internet,
        'service_access': services,
        'peer_access': peers,
    }


# ---------------------------------------------------------------------------
# _peer_comment
# ---------------------------------------------------------------------------

class TestPeerComment(unittest.TestCase):
    """_peer_comment turns a peer IP into a ``pic-peer-…`` rule tag."""

    def test_dots_replaced_with_dashes(self):
        self.assertEqual(firewall_manager._peer_comment('10.0.0.2'),
                         'pic-peer-10-0-0-2')

    def test_different_ip(self):
        self.assertEqual(firewall_manager._peer_comment('192.168.1.100'),
                         'pic-peer-192-168-1-100')


# ---------------------------------------------------------------------------
# _build_acl_block
# ---------------------------------------------------------------------------

class TestBuildAclBlock(unittest.TestCase):
    """_build_acl_block renders ACL text from a service -> blocked-IPs mapping."""

    def test_empty_returns_empty_string(self):
        self.assertEqual(firewall_manager._build_acl_block({}), '')

    def test_no_blocked_peers_returns_empty(self):
        blocked = {s: [] for s in firewall_manager.SERVICE_IPS}
        self.assertEqual(firewall_manager._build_acl_block(blocked), '')

    def test_blocked_peer_appears_in_acl(self):
        blocked = {'calendar': ['10.0.0.5'], 'files': [], 'mail': [], 'webdav': []}
        result = firewall_manager._build_acl_block(blocked)
        self.assertIn('acl calendar.cell.', result)
        self.assertIn('block net 10.0.0.5/32', result)
        self.assertIn('allow net 0.0.0.0/0', result)

    def test_unknown_service_skipped(self):
        blocked = {'nonexistent': ['10.0.0.2']}
        result = firewall_manager._build_acl_block(blocked)
        self.assertEqual(result, '')

    def test_multiple_peers_blocked_from_same_service(self):
        blocked = {'mail': ['10.0.0.2', '10.0.0.3'],
                   'calendar': [], 'files': [], 'webdav': []}
        result = firewall_manager._build_acl_block(blocked)
        self.assertEqual(result.count('block net'), 2)
        self.assertIn('10.0.0.2/32', result)
        self.assertIn('10.0.0.3/32', result)


# ---------------------------------------------------------------------------
# generate_corefile
# ---------------------------------------------------------------------------

class TestGenerateCorefile(unittest.TestCase):
    """generate_corefile writes a Corefile to the given path.

    Each test writes into a fresh temporary directory that is removed in
    ``tearDown``.
    """

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.path = os.path.join(self.tmp, 'Corefile')

    def tearDown(self):
        shutil.rmtree(self.tmp)

    def _read_corefile(self):
        """Return the text written to ``self.path``.

        Uses a context manager so the handle is closed before ``tearDown``
        removes the directory (avoids ResourceWarning from a dangling
        ``open(...).read()``).
        """
        with open(self.path) as fh:
            return fh.read()

    def test_creates_corefile(self):
        firewall_manager.generate_corefile([], self.path)
        self.assertTrue(os.path.exists(self.path))

    def test_contains_forward_and_cache(self):
        firewall_manager.generate_corefile([], self.path)
        content = self._read_corefile()
        self.assertIn('forward . 8.8.8.8', content)
        self.assertIn('cache', content)
        self.assertIn('cell {', content)

    def test_no_blocked_services_no_acl_block(self):
        peers = [_make_peer('10.0.0.2', internet=True,
                            services=list(firewall_manager.SERVICE_IPS.keys()))]
        firewall_manager.generate_corefile(peers, self.path)
        content = self._read_corefile()
        self.assertNotIn('block net', content)

    def test_blocked_service_generates_acl(self):
        peers = [_make_peer('10.0.0.3', internet=False, services=['calendar'])]
        firewall_manager.generate_corefile(peers, self.path)
        content = self._read_corefile()
        # files/mail/webdav are blocked for this peer
        self.assertIn('block net 10.0.0.3/32', content)

    def test_peer_with_all_services_allowed_no_acl(self):
        peers = [_make_peer('10.0.0.2',
                            services=list(firewall_manager.SERVICE_IPS.keys()))]
        firewall_manager.generate_corefile(peers, self.path)
        self.assertNotIn('block net', self._read_corefile())

    def test_returns_false_on_write_error(self):
        # The parent directory does not exist, so the write is expected to fail.
        result = firewall_manager.generate_corefile([], '/nonexistent/path/Corefile')
        self.assertFalse(result)


# ---------------------------------------------------------------------------
# apply_peer_rules — iptables call verification
# ---------------------------------------------------------------------------

class TestApplyPeerRules(unittest.TestCase):
    """Verify correct iptables calls for full-internet vs split-tunnel peers."""

    def _run_apply(self, peer_ip, settings):
        """Run apply_peer_rules with ``_wg_exec`` mocked; return the recorded args.

        Every call to the patched ``firewall_manager._wg_exec`` is answered
        with a MagicMock whose ``returncode`` is 0 and ``stdout`` is ``''``.
        The argument passed to each call is appended to the returned list in
        call order.
        """
        calls_made = []

        def fake_wg_exec(args):
            calls_made.append(args)
            m = MagicMock()
            m.returncode = 0
            m.stdout = ''
            return m

        with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec):
            firewall_manager.apply_peer_rules(peer_ip, settings)
        return calls_made

    @staticmethod
    def _iptables_calls(calls):
        """Keep only the recorded argument lists that contain ``'iptables'``."""
        return [c for c in calls if 'iptables' in c]

    @staticmethod
    def _jump_targets(iptables_calls):
        """Return the token following ``-j`` for every call that has a ``-j``."""
        return [c[c.index('-j') + 1] for c in iptables_calls if '-j' in c]

    def test_full_internet_peer_gets_accept_rule(self):
        calls = self._run_apply('10.0.0.2', {
            'internet_access': True,
            'service_access': list(firewall_manager.SERVICE_IPS.keys()),
            'peer_access': True,
        })
        targets = self._jump_targets(self._iptables_calls(calls))
        # Full-internet peer: only ACCEPT rules (no DROP except iptables-restore clears)
        self.assertNotIn('DROP', targets)
        self.assertIn('ACCEPT', targets)

    def test_no_internet_peer_gets_drop_rule(self):
        calls = self._run_apply('10.0.0.3', {
            'internet_access': False,
            'service_access': list(firewall_manager.SERVICE_IPS.keys()),
            'peer_access': True,
        })
        targets = self._jump_targets(self._iptables_calls(calls))
        self.assertIn('DROP', targets)
        self.assertIn('ACCEPT', targets)

    def test_service_access_restriction_generates_drop(self):
        calls = self._run_apply('10.0.0.4', {
            'internet_access': False,
            'service_access': ['calendar'],
            'peer_access': True,
        })
        iptables_calls = self._iptables_calls(calls)
        # files/mail/webdav should be DROPped, calendar ACCEPTed
        targets_with_ips = [
            (c[c.index('-d') + 1], c[c.index('-j') + 1])
            for c in iptables_calls
            if '-d' in c and '-j' in c
        ]
        # Map destination IP -> jump target, restricted to known service IPs.
        svc_rules = {ip: t for ip, t in targets_with_ips
                     if ip in firewall_manager.SERVICE_IPS.values()}
        calendar_ip = firewall_manager.SERVICE_IPS['calendar']
        files_ip = firewall_manager.SERVICE_IPS['files']
        self.assertEqual(svc_rules.get(calendar_ip), 'ACCEPT')
        self.assertEqual(svc_rules.get(files_ip), 'DROP')

    def test_all_rules_tagged_with_peer_comment(self):
        calls = self._run_apply('10.0.0.2', {
            'internet_access': True,
            'service_access': list(firewall_manager.SERVICE_IPS.keys()),
            'peer_access': True,
        })
        iptables_calls = self._iptables_calls(calls)
        comment = firewall_manager._peer_comment('10.0.0.2')
        for c in iptables_calls:
            if '-I' in c:  # only insertion rules need the comment
                self.assertIn(comment, c, f"Rule missing comment tag: {c}")

    def test_peer_with_no_peer_access_gets_drop_for_vpn_subnet(self):
        calls = self._run_apply('10.0.0.5', {
            'internet_access': True,
            'service_access': list(firewall_manager.SERVICE_IPS.keys()),
            'peer_access': False,
        })
        iptables_calls = self._iptables_calls(calls)
        vpn_rules = [c for c in iptables_calls
                     if '-d' in c and '10.0.0.0/24' in c]
        self.assertTrue(vpn_rules, "Expected a rule for 10.0.0.0/24")
        for c in vpn_rules:
            self.assertIn('DROP', c)


# ---------------------------------------------------------------------------
# apply_all_peer_rules
# ---------------------------------------------------------------------------

class TestApplyAllPeerRules(unittest.TestCase):
    """apply_all_peer_rules fans out to apply_peer_rules once per usable peer.

    ``ensure_caddy_virtual_ips`` and ``apply_peer_rules`` are both patched, so
    only the dispatch is exercised.
    """

    def test_calls_apply_per_peer(self):
        peers = [_make_peer('10.0.0.2'), _make_peer('10.0.0.3', internet=False)]
        with patch.object(firewall_manager, 'ensure_caddy_virtual_ips',
                          return_value=True), \
             patch.object(firewall_manager, 'apply_peer_rules',
                          return_value=True) as mock_apply:
            firewall_manager.apply_all_peer_rules(peers)
        self.assertEqual(mock_apply.call_count, 2)
        called_ips = {c.args[0] for c in mock_apply.call_args_list}
        self.assertEqual(called_ips, {'10.0.0.2', '10.0.0.3'})

    def test_peer_without_ip_is_skipped(self):
        # The first entry has no 'ip' key, so only the second should be applied.
        peers = [{'internet_access': True}, _make_peer('10.0.0.2')]
        with patch.object(firewall_manager, 'ensure_caddy_virtual_ips',
                          return_value=True), \
             patch.object(firewall_manager, 'apply_peer_rules',
                          return_value=True) as mock_apply:
            firewall_manager.apply_all_peer_rules(peers)
        self.assertEqual(mock_apply.call_count, 1)


# ---------------------------------------------------------------------------
# clear_peer_rules
# ---------------------------------------------------------------------------

class TestClearPeerRules(unittest.TestCase):
    """clear_peer_rules drops only the rules tagged with the given peer's comment."""

    def test_removes_only_matching_comment_lines(self):
        save_output = (
            '*filter\n'
            ':INPUT ACCEPT [0:0]\n'
            ':FORWARD ACCEPT [0:0]\n'
            '-A FORWARD -s 10.0.0.2 -m comment --comment pic-peer-10-0-0-2 -j ACCEPT\n'
            '-A FORWARD -s 10.0.0.3 -m comment --comment pic-peer-10-0-0-3 -j DROP\n'
            'COMMIT\n'
        )
        restored = []

        def fake_wg_exec(args):
            m = MagicMock()
            m.returncode = 0
            # Only the iptables-save call gets the canned ruleset.
            if args == ['iptables-save']:
                m.stdout = save_output
            return m

        # NOTE(review): the parameter must stay named ``input`` (shadowing the
        # builtin) — presumably clear_peer_rules passes the ruleset as
        # subprocess.run(..., input=...); verify against firewall_manager.
        def fake_restore(cmd, input, **kwargs):
            restored.append(input)
            m = MagicMock()
            m.returncode = 0
            return m

        with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
             patch('subprocess.run', side_effect=fake_restore):
            firewall_manager.clear_peer_rules('10.0.0.2')

        self.assertEqual(len(restored), 1)
        restored_content = restored[0]
        self.assertNotIn('pic-peer-10-0-0-2', restored_content)
        self.assertIn('pic-peer-10-0-0-3', restored_content)

    def test_no_op_when_no_matching_rules(self):
        save_output = '*filter\n:FORWARD ACCEPT [0:0]\nCOMMIT\n'

        def fake_wg_exec(args):
            m = MagicMock()
            m.returncode = 0
            m.stdout = save_output
            return m

        with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
             patch('subprocess.run') as mock_restore:
            firewall_manager.clear_peer_rules('10.0.0.99')
        # Nothing tagged for this peer, so no restore should have been issued.
        mock_restore.assert_not_called()


if __name__ == '__main__':
    unittest.main()