aa1e5c41ec
Unit Tests / test (push) Successful in 12m6s
Coverage was below acceptable levels and several newly-added code paths (sshuttle egress, proxy egress, DDNS provider stubs, DNS overview route, peer-registry provisioning) had zero test coverage. ~250 new unit tests are added across 16 new test files. Existing test files are updated to match refactored interfaces (DHCP removed, constants introduced, network_manager restructured). .coveragerc is added to pin the source mapping and the 70% floor so regressions are caught at commit time. tests/test_enhanced_api.py was previously living in api/ (wrong location) and is moved to tests/ where it belongs. Integration test files are updated to remove references to DHCP endpoints and add coverage for the new DNS overview and DDNS sync endpoints. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
247 lines
10 KiB
Python
247 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Additional tests for firewall_manager.py covering missed lines:
|
|
- _run() exception path (lines 52-54)
|
|
- ensure_caddy_virtual_ips() add-failure branch and exception path
|
|
- _rule_exists, _ensure_rule, _delete_rule
|
|
- reload_coredns (success, failure, exception)
|
|
- apply_all_dns_rules
|
|
- _service_tag
|
|
- apply_service_rules
|
|
- clear_service_rules
|
|
"""
|
|
|
|
import sys
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock, call
|
|
import subprocess
|
|
|
|
api_dir = Path(__file__).parent.parent / 'api'
|
|
sys.path.insert(0, str(api_dir))
|
|
|
|
import firewall_manager
|
|
|
|
|
|
def _make_proc(returncode=0, stdout='', stderr=''):
|
|
p = MagicMock()
|
|
p.returncode = returncode
|
|
p.stdout = stdout
|
|
p.stderr = stderr
|
|
return p
|
|
|
|
|
|
class TestRunFunction(unittest.TestCase):
|
|
def test_run_success_returns_result(self):
|
|
proc = _make_proc(returncode=0, stdout='ok')
|
|
with patch('subprocess.run', return_value=proc):
|
|
result = firewall_manager._run(['echo', 'ok'])
|
|
self.assertEqual(result.returncode, 0)
|
|
|
|
def test_run_nonzero_with_check_logs_warning(self):
|
|
proc = _make_proc(returncode=1, stderr='error')
|
|
with patch('subprocess.run', return_value=proc):
|
|
result = firewall_manager._run(['false'], check=True)
|
|
self.assertEqual(result.returncode, 1)
|
|
|
|
def test_run_exception_reraises(self):
|
|
with patch('subprocess.run', side_effect=subprocess.TimeoutExpired(['cmd'], 1)):
|
|
with self.assertRaises(subprocess.TimeoutExpired):
|
|
firewall_manager._run(['cmd'])
|
|
|
|
|
|
class TestEnsureCaddyVirtualIps(unittest.TestCase):
|
|
def test_exception_returns_false(self):
|
|
with patch.object(firewall_manager, '_caddy_exec', side_effect=RuntimeError('no docker')):
|
|
result = firewall_manager.ensure_caddy_virtual_ips()
|
|
self.assertFalse(result)
|
|
|
|
def test_ip_already_present_skips_add(self):
|
|
# All IPs are already in the existing output
|
|
all_ips = ' '.join(firewall_manager.SERVICE_IPS.values())
|
|
mock_result = _make_proc(returncode=0, stdout=all_ips)
|
|
with patch.object(firewall_manager, '_caddy_exec', return_value=mock_result) as mock_exec:
|
|
result = firewall_manager.ensure_caddy_virtual_ips()
|
|
self.assertTrue(result)
|
|
# ip addr show was called once; no add calls
|
|
self.assertEqual(mock_exec.call_count, 1)
|
|
|
|
def test_missing_ip_triggers_add(self):
|
|
# No IPs in stdout → all IPs need to be added
|
|
calls_made = []
|
|
|
|
def fake_caddy_exec(args):
|
|
calls_made.append(args)
|
|
return _make_proc(returncode=0, stdout='')
|
|
|
|
with patch.object(firewall_manager, '_caddy_exec', side_effect=fake_caddy_exec):
|
|
result = firewall_manager.ensure_caddy_virtual_ips()
|
|
self.assertTrue(result)
|
|
# First call is ip addr show; subsequent calls are ip addr add
|
|
self.assertGreater(len(calls_made), 1)
|
|
|
|
def test_add_failure_logs_warning(self):
|
|
# First call (ip addr show) returns empty; subsequent calls (ip addr add) fail
|
|
call_count = [0]
|
|
|
|
def fake_caddy_exec(args):
|
|
call_count[0] += 1
|
|
if call_count[0] == 1:
|
|
return _make_proc(returncode=0, stdout='')
|
|
return _make_proc(returncode=1, stderr='failed to add IP')
|
|
|
|
with patch.object(firewall_manager, '_caddy_exec', side_effect=fake_caddy_exec):
|
|
result = firewall_manager.ensure_caddy_virtual_ips()
|
|
self.assertTrue(result) # Function still returns True even on add failure
|
|
|
|
|
|
class TestRuleHelpers(unittest.TestCase):
|
|
def test_rule_exists_returns_true_when_returncode_0(self):
|
|
with patch.object(firewall_manager, '_iptables', return_value=_make_proc(returncode=0)):
|
|
result = firewall_manager._rule_exists('FORWARD', ['-j', 'ACCEPT'])
|
|
self.assertTrue(result)
|
|
|
|
def test_rule_exists_returns_false_when_nonzero(self):
|
|
with patch.object(firewall_manager, '_iptables', return_value=_make_proc(returncode=1)):
|
|
result = firewall_manager._rule_exists('FORWARD', ['-j', 'ACCEPT'])
|
|
self.assertFalse(result)
|
|
|
|
def test_ensure_rule_inserts_when_not_present(self):
|
|
calls = []
|
|
def fake_iptables(args, check=False):
|
|
calls.append(args[0])
|
|
if args[0] == '-C':
|
|
return _make_proc(returncode=1)
|
|
return _make_proc(returncode=0)
|
|
|
|
with patch.object(firewall_manager, '_iptables', side_effect=fake_iptables):
|
|
firewall_manager._ensure_rule('FORWARD', ['-j', 'ACCEPT'])
|
|
self.assertIn('-I', calls)
|
|
|
|
def test_ensure_rule_skips_insert_when_already_present(self):
|
|
with patch.object(firewall_manager, '_iptables', return_value=_make_proc(returncode=0)) as mock_ipt:
|
|
firewall_manager._ensure_rule('FORWARD', ['-j', 'ACCEPT'])
|
|
# Only the -C check call was made
|
|
self.assertEqual(mock_ipt.call_count, 1)
|
|
|
|
def test_delete_rule_calls_delete_while_exists(self):
|
|
check_count = [0]
|
|
def fake_iptables(args, check=False):
|
|
if args[0] == '-C':
|
|
check_count[0] += 1
|
|
# Rule exists on first check, gone after first delete
|
|
return _make_proc(returncode=0 if check_count[0] == 1 else 1)
|
|
# -D delete call: return success
|
|
return _make_proc(returncode=0)
|
|
|
|
with patch.object(firewall_manager, '_iptables', side_effect=fake_iptables):
|
|
firewall_manager._delete_rule('FORWARD', ['-j', 'ACCEPT'])
|
|
# Should have checked twice (once found, once not found) and deleted once
|
|
self.assertEqual(check_count[0], 2)
|
|
|
|
|
|
class TestReloadCoreDns(unittest.TestCase):
|
|
def test_success_returns_true(self):
|
|
with patch.object(firewall_manager, '_run', return_value=_make_proc(returncode=0)):
|
|
result = firewall_manager.reload_coredns()
|
|
self.assertTrue(result)
|
|
|
|
def test_nonzero_returncode_returns_false(self):
|
|
with patch.object(firewall_manager, '_run', return_value=_make_proc(returncode=1, stderr='not found')):
|
|
result = firewall_manager.reload_coredns()
|
|
self.assertFalse(result)
|
|
|
|
def test_exception_returns_false(self):
|
|
with patch.object(firewall_manager, '_run', side_effect=RuntimeError('no docker')):
|
|
result = firewall_manager.reload_coredns()
|
|
self.assertFalse(result)
|
|
|
|
|
|
class TestApplyAllDnsRules(unittest.TestCase):
|
|
def test_generates_corefile_and_calls_reload_on_success(self):
|
|
with patch.object(firewall_manager, 'generate_corefile', return_value=True) as mock_gen, \
|
|
patch.object(firewall_manager, 'reload_coredns', return_value=True) as mock_reload:
|
|
result = firewall_manager.apply_all_dns_rules([], '/tmp/Corefile')
|
|
self.assertTrue(result)
|
|
mock_gen.assert_called_once()
|
|
mock_reload.assert_called_once()
|
|
|
|
def test_does_not_call_reload_when_generate_fails(self):
|
|
with patch.object(firewall_manager, 'generate_corefile', return_value=False), \
|
|
patch.object(firewall_manager, 'reload_coredns') as mock_reload:
|
|
result = firewall_manager.apply_all_dns_rules([], '/tmp/Corefile')
|
|
self.assertFalse(result)
|
|
mock_reload.assert_not_called()
|
|
|
|
|
|
class TestServiceTag(unittest.TestCase):
|
|
def test_lowercase_and_replace_special_chars(self):
|
|
tag = firewall_manager._service_tag('my-service_v2!')
|
|
self.assertEqual(tag, 'pic-svc-my-service-v2-')
|
|
|
|
def test_simple_id(self):
|
|
tag = firewall_manager._service_tag('gitea')
|
|
self.assertEqual(tag, 'pic-svc-gitea')
|
|
|
|
|
|
class TestApplyServiceRules(unittest.TestCase):
|
|
def test_applies_accept_rules_via_iptables(self):
|
|
calls = []
|
|
def fake_iptables(args, check=False):
|
|
calls.append(args)
|
|
return _make_proc(returncode=0)
|
|
|
|
rules = [{'type': 'ACCEPT', 'dest_ip': '10.20.0.5', 'dest_port': 80, 'proto': 'tcp'}]
|
|
with patch.object(firewall_manager, '_iptables', side_effect=fake_iptables), \
|
|
patch.object(firewall_manager, 'clear_service_rules'):
|
|
result = firewall_manager.apply_service_rules('gitea', '10.20.0.5', rules)
|
|
self.assertTrue(result)
|
|
self.assertTrue(any('FORWARD' in str(c) for c in calls))
|
|
|
|
def test_skips_non_accept_rules(self):
|
|
calls = []
|
|
rules = [{'type': 'DROP', 'dest_ip': '10.20.0.5', 'dest_port': 80, 'proto': 'tcp'}]
|
|
with patch.object(firewall_manager, '_iptables', side_effect=lambda *a, **kw: calls.append(a) or _make_proc()), \
|
|
patch.object(firewall_manager, 'clear_service_rules'):
|
|
result = firewall_manager.apply_service_rules('gitea', '10.20.0.5', rules)
|
|
self.assertTrue(result)
|
|
self.assertEqual(len(calls), 0)
|
|
|
|
def test_service_ip_placeholder_substituted(self):
|
|
captured = []
|
|
def fake_iptables(args, check=False):
|
|
captured.extend(args)
|
|
return _make_proc(returncode=0)
|
|
|
|
rules = [{'type': 'ACCEPT', 'dest_ip': '${SERVICE_IP}', 'dest_port': 8080, 'proto': 'tcp'}]
|
|
with patch.object(firewall_manager, '_iptables', side_effect=fake_iptables), \
|
|
patch.object(firewall_manager, 'clear_service_rules'):
|
|
firewall_manager.apply_service_rules('app', '10.20.0.9', rules)
|
|
self.assertIn('10.20.0.9', captured)
|
|
|
|
|
|
class TestClearServiceRules(unittest.TestCase):
|
|
def test_no_matching_rules_skips_restore(self):
|
|
# iptables-save returns output with no matching tag
|
|
save_proc = _make_proc(returncode=0, stdout='*filter\n-A FORWARD -j ACCEPT\nCOMMIT\n')
|
|
with patch.object(firewall_manager, '_wg_exec', return_value=save_proc), \
|
|
patch('subprocess.run') as mock_restore:
|
|
firewall_manager.clear_service_rules('nonexistent-svc')
|
|
mock_restore.assert_not_called()
|
|
|
|
def test_exception_is_logged_not_raised(self):
|
|
with patch.object(firewall_manager, '_wg_exec', side_effect=RuntimeError('no docker')):
|
|
# Should not raise
|
|
firewall_manager.clear_service_rules('gitea')
|
|
|
|
def test_save_failure_skips_restore(self):
|
|
save_proc = _make_proc(returncode=1, stderr='failed')
|
|
with patch.object(firewall_manager, '_wg_exec', return_value=save_proc), \
|
|
patch('subprocess.run') as mock_restore:
|
|
firewall_manager.clear_service_rules('gitea')
|
|
mock_restore.assert_not_called()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|