3a35cf72d3
Tests assumed write to /nonexistent/... fails, but CI runs as root where Linux allows creating any path. Use unittest.mock.patch on builtins.open with OSError side_effect so the test is environment-independent. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1037 lines
48 KiB
Python
1037 lines
48 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tests for firewall_manager — per-peer iptables rule generation and DNS ACL logic.
|
|
All docker exec calls are mocked so tests run without a live Docker environment.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import tempfile
|
|
import shutil
|
|
import unittest
|
|
from unittest.mock import patch, call, MagicMock
|
|
from pathlib import Path
|
|
|
|
api_dir = Path(__file__).parent.parent / 'api'
|
|
sys.path.insert(0, str(api_dir))
|
|
|
|
import firewall_manager
|
|
|
|
|
|
def _make_peer(ip, internet=True, services=None, peers=True):
|
|
if services is None:
|
|
services = list(firewall_manager.SERVICE_IPS.keys())
|
|
return {'ip': ip, 'internet_access': internet, 'service_access': services, 'peer_access': peers}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _peer_comment
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPeerComment(unittest.TestCase):
|
|
def test_dots_replaced_with_dashes(self):
|
|
# Comment format now includes /32 suffix to prevent substring matches
|
|
# (e.g. pic-peer-10-0-0-1/32 is not a prefix of pic-peer-10-0-0-10/32)
|
|
self.assertEqual(firewall_manager._peer_comment('10.0.0.2'), 'pic-peer-10-0-0-2/32')
|
|
|
|
def test_different_ip(self):
|
|
self.assertEqual(firewall_manager._peer_comment('192.168.1.100'), 'pic-peer-192-168-1-100/32')
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _build_acl_block
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestBuildAclBlock(unittest.TestCase):
|
|
def test_empty_returns_empty_string(self):
|
|
self.assertEqual(firewall_manager._build_acl_block({}), '')
|
|
|
|
def test_no_blocked_peers_returns_empty(self):
|
|
blocked = {s: [] for s in firewall_manager.SERVICE_IPS}
|
|
self.assertEqual(firewall_manager._build_acl_block(blocked), '')
|
|
|
|
def test_blocked_peer_appears_in_acl(self):
|
|
blocked = {'calendar': ['10.0.0.5'], 'files': [], 'mail': [], 'webdav': []}
|
|
result = firewall_manager._build_acl_block(blocked)
|
|
self.assertIn('acl calendar.cell.', result)
|
|
self.assertIn('block net 10.0.0.5/32', result)
|
|
self.assertIn('allow net 0.0.0.0/0', result)
|
|
|
|
def test_unknown_service_skipped(self):
|
|
blocked = {'nonexistent': ['10.0.0.2']}
|
|
result = firewall_manager._build_acl_block(blocked)
|
|
self.assertEqual(result, '')
|
|
|
|
def test_multiple_peers_blocked_from_same_service(self):
|
|
blocked = {'mail': ['10.0.0.2', '10.0.0.3'], 'calendar': [], 'files': [], 'webdav': []}
|
|
result = firewall_manager._build_acl_block(blocked)
|
|
self.assertEqual(result.count('block net'), 2)
|
|
self.assertIn('10.0.0.2/32', result)
|
|
self.assertIn('10.0.0.3/32', result)
|
|
|
|
def test_multiple_peers_in_single_acl_block(self):
|
|
# Both IPs must be in ONE acl block, not separate blocks.
|
|
# Separate blocks cause the first block's allow-all to match before
|
|
# the second block's block rule — silently granting access.
|
|
blocked = {'mail': ['10.0.0.2', '10.0.0.3'], 'calendar': [], 'files': [], 'webdav': []}
|
|
result = firewall_manager._build_acl_block(blocked)
|
|
self.assertEqual(result.count('acl mail.cell.'), 1,
|
|
'Both blocked peers must share a single acl block')
|
|
# Both block lines must appear before the allow-all
|
|
idx_block_2 = result.index('block net 10.0.0.2/32')
|
|
idx_block_3 = result.index('block net 10.0.0.3/32')
|
|
idx_allow = result.index('allow net 0.0.0.0/0')
|
|
self.assertLess(idx_block_2, idx_allow)
|
|
self.assertLess(idx_block_3, idx_allow)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# generate_corefile
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGenerateCorefile(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
self.path = os.path.join(self.tmp, 'Corefile')
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp)
|
|
|
|
def test_creates_corefile(self):
|
|
firewall_manager.generate_corefile([], self.path)
|
|
self.assertTrue(os.path.exists(self.path))
|
|
|
|
def test_contains_forward_and_cache(self):
|
|
firewall_manager.generate_corefile([], self.path)
|
|
content = open(self.path).read()
|
|
self.assertIn('forward . 8.8.8.8', content)
|
|
self.assertIn('cache', content)
|
|
self.assertIn('cell {', content)
|
|
|
|
def test_no_blocked_services_no_acl_block(self):
|
|
peers = [_make_peer('10.0.0.2', internet=True,
|
|
services=list(firewall_manager.SERVICE_IPS.keys()))]
|
|
firewall_manager.generate_corefile(peers, self.path)
|
|
content = open(self.path).read()
|
|
self.assertNotIn('block net', content)
|
|
|
|
def test_blocked_service_generates_acl(self):
|
|
peers = [_make_peer('10.0.0.3', internet=False, services=['calendar'])]
|
|
firewall_manager.generate_corefile(peers, self.path)
|
|
content = open(self.path).read()
|
|
# files/mail/webdav are blocked for this peer
|
|
self.assertIn('block net 10.0.0.3/32', content)
|
|
|
|
def test_peer_with_all_services_allowed_no_acl(self):
|
|
peers = [_make_peer('10.0.0.2', services=list(firewall_manager.SERVICE_IPS.keys()))]
|
|
firewall_manager.generate_corefile(peers, self.path)
|
|
self.assertNotIn('block net', open(self.path).read())
|
|
|
|
def test_corefile_contains_reload_plugin(self):
|
|
firewall_manager.generate_corefile([], self.path)
|
|
content = open(self.path).read()
|
|
self.assertIn('reload', content)
|
|
|
|
def test_returns_false_on_write_error(self):
|
|
with unittest.mock.patch('builtins.open', side_effect=OSError('Permission denied')):
|
|
result = firewall_manager.generate_corefile([], '/any/path/Corefile')
|
|
self.assertFalse(result)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# generate_corefile with cell_links
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGenerateCorefileWithCellLinks(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp()
|
|
self.path = os.path.join(self.tmp, 'Corefile')
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp)
|
|
|
|
def _content(self):
|
|
return open(self.path).read()
|
|
|
|
def test_cell_links_none_produces_no_forwarding_stanzas(self):
|
|
"""Default (None) produces no extra forwarding blocks beyond the primary zone."""
|
|
firewall_manager.generate_corefile([], self.path, cell_links=None)
|
|
content = self._content()
|
|
# The only 'forward' line should be the default internet forwarder
|
|
forward_lines = [l for l in content.splitlines() if 'forward' in l]
|
|
self.assertEqual(len(forward_lines), 1)
|
|
self.assertIn('8.8.8.8', forward_lines[0])
|
|
|
|
def test_cell_links_empty_list_produces_no_extra_stanzas(self):
|
|
"""An empty cell_links list produces no extra forwarding blocks."""
|
|
firewall_manager.generate_corefile([], self.path, cell_links=[])
|
|
content = self._content()
|
|
forward_lines = [l for l in content.splitlines() if 'forward' in l]
|
|
self.assertEqual(len(forward_lines), 1)
|
|
self.assertIn('8.8.8.8', forward_lines[0])
|
|
|
|
def test_single_cell_link_produces_forwarding_block(self):
|
|
"""One cell link produces one forwarding stanza with correct domain and dns_ip."""
|
|
cell_links = [{'domain': 'remote.cell', 'dns_ip': '10.1.0.1'}]
|
|
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
|
|
content = self._content()
|
|
self.assertIn('remote.cell {', content)
|
|
self.assertIn('forward . 10.1.0.1', content)
|
|
self.assertIn('cache', content)
|
|
|
|
def test_multiple_cell_links_produce_multiple_forwarding_blocks(self):
|
|
"""Multiple cell links produce one stanza each."""
|
|
cell_links = [
|
|
{'domain': 'alpha.cell', 'dns_ip': '10.1.0.1'},
|
|
{'domain': 'beta.cell', 'dns_ip': '10.2.0.1'},
|
|
]
|
|
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
|
|
content = self._content()
|
|
self.assertIn('alpha.cell {', content)
|
|
self.assertIn('forward . 10.1.0.1', content)
|
|
self.assertIn('beta.cell {', content)
|
|
self.assertIn('forward . 10.2.0.1', content)
|
|
|
|
def test_cell_links_do_not_overwrite_peer_acls(self):
|
|
"""Cell link stanzas are appended; peer ACLs in the primary zone survive."""
|
|
peers = [_make_peer('10.0.0.3', services=['calendar'])]
|
|
cell_links = [{'domain': 'other.cell', 'dns_ip': '10.99.0.1'}]
|
|
firewall_manager.generate_corefile(peers, self.path, cell_links=cell_links)
|
|
content = self._content()
|
|
self.assertIn('block net 10.0.0.3/32', content)
|
|
self.assertIn('other.cell {', content)
|
|
self.assertIn('forward . 10.99.0.1', content)
|
|
|
|
def test_link_with_missing_domain_is_skipped(self):
|
|
"""A cell_link entry with no domain key is silently skipped."""
|
|
cell_links = [{'dns_ip': '10.1.0.1'}] # no 'domain'
|
|
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
|
|
content = self._content()
|
|
# Only the default internet forwarder
|
|
forward_lines = [l for l in content.splitlines() if 'forward' in l]
|
|
self.assertEqual(len(forward_lines), 1)
|
|
|
|
def test_link_with_missing_dns_ip_is_skipped(self):
|
|
"""A cell_link entry with no dns_ip key is silently skipped."""
|
|
cell_links = [{'domain': 'nope.cell'}] # no 'dns_ip'
|
|
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
|
|
content = self._content()
|
|
self.assertNotIn('nope.cell', content)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# apply_peer_rules — iptables call verification
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestApplyPeerRules(unittest.TestCase):
|
|
"""Verify correct iptables calls for full-internet vs split-tunnel peers."""
|
|
|
|
_FAKE_CADDY_IP = '172.20.0.2'
|
|
|
|
def _run_apply(self, peer_ip, settings):
|
|
calls_made = []
|
|
|
|
def fake_wg_exec(args):
|
|
calls_made.append(args)
|
|
m = MagicMock()
|
|
m.returncode = 0
|
|
m.stdout = ''
|
|
return m
|
|
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
|
|
patch.object(firewall_manager, '_get_caddy_container_ip',
|
|
return_value=self._FAKE_CADDY_IP):
|
|
firewall_manager.apply_peer_rules(peer_ip, settings)
|
|
|
|
return calls_made
|
|
|
|
def test_full_internet_peer_gets_accept_rule(self):
|
|
calls = self._run_apply('10.0.0.2', {'internet_access': True,
|
|
'service_access': list(firewall_manager.SERVICE_IPS.keys()),
|
|
'peer_access': True})
|
|
iptables_calls = [c for c in calls if 'iptables' in c]
|
|
targets = [c[c.index('-j') + 1] for c in iptables_calls if '-j' in c]
|
|
# Full-internet peer: only ACCEPT rules (no DROP except iptables-restore clears)
|
|
self.assertNotIn('DROP', targets)
|
|
self.assertIn('ACCEPT', targets)
|
|
|
|
def test_no_internet_peer_gets_drop_rule(self):
|
|
calls = self._run_apply('10.0.0.3', {'internet_access': False,
|
|
'service_access': list(firewall_manager.SERVICE_IPS.keys()),
|
|
'peer_access': True})
|
|
iptables_calls = [c for c in calls if 'iptables' in c]
|
|
targets = [c[c.index('-j') + 1] for c in iptables_calls if '-j' in c]
|
|
self.assertIn('DROP', targets)
|
|
self.assertIn('ACCEPT', targets)
|
|
|
|
def test_service_access_has_no_caddy_iptables_rule(self):
|
|
"""service_access is enforced by CoreDNS ACL only — no per-peer Caddy iptables rule.
|
|
|
|
The PIC UI is served through Caddy:80; blocking it at the iptables level
|
|
would prevent peers from accessing the management UI even if service_access=[].
|
|
"""
|
|
for sa in (['calendar'], []):
|
|
calls = self._run_apply('10.0.0.4', {'internet_access': False,
|
|
'service_access': sa,
|
|
'peer_access': True})
|
|
iptables_calls = [c for c in calls if 'iptables' in c]
|
|
caddy_rules = [c for c in iptables_calls
|
|
if '-d' in c and self._FAKE_CADDY_IP in c
|
|
and '--dport' in c and '80' in c]
|
|
self.assertFalse(caddy_rules,
|
|
f"No Caddy port-80 iptables rule expected (service_access={sa!r}); "
|
|
f"service access is DNS-ACL only so the PIC UI remains accessible")
|
|
# No per-VIP rules either — per-service control is at DNS ACL level
|
|
for svc_ip in firewall_manager.SERVICE_IPS.values():
|
|
calls = self._run_apply('10.0.0.4', {'internet_access': True,
|
|
'service_access': ['calendar'],
|
|
'peer_access': True})
|
|
vip_rules = [c for c in calls if 'iptables' in c and '-d' in c and svc_ip in c]
|
|
self.assertFalse(vip_rules, f"No per-VIP FORWARD rules expected for {svc_ip}")
|
|
|
|
def test_all_rules_tagged_with_peer_comment(self):
|
|
calls = self._run_apply('10.0.0.2', {'internet_access': True,
|
|
'service_access': list(firewall_manager.SERVICE_IPS.keys()),
|
|
'peer_access': True})
|
|
iptables_calls = [c for c in calls if 'iptables' in c]
|
|
comment = firewall_manager._peer_comment('10.0.0.2')
|
|
for c in iptables_calls:
|
|
if '-I' in c: # only insertion rules need the comment
|
|
self.assertIn(comment, c, f"Rule missing comment tag: {c}")
|
|
|
|
def test_peer_with_no_peer_access_gets_drop_for_vpn_subnet(self):
|
|
calls = self._run_apply('10.0.0.5', {'internet_access': True,
|
|
'service_access': list(firewall_manager.SERVICE_IPS.keys()),
|
|
'peer_access': False})
|
|
iptables_calls = [c for c in calls if 'iptables' in c]
|
|
vpn_rules = [c for c in iptables_calls if '-d' in c and '10.0.0.0/24' in c]
|
|
self.assertTrue(vpn_rules, "Expected a rule for 10.0.0.0/24")
|
|
for c in vpn_rules:
|
|
self.assertIn('DROP', c)
|
|
|
|
def test_custom_wg_subnet_replaces_default(self):
|
|
"""wg_subnet parameter is used instead of hardcoded 10.0.0.0/24."""
|
|
calls_made = []
|
|
|
|
def fake_wg_exec(args):
|
|
calls_made.append(args)
|
|
m = MagicMock()
|
|
m.returncode = 0
|
|
m.stdout = ''
|
|
return m
|
|
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec):
|
|
firewall_manager.apply_peer_rules(
|
|
'10.0.2.5',
|
|
{'internet_access': True, 'peer_access': True,
|
|
'service_access': list(firewall_manager.SERVICE_IPS.keys())},
|
|
wg_subnet='10.0.2.0/24',
|
|
)
|
|
|
|
iptables_calls = [c for c in calls_made if 'iptables' in c]
|
|
subnets_in_rules = [token for c in iptables_calls for token in c
|
|
if '/' in token and token.startswith('10.')]
|
|
self.assertIn('10.0.2.0/24', subnets_in_rules,
|
|
"Custom wg_subnet should appear in peer-to-peer FORWARD rule")
|
|
self.assertNotIn('10.0.0.0/24', subnets_in_rules,
|
|
"Default hardcoded subnet must not appear when custom wg_subnet given")
|
|
|
|
def test_cell_subnets_get_explicit_accept_rules(self):
|
|
"""Each cell subnet gets an explicit ACCEPT rule for cross-cell routing."""
|
|
calls_made = []
|
|
|
|
def fake_wg_exec(args):
|
|
calls_made.append(args)
|
|
m = MagicMock()
|
|
m.returncode = 0
|
|
m.stdout = ''
|
|
return m
|
|
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec):
|
|
firewall_manager.apply_peer_rules(
|
|
'10.0.2.5',
|
|
{'internet_access': False, 'peer_access': True,
|
|
'service_access': list(firewall_manager.SERVICE_IPS.keys())},
|
|
wg_subnet='10.0.2.0/24',
|
|
cell_subnets=['10.0.0.0/24', '10.0.1.0/24'],
|
|
)
|
|
|
|
iptables_calls = [c for c in calls_made if 'iptables' in c]
|
|
for cell_net in ('10.0.0.0/24', '10.0.1.0/24'):
|
|
cell_rules = [c for c in iptables_calls if '-d' in c and cell_net in c]
|
|
self.assertTrue(cell_rules,
|
|
f"Expected FORWARD rule for cell subnet {cell_net}")
|
|
for c in cell_rules:
|
|
self.assertIn('ACCEPT', c,
|
|
f"Cell subnet rule for {cell_net} must be ACCEPT")
|
|
|
|
def test_no_cell_subnets_no_extra_rules(self):
|
|
"""When cell_subnets is empty/None, no extra FORWARD rules are added."""
|
|
calls_made = []
|
|
|
|
def fake_wg_exec(args):
|
|
calls_made.append(args)
|
|
m = MagicMock()
|
|
m.returncode = 0
|
|
m.stdout = ''
|
|
return m
|
|
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec):
|
|
firewall_manager.apply_peer_rules(
|
|
'10.0.2.5',
|
|
{'internet_access': True, 'peer_access': True,
|
|
'service_access': list(firewall_manager.SERVICE_IPS.keys())},
|
|
wg_subnet='10.0.2.0/24',
|
|
cell_subnets=[],
|
|
)
|
|
|
|
iptables_calls = [c for c in calls_made if 'iptables' in c and '-I' in c]
|
|
# Only 2 rules expected: the catch-all ACCEPT + the peer-to-peer ACCEPT
|
|
self.assertEqual(len(iptables_calls), 2,
|
|
f"Expected exactly 2 INSERT rules, got: {iptables_calls}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# apply_all_peer_rules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestApplyAllPeerRules(unittest.TestCase):
|
|
def test_calls_apply_per_peer(self):
|
|
peers = [_make_peer('10.0.0.2'), _make_peer('10.0.0.3', internet=False)]
|
|
with patch.object(firewall_manager, 'ensure_caddy_virtual_ips', return_value=True), \
|
|
patch.object(firewall_manager, 'apply_peer_rules', return_value=True) as mock_apply:
|
|
firewall_manager.apply_all_peer_rules(peers)
|
|
self.assertEqual(mock_apply.call_count, 2)
|
|
called_ips = {c.args[0] for c in mock_apply.call_args_list}
|
|
self.assertEqual(called_ips, {'10.0.0.2', '10.0.0.3'})
|
|
|
|
def test_peer_without_ip_is_skipped(self):
|
|
peers = [{'internet_access': True}, _make_peer('10.0.0.2')]
|
|
with patch.object(firewall_manager, 'ensure_caddy_virtual_ips', return_value=True), \
|
|
patch.object(firewall_manager, 'apply_peer_rules', return_value=True) as mock_apply:
|
|
firewall_manager.apply_all_peer_rules(peers)
|
|
self.assertEqual(mock_apply.call_count, 1)
|
|
|
|
def test_wg_subnet_and_cell_subnets_forwarded(self):
|
|
"""apply_all_peer_rules passes wg_subnet and cell_subnets to each apply_peer_rules call."""
|
|
peers = [_make_peer('10.0.2.2')]
|
|
cell_subnets = ['10.0.0.0/24', '10.0.1.0/24']
|
|
with patch.object(firewall_manager, 'ensure_caddy_virtual_ips', return_value=True), \
|
|
patch.object(firewall_manager, 'apply_peer_rules', return_value=True) as mock_apply:
|
|
firewall_manager.apply_all_peer_rules(peers, wg_subnet='10.0.2.0/24',
|
|
cell_subnets=cell_subnets)
|
|
call_kwargs = mock_apply.call_args_list[0].kwargs
|
|
self.assertEqual(call_kwargs.get('wg_subnet'), '10.0.2.0/24')
|
|
self.assertEqual(call_kwargs.get('cell_subnets'), cell_subnets)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# clear_peer_rules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestClearPeerRules(unittest.TestCase):
|
|
def test_removes_only_matching_comment_lines(self):
|
|
save_output = (
|
|
'*filter\n'
|
|
':INPUT ACCEPT [0:0]\n'
|
|
':FORWARD ACCEPT [0:0]\n'
|
|
'-A FORWARD -s 10.0.0.2 -m comment --comment "pic-peer-10-0-0-2/32" -j ACCEPT\n'
|
|
'-A FORWARD -s 10.0.0.3 -m comment --comment "pic-peer-10-0-0-3/32" -j DROP\n'
|
|
'COMMIT\n'
|
|
)
|
|
restored = []
|
|
|
|
def fake_wg_exec(args):
|
|
m = MagicMock()
|
|
m.returncode = 0
|
|
if args == ['iptables-save']:
|
|
m.stdout = save_output
|
|
return m
|
|
|
|
def fake_restore(cmd, input, **kwargs):
|
|
restored.append(input)
|
|
m = MagicMock()
|
|
m.returncode = 0
|
|
return m
|
|
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
|
|
patch('subprocess.run', side_effect=fake_restore):
|
|
firewall_manager.clear_peer_rules('10.0.0.2')
|
|
|
|
self.assertEqual(len(restored), 1)
|
|
restored_content = restored[0]
|
|
self.assertNotIn('pic-peer-10-0-0-2/32', restored_content)
|
|
self.assertIn('pic-peer-10-0-0-3/32', restored_content)
|
|
|
|
def test_no_op_when_no_matching_rules(self):
|
|
save_output = '*filter\n:FORWARD ACCEPT [0:0]\nCOMMIT\n'
|
|
|
|
def fake_wg_exec(args):
|
|
m = MagicMock()
|
|
m.returncode = 0
|
|
m.stdout = save_output
|
|
return m
|
|
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
|
|
patch('subprocess.run') as mock_restore:
|
|
firewall_manager.clear_peer_rules('10.0.0.99')
|
|
|
|
mock_restore.assert_not_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# update_service_ips
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestUpdateServiceIps(unittest.TestCase):
|
|
def tearDown(self):
|
|
# Restore default SERVICE_IPS after each test
|
|
firewall_manager.update_service_ips('172.20.0.0/16')
|
|
|
|
def test_default_ips_are_172_20(self):
|
|
self.assertEqual(firewall_manager.SERVICE_IPS['calendar'], '172.20.0.21')
|
|
self.assertEqual(firewall_manager.SERVICE_IPS['webdav'], '172.20.0.24')
|
|
|
|
def test_update_changes_all_virtual_ips(self):
|
|
firewall_manager.update_service_ips('10.0.0.0/24')
|
|
self.assertEqual(firewall_manager.SERVICE_IPS['calendar'], '10.0.0.21')
|
|
self.assertEqual(firewall_manager.SERVICE_IPS['files'], '10.0.0.22')
|
|
self.assertEqual(firewall_manager.SERVICE_IPS['mail'], '10.0.0.23')
|
|
self.assertEqual(firewall_manager.SERVICE_IPS['webdav'], '10.0.0.24')
|
|
|
|
def test_update_replaces_not_extends(self):
|
|
firewall_manager.update_service_ips('10.0.0.0/24')
|
|
# Should only have the four virtual-IP keys
|
|
self.assertEqual(set(firewall_manager.SERVICE_IPS.keys()),
|
|
{'calendar', 'files', 'mail', 'webdav'})
|
|
|
|
def test_apply_peer_rules_no_caddy_or_vip_rules(self):
|
|
"""Service access is DNS-ACL only — no Caddy or per-VIP FORWARD rules in apply_peer_rules."""
|
|
firewall_manager.update_service_ips('10.0.0.0/24')
|
|
called_with = []
|
|
_CADDY_IP = '172.20.0.2'
|
|
|
|
def fake_wg_exec(args):
|
|
called_with.append(args)
|
|
m = MagicMock()
|
|
m.returncode = 1
|
|
return m
|
|
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
|
|
patch.object(firewall_manager, 'clear_peer_rules'), \
|
|
patch.object(firewall_manager, '_get_caddy_container_ip', return_value=_CADDY_IP):
|
|
firewall_manager.apply_peer_rules('10.0.0.5', {
|
|
'internet_access': True,
|
|
'service_access': ['calendar'],
|
|
'peer_access': True,
|
|
})
|
|
|
|
iptables_calls = [c for c in called_with if c and c[0] == 'iptables']
|
|
dest_ips = [c[c.index('-d') + 1] for c in iptables_calls if '-d' in c]
|
|
# No Caddy or VIP rules — service access is purely DNS-ACL based
|
|
self.assertNotIn(_CADDY_IP, dest_ips)
|
|
self.assertNotIn('10.0.0.21', dest_ips)
|
|
self.assertNotIn('172.20.0.21', dest_ips)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestCellRules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCellRules(unittest.TestCase):
|
|
"""Tests for apply_cell_rules, clear_cell_rules, _cell_tag, and apply_all_cell_rules."""
|
|
|
|
# ── helpers ───────────────────────────────────────────────────────────────
|
|
|
|
_FAKE_API_IP = '172.20.0.10'
|
|
_FAKE_CADDY_IP = '172.20.0.2'
|
|
_FAKE_DNS_IP = '172.20.0.3'
|
|
|
|
def _capture_apply(self, cell_name, vpn_subnet, inbound_services):
|
|
"""Run apply_cell_rules with _wg_exec and container IP helpers mocked."""
|
|
calls_made = []
|
|
|
|
def fake_wg_exec(args):
|
|
calls_made.append(args)
|
|
m = MagicMock()
|
|
m.returncode = 0
|
|
m.stdout = ''
|
|
return m
|
|
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
|
|
patch.object(firewall_manager, '_get_cell_api_ip', return_value=self._FAKE_API_IP), \
|
|
patch.object(firewall_manager, '_get_caddy_container_ip', return_value=self._FAKE_CADDY_IP), \
|
|
patch.object(firewall_manager, '_get_dns_container_ip', return_value=self._FAKE_DNS_IP), \
|
|
patch.object(firewall_manager, 'ensure_forward_stateful', return_value=True):
|
|
firewall_manager.apply_cell_rules(cell_name, vpn_subnet, inbound_services)
|
|
|
|
return [c for c in calls_made if 'iptables' in c]
|
|
|
|
def _targets_for_dest(self, iptables_calls, dest_ip):
|
|
"""Return list of -j targets where -d matches dest_ip."""
|
|
targets = []
|
|
for c in iptables_calls:
|
|
if '-d' in c and dest_ip in c and '-j' in c:
|
|
targets.append(c[c.index('-j') + 1])
|
|
return targets
|
|
|
|
# ── _cell_tag ─────────────────────────────────────────────────────────────
|
|
|
|
def test_cell_tag_sanitises_spaces_and_punctuation(self):
|
|
"""_cell_tag replaces non-alphanumeric chars with dashes."""
|
|
tag = firewall_manager._cell_tag('my cell!')
|
|
self.assertTrue(tag.startswith('pic-cell-'))
|
|
self.assertNotIn(' ', tag)
|
|
self.assertNotIn('!', tag)
|
|
|
|
def test_cell_tag_lowercase(self):
|
|
"""_cell_tag lowercases the cell name."""
|
|
tag = firewall_manager._cell_tag('Office')
|
|
self.assertIn('office', tag)
|
|
|
|
def test_cell_tag_has_pic_cell_prefix(self):
|
|
"""_cell_tag always starts with 'pic-cell-'."""
|
|
self.assertTrue(firewall_manager._cell_tag('remote').startswith('pic-cell-'))
|
|
|
|
def test_cell_tag_distinct_from_peer_tag(self):
|
|
"""A cell tag must not equal the peer comment for the same string."""
|
|
cell_tag = firewall_manager._cell_tag('10.0.0.2')
|
|
peer_tag = firewall_manager._peer_comment('10.0.0.2')
|
|
self.assertNotEqual(cell_tag, peer_tag)
|
|
|
|
# ── apply_cell_rules — catch-all DROP ─────────────────────────────────────
|
|
|
|
def test_apply_cell_rules_sends_catch_all_drop(self):
|
|
"""apply_cell_rules always inserts a DROP for the entire vpn_subnet."""
|
|
calls = self._capture_apply('office', '10.0.1.0/24', ['calendar'])
|
|
subnet_drops = [
|
|
c for c in calls
|
|
if '-s' in c and '10.0.1.0/24' in c
|
|
and '-j' in c and c[c.index('-j') + 1] == 'DROP'
|
|
and '-d' not in c # catch-all has no destination
|
|
]
|
|
self.assertTrue(subnet_drops, "Expected a catch-all DROP rule for the subnet")
|
|
|
|
def test_apply_cell_rules_sends_accept_for_allowed_service(self):
|
|
"""apply_cell_rules inserts Caddy ACCEPT when inbound_services is non-empty."""
|
|
calls = self._capture_apply('office', '10.0.1.0/24', ['calendar'])
|
|
caddy_targets = self._targets_for_dest(calls, self._FAKE_CADDY_IP)
|
|
self.assertIn('ACCEPT', caddy_targets,
|
|
"Expected ACCEPT to Caddy when inbound_services is non-empty")
|
|
|
|
def test_apply_cell_rules_no_caddy_accept_when_no_inbound(self):
|
|
"""apply_cell_rules does NOT insert Caddy ACCEPT when inbound_services is empty."""
|
|
calls = self._capture_apply('office', '10.0.1.0/24', [])
|
|
caddy_targets = self._targets_for_dest(calls, self._FAKE_CADDY_IP)
|
|
self.assertNotIn('ACCEPT', caddy_targets,
|
|
"No Caddy ACCEPT expected when inbound_services is empty")
|
|
|
|
def test_apply_cell_rules_accepts_api_sync_traffic(self):
|
|
"""apply_cell_rules inserts ACCEPT for cell-api:3000 so permission-sync pushes pass."""
|
|
calls = self._capture_apply('office', '10.0.1.0/24', [])
|
|
api_ip = self._FAKE_API_IP
|
|
api_accepts = [
|
|
c for c in calls
|
|
if '-s' in c and '10.0.1.0/24' in c
|
|
and '-d' in c and api_ip in c
|
|
and '--dport' in c and '3000' in c
|
|
and '-j' in c and c[c.index('-j') + 1] == 'ACCEPT'
|
|
]
|
|
self.assertTrue(api_accepts, 'Expected an ACCEPT rule for cell-api:3000')
|
|
|
|
def test_apply_cell_rules_api_sync_accept_before_catchall_drop(self):
|
|
"""The API-sync ACCEPT must be inserted after service rules so it ends up above DROP."""
|
|
insertion_order = []
|
|
|
|
def fake_wg_exec(args):
|
|
if '-I' in args and 'FORWARD' in args:
|
|
if '-j' in args:
|
|
insertion_order.append(args[args.index('-j') + 1])
|
|
m = MagicMock(); m.returncode = 0; m.stdout = ''; return m
|
|
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
|
|
patch.object(firewall_manager, '_get_cell_api_ip', return_value='172.20.0.10'), \
|
|
patch.object(firewall_manager, '_get_caddy_container_ip', return_value='172.20.0.2'), \
|
|
patch.object(firewall_manager, '_get_dns_container_ip', return_value='172.20.0.3'), \
|
|
patch.object(firewall_manager, 'ensure_forward_stateful', return_value=True):
|
|
firewall_manager.apply_cell_rules('office', '10.0.1.0/24', [])
|
|
|
|
# The API-sync ACCEPT must be the LAST -I FORWARD insertion so it sits at position 1
|
|
self.assertTrue(insertion_order, 'Expected at least one FORWARD rule inserted')
|
|
self.assertEqual(insertion_order[-1], 'ACCEPT',
|
|
f'Last -I FORWARD insertion must be ACCEPT (got {insertion_order})')
|
|
|
|
# ── apply_cell_rules — empty inbound (all-deny) ───────────────────────────
|
|
|
|
def test_apply_cell_rules_empty_inbound_no_service_accept(self):
|
|
"""With inbound_services=[], no service ACCEPT is added; catch-all DROP blocks traffic."""
|
|
calls = self._capture_apply('office', '10.0.1.0/24', [])
|
|
# No ACCEPT to Caddy
|
|
caddy_targets = self._targets_for_dest(calls, self._FAKE_CADDY_IP)
|
|
self.assertNotIn('ACCEPT', caddy_targets,
|
|
"No Caddy ACCEPT expected with empty inbound_services")
|
|
# No per-VIP rules at all
|
|
for service, svc_ip in firewall_manager.SERVICE_IPS.items():
|
|
svc_targets = self._targets_for_dest(calls, svc_ip)
|
|
self.assertFalse(svc_targets,
|
|
f"No per-VIP rules expected for {service} ({svc_ip})")
|
|
|
|
# ── apply_cell_rules — all inbound (all-accept) ───────────────────────────
|
|
|
|
def test_apply_cell_rules_all_inbound_caddy_accept(self):
|
|
"""With all four services in inbound, an ACCEPT rule is added for Caddy port 80."""
|
|
all_services = list(firewall_manager.SERVICE_IPS.keys())
|
|
calls = self._capture_apply('office', '10.0.1.0/24', all_services)
|
|
caddy_targets = self._targets_for_dest(calls, self._FAKE_CADDY_IP)
|
|
self.assertIn('ACCEPT', caddy_targets,
|
|
"Expected ACCEPT to Caddy when all services are in inbound_services")
|
|
|
|
# ── apply_cell_rules — all rules tagged ───────────────────────────────────
|
|
|
|
def test_apply_cell_rules_all_rules_tagged_with_cell_tag(self):
|
|
"""Every insertion rule must carry the cell's comment tag."""
|
|
calls = self._capture_apply('office', '10.0.1.0/24', ['calendar'])
|
|
tag = firewall_manager._cell_tag('office')
|
|
for c in calls:
|
|
if '-I' in c:
|
|
self.assertIn(tag, c, f"Rule missing cell tag: {c}")
|
|
|
|
# ── clear_cell_rules — noop when no matching rules ────────────────────────
|
|
|
|
def test_clear_cell_rules_noop_when_no_rules(self):
|
|
"""When iptables-save returns no pic-cell-office lines, iptables-restore is NOT called."""
|
|
save_output = '*filter\n:FORWARD ACCEPT [0:0]\nCOMMIT\n'
|
|
|
|
def fake_wg_exec(args):
|
|
m = MagicMock()
|
|
m.returncode = 0
|
|
m.stdout = save_output
|
|
return m
|
|
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
|
|
patch('subprocess.run') as mock_restore:
|
|
firewall_manager.clear_cell_rules('office')
|
|
|
|
mock_restore.assert_not_called()
|
|
|
|
def test_clear_cell_rules_removes_tagged_lines(self):
|
|
"""clear_cell_rules removes lines carrying the cell tag and keeps others."""
|
|
tag = firewall_manager._cell_tag('office')
|
|
save_output = (
|
|
'*filter\n'
|
|
':FORWARD ACCEPT [0:0]\n'
|
|
f'-A FORWARD -s 10.0.1.0/24 -m comment --comment "{tag}" -j DROP\n'
|
|
'-A FORWARD -s 10.0.0.2 -m comment --comment "pic-peer-10-0-0-2/32" -j ACCEPT\n'
|
|
'COMMIT\n'
|
|
)
|
|
restored = []
|
|
|
|
def fake_wg_exec(args):
|
|
m = MagicMock()
|
|
m.returncode = 0
|
|
if args == ['iptables-save']:
|
|
m.stdout = save_output
|
|
return m
|
|
|
|
def fake_restore(cmd, input, **kwargs):
|
|
restored.append(input)
|
|
m = MagicMock()
|
|
m.returncode = 0
|
|
return m
|
|
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
|
|
patch('subprocess.run', side_effect=fake_restore):
|
|
firewall_manager.clear_cell_rules('office')
|
|
|
|
self.assertEqual(len(restored), 1)
|
|
content = restored[0]
|
|
self.assertNotIn(tag, content)
|
|
# peer rule for a different entity must survive
|
|
self.assertIn('pic-peer-10-0-0-2/32', content)
|
|
|
|
# ── apply_all_cell_rules ──────────────────────────────────────────────────
|
|
|
|
def test_apply_all_cell_rules_calls_apply_for_each(self):
|
|
"""apply_all_cell_rules calls apply_cell_rules once per link with correct args."""
|
|
cell_links = [
|
|
{
|
|
'cell_name': 'office',
|
|
'vpn_subnet': '10.1.0.0/24',
|
|
'permissions': {'inbound': {'calendar': True, 'files': False, 'mail': False, 'webdav': False},
|
|
'outbound': {}},
|
|
},
|
|
{
|
|
'cell_name': 'cabin',
|
|
'vpn_subnet': '10.2.0.0/24',
|
|
'permissions': {'inbound': {'calendar': False, 'files': True, 'mail': False, 'webdav': False},
|
|
'outbound': {}},
|
|
},
|
|
]
|
|
with patch.object(firewall_manager, 'apply_cell_rules', return_value=True) as mock_apply:
|
|
firewall_manager.apply_all_cell_rules(cell_links)
|
|
|
|
self.assertEqual(mock_apply.call_count, 2)
|
|
call_kwargs = {c.args[0]: c.args for c in mock_apply.call_args_list}
|
|
self.assertIn('office', call_kwargs)
|
|
self.assertIn('cabin', call_kwargs)
|
|
office_args = call_kwargs['office']
|
|
self.assertEqual(office_args[1], '10.1.0.0/24')
|
|
self.assertIn('calendar', office_args[2])
|
|
self.assertNotIn('files', office_args[2])
|
|
|
|
def test_apply_all_cell_rules_skips_links_with_missing_fields(self):
|
|
"""Links without cell_name or vpn_subnet are silently skipped."""
|
|
cell_links = [
|
|
{'vpn_subnet': '10.1.0.0/24'}, # no cell_name
|
|
{'cell_name': 'broken'}, # no vpn_subnet
|
|
{'cell_name': 'office', 'vpn_subnet': '10.3.0.0/24',
|
|
'permissions': {'inbound': {}, 'outbound': {}}},
|
|
]
|
|
with patch.object(firewall_manager, 'apply_cell_rules', return_value=True) as mock_apply:
|
|
firewall_manager.apply_all_cell_rules(cell_links)
|
|
|
|
# Only the complete entry should be processed
|
|
self.assertEqual(mock_apply.call_count, 1)
|
|
self.assertEqual(mock_apply.call_args.args[0], 'office')
|
|
|
|
|
|
class TestEnsureCellApiDnat(unittest.TestCase):
|
|
"""Tests for ensure_cell_api_dnat — DNAT wg0:3000 (scoped) → cell-api:3000."""
|
|
|
|
def _wg_exec_no_existing_rules(self, args):
|
|
r = MagicMock()
|
|
r.returncode = 1 if '-C' in args else 0 # -C = check: fail = not present
|
|
r.stdout = ''
|
|
r.stderr = ''
|
|
return r
|
|
|
|
def _wg_exec_all_rules_exist(self, args):
|
|
r = MagicMock()
|
|
r.returncode = 0 # -C succeeds = rule already present
|
|
r.stdout = ''
|
|
return r
|
|
|
|
def _inspect_ok(self, api_ip='172.20.0.10'):
|
|
r = MagicMock()
|
|
r.returncode = 0
|
|
r.stdout = api_ip
|
|
return r
|
|
|
|
def test_dnat_rules_added_when_not_present(self):
|
|
with patch.object(firewall_manager, '_get_wg_server_ip', return_value='10.0.0.1'), \
|
|
patch.object(firewall_manager, '_run', return_value=self._inspect_ok()), \
|
|
patch.object(firewall_manager, '_wg_exec',
|
|
side_effect=self._wg_exec_no_existing_rules) as wg_mock:
|
|
result = firewall_manager.ensure_cell_api_dnat()
|
|
|
|
self.assertTrue(result)
|
|
calls_args = [c.args[0] for c in wg_mock.call_args_list]
|
|
dnat_adds = [a for a in calls_args if 'DNAT' in a and '-A' in a]
|
|
self.assertTrue(len(dnat_adds) >= 1, 'DNAT -A rule must be added')
|
|
|
|
def test_dnat_is_scoped_to_server_ip(self):
|
|
"""DNAT rule must include -d <server_ip> to avoid intercepting cross-cell traffic."""
|
|
with patch.object(firewall_manager, '_get_wg_server_ip', return_value='10.0.0.1'), \
|
|
patch.object(firewall_manager, '_run', return_value=self._inspect_ok()), \
|
|
patch.object(firewall_manager, '_wg_exec',
|
|
side_effect=self._wg_exec_no_existing_rules) as wg_mock:
|
|
firewall_manager.ensure_cell_api_dnat()
|
|
|
|
all_args = [c.args[0] for c in wg_mock.call_args_list]
|
|
dnat_adds = [a for a in all_args if 'DNAT' in a and '-A' in a]
|
|
for rule in dnat_adds:
|
|
self.assertIn('10.0.0.1', rule, 'DNAT rule must be scoped to server IP')
|
|
self.assertIn('-d', rule, 'DNAT rule must use -d to scope to server IP')
|
|
|
|
def test_dnat_skipped_if_already_present(self):
|
|
with patch.object(firewall_manager, '_get_wg_server_ip', return_value='10.0.0.1'), \
|
|
patch.object(firewall_manager, '_run', return_value=self._inspect_ok()), \
|
|
patch.object(firewall_manager, '_wg_exec',
|
|
side_effect=self._wg_exec_all_rules_exist) as wg_mock:
|
|
result = firewall_manager.ensure_cell_api_dnat()
|
|
|
|
self.assertTrue(result)
|
|
calls_args = [c.args[0] for c in wg_mock.call_args_list]
|
|
add_calls = [a for a in calls_args if '-A' in a or '-I' in a]
|
|
self.assertEqual(len(add_calls), 0, 'No rules should be added when they already exist')
|
|
|
|
def test_returns_false_when_wg_server_ip_not_found(self):
|
|
with patch.object(firewall_manager, '_get_wg_server_ip', return_value=None):
|
|
result = firewall_manager.ensure_cell_api_dnat()
|
|
self.assertFalse(result)
|
|
|
|
def test_returns_false_when_cell_api_not_found(self):
|
|
r = MagicMock()
|
|
r.returncode = 0
|
|
r.stdout = ''
|
|
with patch.object(firewall_manager, '_get_wg_server_ip', return_value='10.0.0.1'), \
|
|
patch.object(firewall_manager, '_run', return_value=r):
|
|
result = firewall_manager.ensure_cell_api_dnat()
|
|
self.assertFalse(result)
|
|
|
|
def test_returns_false_on_exception(self):
|
|
with patch.object(firewall_manager, '_get_wg_server_ip', return_value='10.0.0.1'), \
|
|
patch.object(firewall_manager, '_run', side_effect=RuntimeError('docker gone')):
|
|
result = firewall_manager.ensure_cell_api_dnat()
|
|
self.assertFalse(result)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# reconcile_stale_peer_rules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestReconcileStale(unittest.TestCase):
|
|
|
|
def _save_result(self, stdout_text):
|
|
r = MagicMock()
|
|
r.returncode = 0
|
|
r.stdout = stdout_text
|
|
return r
|
|
|
|
def test_returns_zero_when_no_rules(self):
|
|
with patch.object(firewall_manager, '_wg_exec', return_value=self._save_result('*filter\nCOMMIT\n')):
|
|
n = firewall_manager.reconcile_stale_peer_rules([])
|
|
self.assertEqual(n, 0)
|
|
|
|
def test_returns_zero_when_all_peers_known(self):
|
|
save_out = (
|
|
'*filter\n'
|
|
'-A FORWARD -s 10.0.0.2 -m comment --comment "pic-peer-10-0-0-2/32" -j ACCEPT\n'
|
|
'COMMIT\n'
|
|
)
|
|
peers = [{'ip': '10.0.0.2'}]
|
|
with patch.object(firewall_manager, '_wg_exec', return_value=self._save_result(save_out)):
|
|
n = firewall_manager.reconcile_stale_peer_rules(peers)
|
|
self.assertEqual(n, 0)
|
|
|
|
def test_clears_stale_peer(self):
|
|
save_out = (
|
|
'*filter\n'
|
|
'-A FORWARD -s 10.0.0.9 -m comment --comment "pic-peer-10-0-0-9/32" -j ACCEPT\n'
|
|
'COMMIT\n'
|
|
)
|
|
cleared = []
|
|
with patch.object(firewall_manager, '_wg_exec', return_value=self._save_result(save_out)):
|
|
with patch.object(firewall_manager, 'clear_peer_rules', side_effect=cleared.append) as mock_clear:
|
|
n = firewall_manager.reconcile_stale_peer_rules([])
|
|
self.assertEqual(n, 1)
|
|
mock_clear.assert_called_once_with('10.0.0.9')
|
|
|
|
def test_handles_cidr_peer_ip(self):
|
|
"""Peer IPs stored as 'x.x.x.x/32' should still match."""
|
|
save_out = (
|
|
'*filter\n'
|
|
'-A FORWARD -s 10.0.0.5 -m comment --comment "pic-peer-10-0-0-5/32" -j ACCEPT\n'
|
|
'COMMIT\n'
|
|
)
|
|
peers = [{'ip': '10.0.0.5/32'}]
|
|
with patch.object(firewall_manager, '_wg_exec', return_value=self._save_result(save_out)):
|
|
with patch.object(firewall_manager, 'clear_peer_rules') as mock_clear:
|
|
n = firewall_manager.reconcile_stale_peer_rules(peers)
|
|
self.assertEqual(n, 0)
|
|
mock_clear.assert_not_called()
|
|
|
|
def test_returns_zero_on_iptables_save_failure(self):
|
|
fail_r = MagicMock()
|
|
fail_r.returncode = 1
|
|
fail_r.stdout = ''
|
|
with patch.object(firewall_manager, '_wg_exec', return_value=fail_r):
|
|
n = firewall_manager.reconcile_stale_peer_rules([])
|
|
self.assertEqual(n, 0)
|
|
|
|
def test_multiple_stale_ips_all_cleared(self):
|
|
save_out = (
|
|
'*filter\n'
|
|
'-A FORWARD -s 10.0.0.7 -m comment --comment "pic-peer-10-0-0-7/32" -j DROP\n'
|
|
'-A FORWARD -s 10.0.0.8 -m comment --comment "pic-peer-10-0-0-8/32" -j ACCEPT\n'
|
|
'COMMIT\n'
|
|
)
|
|
cleared = []
|
|
with patch.object(firewall_manager, '_wg_exec', return_value=self._save_result(save_out)):
|
|
with patch.object(firewall_manager, 'clear_peer_rules', side_effect=cleared.append):
|
|
n = firewall_manager.reconcile_stale_peer_rules([])
|
|
self.assertEqual(n, 2)
|
|
self.assertIn('10.0.0.7', cleared)
|
|
self.assertIn('10.0.0.8', cleared)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ensure_forward_stateful
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestEnsureForwardStateful(unittest.TestCase):
|
|
"""ensure_forward_stateful deletes any existing copies then re-inserts at position 1."""
|
|
|
|
def _make_exec(self, existing_copies=0):
|
|
"""Return (calls_list, fake_wg_exec).
|
|
|
|
The fake simulates *existing_copies* existing ESTABLISHED,RELATED rules.
|
|
Each -D call with returncode 0 "removes" one copy; once they are all gone
|
|
subsequent -D calls return 1 (rule not found). All other calls succeed.
|
|
"""
|
|
calls = []
|
|
state = {'remaining': existing_copies}
|
|
|
|
def fake_wg_exec(args):
|
|
calls.append(args)
|
|
r = MagicMock()
|
|
r.stdout = ''
|
|
if '-D' in args:
|
|
if state['remaining'] > 0:
|
|
state['remaining'] -= 1
|
|
r.returncode = 0 # deletion succeeded
|
|
else:
|
|
r.returncode = 1 # nothing left to delete
|
|
else:
|
|
r.returncode = 0
|
|
return r
|
|
|
|
return calls, fake_wg_exec
|
|
|
|
def test_inserts_rule_when_not_present(self):
|
|
"""With no pre-existing rule the -D loop exits immediately and -I inserts once."""
|
|
calls, fake = self._make_exec(existing_copies=0)
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake):
|
|
result = firewall_manager.ensure_forward_stateful()
|
|
self.assertTrue(result)
|
|
# Exactly one -D attempt (returns 1 straight away, loop body never ran)
|
|
delete_calls = [c for c in calls if '-D' in c]
|
|
self.assertEqual(len(delete_calls), 1)
|
|
# Exactly one -I insert
|
|
insert_calls = [c for c in calls if '-I' in c]
|
|
self.assertEqual(len(insert_calls), 1)
|
|
flat = ' '.join(insert_calls[0])
|
|
self.assertIn('ESTABLISHED,RELATED', flat)
|
|
self.assertIn('ACCEPT', flat)
|
|
|
|
def test_deletes_existing_and_reinserts(self):
|
|
"""With 2 stale copies the loop deletes both, then inserts once at position 1."""
|
|
calls, fake = self._make_exec(existing_copies=2)
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake):
|
|
result = firewall_manager.ensure_forward_stateful()
|
|
self.assertTrue(result)
|
|
# Two successful -D calls to drain existing rules, one more that fails
|
|
delete_calls = [c for c in calls if '-D' in c]
|
|
self.assertEqual(len(delete_calls), 3) # 2 succeed + 1 fails (loop exit)
|
|
# Exactly one -I insert anchored at position 1
|
|
insert_calls = [c for c in calls if '-I' in c]
|
|
self.assertEqual(len(insert_calls), 1)
|
|
flat = ' '.join(insert_calls[0])
|
|
self.assertIn('1', flat)
|
|
self.assertIn('ESTABLISHED,RELATED', flat)
|
|
self.assertIn('ACCEPT', flat)
|
|
|
|
def test_apply_cell_rules_calls_ensure_forward_stateful(self):
|
|
"""apply_cell_rules must call ensure_forward_stateful so replies are never dropped."""
|
|
with patch.object(firewall_manager, '_wg_exec', return_value=MagicMock(returncode=0, stdout='')), \
|
|
patch.object(firewall_manager, '_get_caddy_container_ip', return_value='172.20.0.2'), \
|
|
patch.object(firewall_manager, '_get_dns_container_ip', return_value='172.20.0.3'), \
|
|
patch.object(firewall_manager, '_get_cell_api_ip', return_value='172.20.0.10'), \
|
|
patch.object(firewall_manager, 'ensure_forward_stateful') as mock_stateful:
|
|
firewall_manager.apply_cell_rules('testcell', '10.0.0.0/24', [])
|
|
mock_stateful.assert_called_once()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|