#!/usr/bin/env python3 """ Additional tests for firewall_manager.py covering missed lines: - _run() exception path (lines 52-54) - ensure_caddy_virtual_ips() add-failure branch and exception path - _rule_exists, _ensure_rule, _delete_rule - reload_coredns (success, failure, exception) - apply_all_dns_rules - _service_tag - apply_service_rules - clear_service_rules """ import sys import unittest from pathlib import Path from unittest.mock import patch, MagicMock, call import subprocess api_dir = Path(__file__).parent.parent / 'api' sys.path.insert(0, str(api_dir)) import firewall_manager def _make_proc(returncode=0, stdout='', stderr=''): p = MagicMock() p.returncode = returncode p.stdout = stdout p.stderr = stderr return p class TestRunFunction(unittest.TestCase): def test_run_success_returns_result(self): proc = _make_proc(returncode=0, stdout='ok') with patch('subprocess.run', return_value=proc): result = firewall_manager._run(['echo', 'ok']) self.assertEqual(result.returncode, 0) def test_run_nonzero_with_check_logs_warning(self): proc = _make_proc(returncode=1, stderr='error') with patch('subprocess.run', return_value=proc): result = firewall_manager._run(['false'], check=True) self.assertEqual(result.returncode, 1) def test_run_exception_reraises(self): with patch('subprocess.run', side_effect=subprocess.TimeoutExpired(['cmd'], 1)): with self.assertRaises(subprocess.TimeoutExpired): firewall_manager._run(['cmd']) class TestEnsureCaddyVirtualIps(unittest.TestCase): def test_exception_returns_false(self): with patch.object(firewall_manager, '_caddy_exec', side_effect=RuntimeError('no docker')): result = firewall_manager.ensure_caddy_virtual_ips() self.assertFalse(result) def test_ip_already_present_skips_add(self): # All IPs are already in the existing output all_ips = ' '.join(firewall_manager.SERVICE_IPS.values()) mock_result = _make_proc(returncode=0, stdout=all_ips) with patch.object(firewall_manager, '_caddy_exec', return_value=mock_result) as mock_exec: result = firewall_manager.ensure_caddy_virtual_ips() self.assertTrue(result) # ip addr show was called once; no add calls self.assertEqual(mock_exec.call_count, 1) def test_missing_ip_triggers_add(self): # No IPs in stdout → all IPs need to be added calls_made = [] def fake_caddy_exec(args): calls_made.append(args) return _make_proc(returncode=0, stdout='') with patch.object(firewall_manager, '_caddy_exec', side_effect=fake_caddy_exec): result = firewall_manager.ensure_caddy_virtual_ips() self.assertTrue(result) # First call is ip addr show; subsequent calls are ip addr add self.assertGreater(len(calls_made), 1) def test_add_failure_logs_warning(self): # First call (ip addr show) returns empty; subsequent calls (ip addr add) fail call_count = [0] def fake_caddy_exec(args): call_count[0] += 1 if call_count[0] == 1: return _make_proc(returncode=0, stdout='') return _make_proc(returncode=1, stderr='failed to add IP') with patch.object(firewall_manager, '_caddy_exec', side_effect=fake_caddy_exec): result = firewall_manager.ensure_caddy_virtual_ips() self.assertTrue(result) # Function still returns True even on add failure class TestRuleHelpers(unittest.TestCase): def test_rule_exists_returns_true_when_returncode_0(self): with patch.object(firewall_manager, '_iptables', return_value=_make_proc(returncode=0)): result = firewall_manager._rule_exists('FORWARD', ['-j', 'ACCEPT']) self.assertTrue(result) def test_rule_exists_returns_false_when_nonzero(self): with patch.object(firewall_manager, '_iptables', return_value=_make_proc(returncode=1)): result = firewall_manager._rule_exists('FORWARD', ['-j', 'ACCEPT']) self.assertFalse(result) def test_ensure_rule_inserts_when_not_present(self): calls = [] def fake_iptables(args, check=False): calls.append(args[0]) if args[0] == '-C': return _make_proc(returncode=1) return _make_proc(returncode=0) with patch.object(firewall_manager, '_iptables', side_effect=fake_iptables): firewall_manager._ensure_rule('FORWARD', ['-j', 'ACCEPT']) self.assertIn('-I', calls) def test_ensure_rule_skips_insert_when_already_present(self): with patch.object(firewall_manager, '_iptables', return_value=_make_proc(returncode=0)) as mock_ipt: firewall_manager._ensure_rule('FORWARD', ['-j', 'ACCEPT']) # Only the -C check call was made self.assertEqual(mock_ipt.call_count, 1) def test_delete_rule_calls_delete_while_exists(self): check_count = [0] def fake_iptables(args, check=False): if args[0] == '-C': check_count[0] += 1 # Rule exists on first check, gone after first delete return _make_proc(returncode=0 if check_count[0] == 1 else 1) # -D delete call: return success return _make_proc(returncode=0) with patch.object(firewall_manager, '_iptables', side_effect=fake_iptables): firewall_manager._delete_rule('FORWARD', ['-j', 'ACCEPT']) # Should have checked twice (once found, once not found) and deleted once self.assertEqual(check_count[0], 2) class TestReloadCoreDns(unittest.TestCase): def test_success_returns_true(self): with patch.object(firewall_manager, '_run', return_value=_make_proc(returncode=0)): result = firewall_manager.reload_coredns() self.assertTrue(result) def test_nonzero_returncode_returns_false(self): with patch.object(firewall_manager, '_run', return_value=_make_proc(returncode=1, stderr='not found')): result = firewall_manager.reload_coredns() self.assertFalse(result) def test_exception_returns_false(self): with patch.object(firewall_manager, '_run', side_effect=RuntimeError('no docker')): result = firewall_manager.reload_coredns() self.assertFalse(result) class TestApplyAllDnsRules(unittest.TestCase): def test_generates_corefile_and_calls_reload_on_success(self): with patch.object(firewall_manager, 'generate_corefile', return_value=True) as mock_gen, \ patch.object(firewall_manager, 'reload_coredns', return_value=True) as mock_reload: result = firewall_manager.apply_all_dns_rules([], '/tmp/Corefile') self.assertTrue(result) mock_gen.assert_called_once() mock_reload.assert_called_once() def test_does_not_call_reload_when_generate_fails(self): with patch.object(firewall_manager, 'generate_corefile', return_value=False), \ patch.object(firewall_manager, 'reload_coredns') as mock_reload: result = firewall_manager.apply_all_dns_rules([], '/tmp/Corefile') self.assertFalse(result) mock_reload.assert_not_called() class TestServiceTag(unittest.TestCase): def test_lowercase_and_replace_special_chars(self): tag = firewall_manager._service_tag('my-service_v2!') self.assertEqual(tag, 'pic-svc-my-service-v2-') def test_simple_id(self): tag = firewall_manager._service_tag('gitea') self.assertEqual(tag, 'pic-svc-gitea') class TestApplyServiceRules(unittest.TestCase): def test_applies_accept_rules_via_iptables(self): calls = [] def fake_iptables(args, check=False): calls.append(args) return _make_proc(returncode=0) rules = [{'type': 'ACCEPT', 'dest_ip': '10.20.0.5', 'dest_port': 80, 'proto': 'tcp'}] with patch.object(firewall_manager, '_iptables', side_effect=fake_iptables), \ patch.object(firewall_manager, 'clear_service_rules'): result = firewall_manager.apply_service_rules('gitea', '10.20.0.5', rules) self.assertTrue(result) self.assertTrue(any('FORWARD' in str(c) for c in calls)) def test_skips_non_accept_rules(self): calls = [] rules = [{'type': 'DROP', 'dest_ip': '10.20.0.5', 'dest_port': 80, 'proto': 'tcp'}] with patch.object(firewall_manager, '_iptables', side_effect=lambda *a, **kw: calls.append(a) or _make_proc()), \ patch.object(firewall_manager, 'clear_service_rules'): result = firewall_manager.apply_service_rules('gitea', '10.20.0.5', rules) self.assertTrue(result) self.assertEqual(len(calls), 0) def test_service_ip_placeholder_substituted(self): captured = [] def fake_iptables(args, check=False): captured.extend(args) return _make_proc(returncode=0) rules = [{'type': 'ACCEPT', 'dest_ip': '${SERVICE_IP}', 'dest_port': 8080, 'proto': 'tcp'}] with patch.object(firewall_manager, '_iptables', side_effect=fake_iptables), \ patch.object(firewall_manager, 'clear_service_rules'): firewall_manager.apply_service_rules('app', '10.20.0.9', rules) self.assertIn('10.20.0.9', captured) class TestClearServiceRules(unittest.TestCase): def test_no_matching_rules_skips_restore(self): # iptables-save returns output with no matching tag save_proc = _make_proc(returncode=0, stdout='*filter\n-A FORWARD -j ACCEPT\nCOMMIT\n') with patch.object(firewall_manager, '_wg_exec', return_value=save_proc), \ patch('subprocess.run') as mock_restore: firewall_manager.clear_service_rules('nonexistent-svc') mock_restore.assert_not_called() def test_exception_is_logged_not_raised(self): with patch.object(firewall_manager, '_wg_exec', side_effect=RuntimeError('no docker')): # Should not raise firewall_manager.clear_service_rules('gitea') def test_save_failure_skips_restore(self): save_proc = _make_proc(returncode=1, stderr='failed') with patch.object(firewall_manager, '_wg_exec', return_value=save_proc), \ patch('subprocess.run') as mock_restore: firewall_manager.clear_service_rules('gitea') mock_restore.assert_not_called() if __name__ == '__main__': unittest.main()