Files
pic/tests/test_firewall_manager_extra.py
T
roof aa1e5c41ec
Unit Tests / test (push) Successful in 12m6s
test: raise coverage 68.7% -> ~80.4%; add ~250 tests for new egress/DDNS/network paths
Coverage was below acceptable levels and several newly-added code paths
(sshuttle egress, proxy egress, DDNS provider stubs, DNS overview route,
peer-registry provisioning) had zero test coverage.

~250 new unit tests are added across 16 new test files. Existing test files
are updated to match refactored interfaces (DHCP removed, constants
introduced, network_manager restructured). .coveragerc is added to pin the
source mapping and the 70% floor so regressions are caught at commit time.

tests/test_enhanced_api.py was previously living in api/ (wrong location)
and is moved to tests/ where it belongs.

Integration test files are updated to remove references to DHCP endpoints
and add coverage for the new DNS overview and DDNS sync endpoints.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 09:03:39 -04:00

247 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Additional tests for firewall_manager.py covering missed lines:
- _run() exception path (lines 52-54)
- ensure_caddy_virtual_ips() add-failure branch and exception path
- _rule_exists, _ensure_rule, _delete_rule
- reload_coredns (success, failure, exception)
- apply_all_dns_rules
- _service_tag
- apply_service_rules
- clear_service_rules
"""
import sys
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock, call
import subprocess
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
import firewall_manager
def _make_proc(returncode=0, stdout='', stderr=''):
p = MagicMock()
p.returncode = returncode
p.stdout = stdout
p.stderr = stderr
return p
class TestRunFunction(unittest.TestCase):
    """Exercise ``firewall_manager._run`` with ``subprocess.run`` patched out."""

    def test_run_success_returns_result(self):
        # The mocked process result should be visible on _run's return value.
        completed = _make_proc(returncode=0, stdout='ok')
        with patch('subprocess.run', return_value=completed):
            outcome = firewall_manager._run(['echo', 'ok'])
        self.assertEqual(outcome.returncode, 0)

    def test_run_nonzero_with_check_logs_warning(self):
        # With check=True and a non-zero exit the call is expected to return
        # normally; the warning itself is not asserted here.
        failed = _make_proc(returncode=1, stderr='error')
        with patch('subprocess.run', return_value=failed):
            outcome = firewall_manager._run(['false'], check=True)
        self.assertEqual(outcome.returncode, 1)

    def test_run_exception_reraises(self):
        # An exception raised by subprocess.run must reach the caller.
        timeout = subprocess.TimeoutExpired(['cmd'], 1)
        with patch('subprocess.run', side_effect=timeout), \
                self.assertRaises(subprocess.TimeoutExpired):
            firewall_manager._run(['cmd'])
class TestEnsureCaddyVirtualIps(unittest.TestCase):
    """Cover ``ensure_caddy_virtual_ips`` with ``_caddy_exec`` replaced by fakes."""

    def test_exception_returns_false(self):
        docker_error = RuntimeError('no docker')
        with patch.object(firewall_manager, '_caddy_exec', side_effect=docker_error):
            outcome = firewall_manager.ensure_caddy_virtual_ips()
        self.assertFalse(outcome)

    def test_ip_already_present_skips_add(self):
        # Report every configured service IP as already present.
        present_ips = ' '.join(firewall_manager.SERVICE_IPS.values())
        show_output = _make_proc(returncode=0, stdout=present_ips)
        with patch.object(firewall_manager, '_caddy_exec', return_value=show_output) as caddy_exec:
            outcome = firewall_manager.ensure_caddy_virtual_ips()
        self.assertTrue(outcome)
        # Exactly one call (the listing); no add calls should follow.
        self.assertEqual(caddy_exec.call_count, 1)

    def test_missing_ip_triggers_add(self):
        # Empty listing output means every IP is missing and must be added.
        recorded = []

        def fake_caddy_exec(args):
            recorded.append(args)
            return _make_proc(returncode=0, stdout='')

        with patch.object(firewall_manager, '_caddy_exec', side_effect=fake_caddy_exec):
            outcome = firewall_manager.ensure_caddy_virtual_ips()
        self.assertTrue(outcome)
        # The listing call plus at least one add call.
        self.assertGreater(len(recorded), 1)

    def test_add_failure_logs_warning(self):
        # The first call (listing) succeeds with empty output; every later
        # call (the adds) fails.
        invocations = 0

        def fake_caddy_exec(args):
            nonlocal invocations
            invocations += 1
            if invocations > 1:
                return _make_proc(returncode=1, stderr='failed to add IP')
            return _make_proc(returncode=0, stdout='')

        with patch.object(firewall_manager, '_caddy_exec', side_effect=fake_caddy_exec):
            outcome = firewall_manager.ensure_caddy_virtual_ips()
        # A failed add is still expected to yield True overall.
        self.assertTrue(outcome)
class TestRuleHelpers(unittest.TestCase):
    """Tests for the ``_rule_exists``, ``_ensure_rule`` and ``_delete_rule`` helpers.

    ``firewall_manager._iptables`` is patched in every test, so the helpers
    are driven purely by canned process results.
    """

    def test_rule_exists_returns_true_when_returncode_0(self):
        with patch.object(firewall_manager, '_iptables', return_value=_make_proc(returncode=0)):
            result = firewall_manager._rule_exists('FORWARD', ['-j', 'ACCEPT'])
        self.assertTrue(result)

    def test_rule_exists_returns_false_when_nonzero(self):
        with patch.object(firewall_manager, '_iptables', return_value=_make_proc(returncode=1)):
            result = firewall_manager._rule_exists('FORWARD', ['-j', 'ACCEPT'])
        self.assertFalse(result)

    def test_ensure_rule_inserts_when_not_present(self):
        calls = []

        def fake_iptables(args, check=False):
            calls.append(args[0])
            # The -C existence check fails, so an insert should follow.
            if args[0] == '-C':
                return _make_proc(returncode=1)
            return _make_proc(returncode=0)

        with patch.object(firewall_manager, '_iptables', side_effect=fake_iptables):
            firewall_manager._ensure_rule('FORWARD', ['-j', 'ACCEPT'])
        self.assertIn('-I', calls)

    def test_ensure_rule_skips_insert_when_already_present(self):
        with patch.object(firewall_manager, '_iptables', return_value=_make_proc(returncode=0)) as mock_ipt:
            firewall_manager._ensure_rule('FORWARD', ['-j', 'ACCEPT'])
        # Only the -C check call was made
        self.assertEqual(mock_ipt.call_count, 1)

    def test_delete_rule_calls_delete_while_exists(self):
        check_count = 0
        non_check_ops = []

        def fake_iptables(args, check=False):
            nonlocal check_count
            if args[0] == '-C':
                check_count += 1
                # Rule exists on first check, gone after first delete
                return _make_proc(returncode=0 if check_count == 1 else 1)
            # Any other call is treated as the delete: record it and succeed.
            non_check_ops.append(args[0])
            return _make_proc(returncode=0)

        with patch.object(firewall_manager, '_iptables', side_effect=fake_iptables):
            firewall_manager._delete_rule('FORWARD', ['-j', 'ACCEPT'])
        # Should have checked twice (once found, once not found) ...
        self.assertEqual(check_count, 2)
        # ... and deleted once. Without this assertion a _delete_rule that
        # never issued the delete would still pass.
        self.assertEqual(len(non_check_ops), 1)
class TestReloadCoreDns(unittest.TestCase):
    """Check the boolean result of ``reload_coredns`` with ``_run`` patched."""

    def test_success_returns_true(self):
        succeeded = _make_proc(returncode=0)
        with patch.object(firewall_manager, '_run', return_value=succeeded):
            reloaded = firewall_manager.reload_coredns()
        self.assertTrue(reloaded)

    def test_nonzero_returncode_returns_false(self):
        failed = _make_proc(returncode=1, stderr='not found')
        with patch.object(firewall_manager, '_run', return_value=failed):
            reloaded = firewall_manager.reload_coredns()
        self.assertFalse(reloaded)

    def test_exception_returns_false(self):
        # An exception from _run is expected to turn into a False result.
        docker_error = RuntimeError('no docker')
        with patch.object(firewall_manager, '_run', side_effect=docker_error):
            reloaded = firewall_manager.reload_coredns()
        self.assertFalse(reloaded)
class TestApplyAllDnsRules(unittest.TestCase):
    """Check how ``apply_all_dns_rules`` sequences generation and reload.

    Both ``generate_corefile`` and ``reload_coredns`` are patched, so the
    Corefile path passed in is never written to by these tests.
    """

    def test_generates_corefile_and_calls_reload_on_success(self):
        with patch.object(firewall_manager, 'generate_corefile', return_value=True) as generate:
            with patch.object(firewall_manager, 'reload_coredns', return_value=True) as reload:
                applied = firewall_manager.apply_all_dns_rules([], '/tmp/Corefile')
        self.assertTrue(applied)
        generate.assert_called_once()
        reload.assert_called_once()

    def test_does_not_call_reload_when_generate_fails(self):
        # A failed generation must short-circuit before the reload step.
        with patch.object(firewall_manager, 'generate_corefile', return_value=False):
            with patch.object(firewall_manager, 'reload_coredns') as reload:
                applied = firewall_manager.apply_all_dns_rules([], '/tmp/Corefile')
        self.assertFalse(applied)
        reload.assert_not_called()
class TestServiceTag(unittest.TestCase):
    """Pin the tag strings produced by ``_service_tag`` for sample service ids."""

    def test_lowercase_and_replace_special_chars(self):
        # Per the expected value, both '_' and '!' come back as '-'.
        self.assertEqual(
            firewall_manager._service_tag('my-service_v2!'),
            'pic-svc-my-service-v2-',
        )

    def test_simple_id(self):
        # A plain lowercase id only gains the 'pic-svc-' prefix.
        self.assertEqual(
            firewall_manager._service_tag('gitea'),
            'pic-svc-gitea',
        )
class TestApplyServiceRules(unittest.TestCase):
    """Cover ``apply_service_rules`` with ``_iptables`` and ``clear_service_rules`` patched."""

    def test_applies_accept_rules_via_iptables(self):
        issued = []

        def fake_iptables(args, check=False):
            issued.append(args)
            return _make_proc(returncode=0)

        accept_rule = {'type': 'ACCEPT', 'dest_ip': '10.20.0.5', 'dest_port': 80, 'proto': 'tcp'}
        with patch.object(firewall_manager, '_iptables', side_effect=fake_iptables):
            with patch.object(firewall_manager, 'clear_service_rules'):
                applied = firewall_manager.apply_service_rules('gitea', '10.20.0.5', [accept_rule])
        self.assertTrue(applied)
        # At least one issued command must mention the FORWARD chain.
        rendered = [str(command) for command in issued]
        self.assertTrue(any('FORWARD' in text for text in rendered))

    def test_skips_non_accept_rules(self):
        issued = []

        def record_call(*positional, **_keywords):
            # Accept any call shape, remember the positional arguments.
            issued.append(positional)
            return _make_proc()

        drop_rule = {'type': 'DROP', 'dest_ip': '10.20.0.5', 'dest_port': 80, 'proto': 'tcp'}
        with patch.object(firewall_manager, '_iptables', side_effect=record_call):
            with patch.object(firewall_manager, 'clear_service_rules'):
                applied = firewall_manager.apply_service_rules('gitea', '10.20.0.5', [drop_rule])
        self.assertTrue(applied)
        # A DROP rule must not produce any iptables call.
        self.assertEqual(len(issued), 0)

    def test_service_ip_placeholder_substituted(self):
        flattened_args = []

        def fake_iptables(args, check=False):
            flattened_args.extend(args)
            return _make_proc(returncode=0)

        templated_rule = {'type': 'ACCEPT', 'dest_ip': '${SERVICE_IP}', 'dest_port': 8080, 'proto': 'tcp'}
        with patch.object(firewall_manager, '_iptables', side_effect=fake_iptables):
            with patch.object(firewall_manager, 'clear_service_rules'):
                firewall_manager.apply_service_rules('app', '10.20.0.9', [templated_rule])
        # The placeholder should be replaced by the service IP passed in.
        self.assertIn('10.20.0.9', flattened_args)
class TestClearServiceRules(unittest.TestCase):
    """Check the paths of ``clear_service_rules`` that must not restore rules.

    ``_wg_exec`` supplies the saved ruleset and ``subprocess.run`` is patched
    so that any restore attempt would be recorded.
    """

    def test_no_matching_rules_skips_restore(self):
        # The saved ruleset contains no rule carrying this service's tag.
        ruleset = '*filter\n-A FORWARD -j ACCEPT\nCOMMIT\n'
        saved = _make_proc(returncode=0, stdout=ruleset)
        with patch.object(firewall_manager, '_wg_exec', return_value=saved):
            with patch('subprocess.run') as restore_call:
                firewall_manager.clear_service_rules('nonexistent-svc')
        restore_call.assert_not_called()

    def test_exception_is_logged_not_raised(self):
        # Passing means no exception escaped; logging is not asserted here.
        docker_error = RuntimeError('no docker')
        with patch.object(firewall_manager, '_wg_exec', side_effect=docker_error):
            firewall_manager.clear_service_rules('gitea')

    def test_save_failure_skips_restore(self):
        # A failed save must not be followed by a restore.
        failed_save = _make_proc(returncode=1, stderr='failed')
        with patch.object(firewall_manager, '_wg_exec', return_value=failed_save):
            with patch('subprocess.run') as restore_call:
                firewall_manager.clear_service_rules('gitea')
        restore_call.assert_not_called()
if __name__ == '__main__':
unittest.main()