feat: per-peer access enforcement, live peer status, auto IP assignment
Server-side access control: - firewall_manager.py: per-peer iptables FORWARD rules in WireGuard container; virtual IPs on Caddy (172.20.0.21-24) for per-service DROP/ACCEPT targeting - CoreDNS Corefile regenerated with ACL blocks for blocked services per peer - POST /api/wireguard/apply-enforcement re-applies rules after WireGuard restart; wg0.conf PostUp calls it via curl so rules restore automatically on container start WireGuard fixes: - _syncconf uses `wg set peer` instead of `wg syncconf` to avoid resetting ListenPort - add_peer validates AllowedIPs must be /32 — rejects full/split tunnel CIDRs that would route internet or LAN traffic to that peer - _config_file() checks for linuxserver wg_confs/ subdirectory first UI: - Peers page fetches /api/wireguard/peers/statuses for live handshake data; status badge now shows real Online/Offline + seconds since last handshake - IP field removed from Add Peer form (auto-assigned from 10.0.0.0/24) Tests (246 pass): - test_firewall_manager.py: 22 tests for ACL generation, iptables rule correctness, comment tagging, clear_peer_rules filter logic - test_peer_wg_integration.py: 10 tests for /32 enforcement, IP auto-assignment, syncconf called on add/remove - test_wireguard_manager.py: updated to reflect correct IPs and /32 requirement Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,275 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for firewall_manager — per-peer iptables rule generation and DNS ACL logic.
|
||||
All docker exec calls are mocked so tests run without a live Docker environment.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import tempfile
|
||||
import shutil
|
||||
import unittest
|
||||
from unittest.mock import patch, call, MagicMock
|
||||
from pathlib import Path
|
||||
|
||||
api_dir = Path(__file__).parent.parent / 'api'
|
||||
sys.path.insert(0, str(api_dir))
|
||||
|
||||
import firewall_manager
|
||||
|
||||
|
||||
def _make_peer(ip, internet=True, services=None, peers=True):
|
||||
if services is None:
|
||||
services = list(firewall_manager.SERVICE_IPS.keys())
|
||||
return {'ip': ip, 'internet_access': internet, 'service_access': services, 'peer_access': peers}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _peer_comment
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestPeerComment(unittest.TestCase):
|
||||
def test_dots_replaced_with_dashes(self):
|
||||
self.assertEqual(firewall_manager._peer_comment('10.0.0.2'), 'pic-peer-10-0-0-2')
|
||||
|
||||
def test_different_ip(self):
|
||||
self.assertEqual(firewall_manager._peer_comment('192.168.1.100'), 'pic-peer-192-168-1-100')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _build_acl_block
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestBuildAclBlock(unittest.TestCase):
|
||||
def test_empty_returns_empty_string(self):
|
||||
self.assertEqual(firewall_manager._build_acl_block({}), '')
|
||||
|
||||
def test_no_blocked_peers_returns_empty(self):
|
||||
blocked = {s: [] for s in firewall_manager.SERVICE_IPS}
|
||||
self.assertEqual(firewall_manager._build_acl_block(blocked), '')
|
||||
|
||||
def test_blocked_peer_appears_in_acl(self):
|
||||
blocked = {'calendar': ['10.0.0.5'], 'files': [], 'mail': [], 'webdav': []}
|
||||
result = firewall_manager._build_acl_block(blocked)
|
||||
self.assertIn('acl calendar.cell.', result)
|
||||
self.assertIn('block net 10.0.0.5/32', result)
|
||||
self.assertIn('allow net 0.0.0.0/0', result)
|
||||
|
||||
def test_unknown_service_skipped(self):
|
||||
blocked = {'nonexistent': ['10.0.0.2']}
|
||||
result = firewall_manager._build_acl_block(blocked)
|
||||
self.assertEqual(result, '')
|
||||
|
||||
def test_multiple_peers_blocked_from_same_service(self):
|
||||
blocked = {'mail': ['10.0.0.2', '10.0.0.3'], 'calendar': [], 'files': [], 'webdav': []}
|
||||
result = firewall_manager._build_acl_block(blocked)
|
||||
self.assertEqual(result.count('block net'), 2)
|
||||
self.assertIn('10.0.0.2/32', result)
|
||||
self.assertIn('10.0.0.3/32', result)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# generate_corefile
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestGenerateCorefile(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.mkdtemp()
|
||||
self.path = os.path.join(self.tmp, 'Corefile')
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmp)
|
||||
|
||||
def test_creates_corefile(self):
|
||||
firewall_manager.generate_corefile([], self.path)
|
||||
self.assertTrue(os.path.exists(self.path))
|
||||
|
||||
def test_contains_forward_and_cache(self):
|
||||
firewall_manager.generate_corefile([], self.path)
|
||||
content = open(self.path).read()
|
||||
self.assertIn('forward . 8.8.8.8', content)
|
||||
self.assertIn('cache', content)
|
||||
self.assertIn('cell {', content)
|
||||
|
||||
def test_no_blocked_services_no_acl_block(self):
|
||||
peers = [_make_peer('10.0.0.2', internet=True,
|
||||
services=list(firewall_manager.SERVICE_IPS.keys()))]
|
||||
firewall_manager.generate_corefile(peers, self.path)
|
||||
content = open(self.path).read()
|
||||
self.assertNotIn('block net', content)
|
||||
|
||||
def test_blocked_service_generates_acl(self):
|
||||
peers = [_make_peer('10.0.0.3', internet=False, services=['calendar'])]
|
||||
firewall_manager.generate_corefile(peers, self.path)
|
||||
content = open(self.path).read()
|
||||
# files/mail/webdav are blocked for this peer
|
||||
self.assertIn('block net 10.0.0.3/32', content)
|
||||
|
||||
def test_peer_with_all_services_allowed_no_acl(self):
|
||||
peers = [_make_peer('10.0.0.2', services=list(firewall_manager.SERVICE_IPS.keys()))]
|
||||
firewall_manager.generate_corefile(peers, self.path)
|
||||
self.assertNotIn('block net', open(self.path).read())
|
||||
|
||||
def test_returns_false_on_write_error(self):
|
||||
result = firewall_manager.generate_corefile([], '/nonexistent/path/Corefile')
|
||||
self.assertFalse(result)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# apply_peer_rules — iptables call verification
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestApplyPeerRules(unittest.TestCase):
|
||||
"""Verify correct iptables calls for full-internet vs split-tunnel peers."""
|
||||
|
||||
def _run_apply(self, peer_ip, settings):
|
||||
calls_made = []
|
||||
|
||||
def fake_wg_exec(args):
|
||||
calls_made.append(args)
|
||||
m = MagicMock()
|
||||
m.returncode = 0
|
||||
m.stdout = ''
|
||||
return m
|
||||
|
||||
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec):
|
||||
firewall_manager.apply_peer_rules(peer_ip, settings)
|
||||
|
||||
return calls_made
|
||||
|
||||
def test_full_internet_peer_gets_accept_rule(self):
|
||||
calls = self._run_apply('10.0.0.2', {'internet_access': True,
|
||||
'service_access': list(firewall_manager.SERVICE_IPS.keys()),
|
||||
'peer_access': True})
|
||||
iptables_calls = [c for c in calls if 'iptables' in c]
|
||||
targets = [c[c.index('-j') + 1] for c in iptables_calls if '-j' in c]
|
||||
# Full-internet peer: only ACCEPT rules (no DROP except iptables-restore clears)
|
||||
self.assertNotIn('DROP', targets)
|
||||
self.assertIn('ACCEPT', targets)
|
||||
|
||||
def test_no_internet_peer_gets_drop_rule(self):
|
||||
calls = self._run_apply('10.0.0.3', {'internet_access': False,
|
||||
'service_access': list(firewall_manager.SERVICE_IPS.keys()),
|
||||
'peer_access': True})
|
||||
iptables_calls = [c for c in calls if 'iptables' in c]
|
||||
targets = [c[c.index('-j') + 1] for c in iptables_calls if '-j' in c]
|
||||
self.assertIn('DROP', targets)
|
||||
self.assertIn('ACCEPT', targets)
|
||||
|
||||
def test_service_access_restriction_generates_drop(self):
|
||||
calls = self._run_apply('10.0.0.4', {'internet_access': False,
|
||||
'service_access': ['calendar'],
|
||||
'peer_access': True})
|
||||
iptables_calls = [c for c in calls if 'iptables' in c]
|
||||
# files/mail/webdav should be DROPped, calendar ACCEPTed
|
||||
targets_with_ips = [
|
||||
(c[c.index('-d') + 1], c[c.index('-j') + 1])
|
||||
for c in iptables_calls
|
||||
if '-d' in c and '-j' in c
|
||||
]
|
||||
svc_rules = {ip: t for ip, t in targets_with_ips
|
||||
if ip in firewall_manager.SERVICE_IPS.values()}
|
||||
calendar_ip = firewall_manager.SERVICE_IPS['calendar']
|
||||
files_ip = firewall_manager.SERVICE_IPS['files']
|
||||
self.assertEqual(svc_rules.get(calendar_ip), 'ACCEPT')
|
||||
self.assertEqual(svc_rules.get(files_ip), 'DROP')
|
||||
|
||||
def test_all_rules_tagged_with_peer_comment(self):
|
||||
calls = self._run_apply('10.0.0.2', {'internet_access': True,
|
||||
'service_access': list(firewall_manager.SERVICE_IPS.keys()),
|
||||
'peer_access': True})
|
||||
iptables_calls = [c for c in calls if 'iptables' in c]
|
||||
comment = firewall_manager._peer_comment('10.0.0.2')
|
||||
for c in iptables_calls:
|
||||
if '-I' in c: # only insertion rules need the comment
|
||||
self.assertIn(comment, c, f"Rule missing comment tag: {c}")
|
||||
|
||||
def test_peer_with_no_peer_access_gets_drop_for_vpn_subnet(self):
|
||||
calls = self._run_apply('10.0.0.5', {'internet_access': True,
|
||||
'service_access': list(firewall_manager.SERVICE_IPS.keys()),
|
||||
'peer_access': False})
|
||||
iptables_calls = [c for c in calls if 'iptables' in c]
|
||||
vpn_rules = [c for c in iptables_calls if '-d' in c and '10.0.0.0/24' in c]
|
||||
self.assertTrue(vpn_rules, "Expected a rule for 10.0.0.0/24")
|
||||
for c in vpn_rules:
|
||||
self.assertIn('DROP', c)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# apply_all_peer_rules
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestApplyAllPeerRules(unittest.TestCase):
|
||||
def test_calls_apply_per_peer(self):
|
||||
peers = [_make_peer('10.0.0.2'), _make_peer('10.0.0.3', internet=False)]
|
||||
with patch.object(firewall_manager, 'ensure_caddy_virtual_ips', return_value=True), \
|
||||
patch.object(firewall_manager, 'apply_peer_rules', return_value=True) as mock_apply:
|
||||
firewall_manager.apply_all_peer_rules(peers)
|
||||
self.assertEqual(mock_apply.call_count, 2)
|
||||
called_ips = {c.args[0] for c in mock_apply.call_args_list}
|
||||
self.assertEqual(called_ips, {'10.0.0.2', '10.0.0.3'})
|
||||
|
||||
def test_peer_without_ip_is_skipped(self):
|
||||
peers = [{'internet_access': True}, _make_peer('10.0.0.2')]
|
||||
with patch.object(firewall_manager, 'ensure_caddy_virtual_ips', return_value=True), \
|
||||
patch.object(firewall_manager, 'apply_peer_rules', return_value=True) as mock_apply:
|
||||
firewall_manager.apply_all_peer_rules(peers)
|
||||
self.assertEqual(mock_apply.call_count, 1)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# clear_peer_rules
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestClearPeerRules(unittest.TestCase):
|
||||
def test_removes_only_matching_comment_lines(self):
|
||||
save_output = (
|
||||
'*filter\n'
|
||||
':INPUT ACCEPT [0:0]\n'
|
||||
':FORWARD ACCEPT [0:0]\n'
|
||||
'-A FORWARD -s 10.0.0.2 -m comment --comment pic-peer-10-0-0-2 -j ACCEPT\n'
|
||||
'-A FORWARD -s 10.0.0.3 -m comment --comment pic-peer-10-0-0-3 -j DROP\n'
|
||||
'COMMIT\n'
|
||||
)
|
||||
restored = []
|
||||
|
||||
def fake_wg_exec(args):
|
||||
m = MagicMock()
|
||||
m.returncode = 0
|
||||
if args == ['iptables-save']:
|
||||
m.stdout = save_output
|
||||
return m
|
||||
|
||||
def fake_restore(cmd, input, **kwargs):
|
||||
restored.append(input)
|
||||
m = MagicMock()
|
||||
m.returncode = 0
|
||||
return m
|
||||
|
||||
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
|
||||
patch('subprocess.run', side_effect=fake_restore):
|
||||
firewall_manager.clear_peer_rules('10.0.0.2')
|
||||
|
||||
self.assertEqual(len(restored), 1)
|
||||
restored_content = restored[0]
|
||||
self.assertNotIn('pic-peer-10-0-0-2', restored_content)
|
||||
self.assertIn('pic-peer-10-0-0-3', restored_content)
|
||||
|
||||
def test_no_op_when_no_matching_rules(self):
|
||||
save_output = '*filter\n:FORWARD ACCEPT [0:0]\nCOMMIT\n'
|
||||
|
||||
def fake_wg_exec(args):
|
||||
m = MagicMock()
|
||||
m.returncode = 0
|
||||
m.stdout = save_output
|
||||
return m
|
||||
|
||||
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
|
||||
patch('subprocess.run') as mock_restore:
|
||||
firewall_manager.clear_peer_rules('10.0.0.99')
|
||||
|
||||
mock_restore.assert_not_called()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -0,0 +1,124 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for peer add/remove flow — ensures server-side WireGuard AllowedIPs
|
||||
are always the peer's /32 VPN IP, never the client tunnel AllowedIPs.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import tempfile
|
||||
import shutil
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
api_dir = Path(__file__).parent.parent / 'api'
|
||||
sys.path.insert(0, str(api_dir))
|
||||
|
||||
from wireguard_manager import WireGuardManager
|
||||
from peer_registry import PeerRegistry
|
||||
|
||||
|
||||
class TestServerSideAllowedIPs(unittest.TestCase):
|
||||
"""Server-side peer AllowedIPs must always be peer_ip/32."""
|
||||
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.mkdtemp()
|
||||
self.data_dir = os.path.join(self.tmp, 'data')
|
||||
self.config_dir = os.path.join(self.tmp, 'config')
|
||||
os.makedirs(self.data_dir)
|
||||
os.makedirs(self.config_dir)
|
||||
# Patch syncconf so tests don't need docker
|
||||
patcher = patch.object(WireGuardManager, '_syncconf', return_value=None)
|
||||
self.mock_sync = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
self.wg = WireGuardManager(self.data_dir, self.config_dir)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmp)
|
||||
|
||||
def _config(self):
|
||||
with open(self.wg._config_file()) as f:
|
||||
return f.read()
|
||||
|
||||
def test_add_peer_uses_host_slash32(self):
|
||||
"""Peer added with /32 stays as /32 in config."""
|
||||
self.wg.add_peer('alice', 'ALICEPUBKEY=', '', allowed_ips='10.0.0.2/32')
|
||||
cfg = self._config()
|
||||
self.assertIn('AllowedIPs = 10.0.0.2/32', cfg)
|
||||
|
||||
def test_full_tunnel_client_ips_rejected(self):
|
||||
"""add_peer must refuse 0.0.0.0/0 — it would route all internet traffic to that peer."""
|
||||
result = self.wg.add_peer('bob', 'BOBPUBKEY=', '', allowed_ips='0.0.0.0/0, ::/0')
|
||||
self.assertFalse(result,
|
||||
"0.0.0.0/0 in server peer AllowedIPs routes ALL traffic to that peer, breaking internet")
|
||||
|
||||
def test_split_tunnel_client_ips_rejected(self):
|
||||
"""add_peer must refuse 172.20.0.0/16 — it would route docker network to that peer."""
|
||||
result = self.wg.add_peer('carol', 'CAROLPUBKEY=', '', allowed_ips='10.0.0.0/24, 172.20.0.0/16')
|
||||
self.assertFalse(result,
|
||||
"172.20.0.0/16 in server peer AllowedIPs routes docker network traffic to that peer")
|
||||
|
||||
def test_remove_peer_cleans_config(self):
|
||||
self.wg.add_peer('dave', 'DAVEPUBKEY=', '', allowed_ips='10.0.0.4/32')
|
||||
self.wg.remove_peer('DAVEPUBKEY=')
|
||||
cfg = self._config()
|
||||
self.assertNotIn('DAVEPUBKEY=', cfg)
|
||||
|
||||
def test_syncconf_called_on_add(self):
|
||||
self.wg.add_peer('eve', 'EVEPUBKEY=', '', allowed_ips='10.0.0.5/32')
|
||||
self.mock_sync.assert_called()
|
||||
|
||||
def test_syncconf_called_on_remove(self):
|
||||
self.wg.add_peer('frank', 'FRANKPUBKEY=', '', allowed_ips='10.0.0.6/32')
|
||||
self.mock_sync.reset_mock()
|
||||
self.wg.remove_peer('FRANKPUBKEY=')
|
||||
self.mock_sync.assert_called()
|
||||
|
||||
|
||||
class TestAutoAssignIP(unittest.TestCase):
|
||||
"""Auto-assigned peer IPs must be unique /32s starting at 10.0.0.2."""
|
||||
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.mkdtemp()
|
||||
self.registry = PeerRegistry(data_dir=self.tmp, config_dir=self.tmp)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmp)
|
||||
|
||||
def _next_ip(self):
|
||||
import ipaddress
|
||||
used = {p.get('ip', '').split('/')[0] for p in self.registry.list_peers()}
|
||||
for host in ipaddress.ip_network('10.0.0.0/24').hosts():
|
||||
ip = str(host)
|
||||
if ip != '10.0.0.1' and ip not in used:
|
||||
return ip
|
||||
raise ValueError('No free IPs')
|
||||
|
||||
def test_first_peer_gets_10_0_0_2(self):
|
||||
ip = self._next_ip()
|
||||
self.assertEqual(ip, '10.0.0.2')
|
||||
|
||||
def test_second_peer_gets_10_0_0_3(self):
|
||||
self.registry.add_peer({'peer': 'p1', 'ip': '10.0.0.2'})
|
||||
ip = self._next_ip()
|
||||
self.assertEqual(ip, '10.0.0.3')
|
||||
|
||||
def test_no_duplicate_ips(self):
|
||||
assigned = []
|
||||
for i in range(5):
|
||||
ip = self._next_ip()
|
||||
self.assertNotIn(ip, assigned, f"Duplicate IP assigned: {ip}")
|
||||
assigned.append(ip)
|
||||
self.registry.add_peer({'peer': f'peer{i}', 'ip': ip})
|
||||
|
||||
def test_server_ip_never_assigned(self):
|
||||
# Fill up .2 through .10
|
||||
for i in range(2, 11):
|
||||
self.registry.add_peer({'peer': f'p{i}', 'ip': f'10.0.0.{i}'})
|
||||
ip = self._next_ip()
|
||||
self.assertNotEqual(ip, '10.0.0.1', "Server IP 10.0.0.1 must never be assigned to a peer")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -26,7 +26,7 @@ from wireguard_manager import WireGuardManager
|
||||
|
||||
class TestWireGuardManager(unittest.TestCase):
|
||||
"""Test cases for WireGuardManager class"""
|
||||
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test environment"""
|
||||
self.test_dir = tempfile.mkdtemp()
|
||||
@@ -34,10 +34,14 @@ class TestWireGuardManager(unittest.TestCase):
|
||||
self.config_dir = os.path.join(self.test_dir, 'config')
|
||||
os.makedirs(self.data_dir, exist_ok=True)
|
||||
os.makedirs(self.config_dir, exist_ok=True)
|
||||
|
||||
|
||||
patcher = patch.object(WireGuardManager, '_syncconf', return_value=None)
|
||||
self.mock_sync = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
|
||||
# Create WireGuardManager instance
|
||||
self.wg_manager = WireGuardManager(self.data_dir, self.config_dir)
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up test environment"""
|
||||
shutil.rmtree(self.test_dir)
|
||||
@@ -100,54 +104,51 @@ class TestWireGuardManager(unittest.TestCase):
|
||||
def test_generate_config(self):
|
||||
"""Test WireGuard configuration generation"""
|
||||
config = self.wg_manager.generate_config('wg0', 51820)
|
||||
|
||||
|
||||
self.assertIsInstance(config, str)
|
||||
self.assertIn('[Interface]', config)
|
||||
self.assertIn('PrivateKey', config)
|
||||
self.assertIn('Address = 172.20.0.1/16', config)
|
||||
self.assertIn('Address = 10.0.0.1/24', config)
|
||||
self.assertIn('ListenPort = 51820', config)
|
||||
self.assertIn('PostUp', config)
|
||||
self.assertIn('PostDown', config)
|
||||
|
||||
def test_add_peer(self):
|
||||
"""Test adding a peer to WireGuard configuration"""
|
||||
# Generate peer keys first
|
||||
"""Test adding a peer — server-side AllowedIPs must be /32."""
|
||||
peer_keys = self.wg_manager.generate_peer_keys('testpeer')
|
||||
|
||||
|
||||
success = self.wg_manager.add_peer(
|
||||
'testpeer',
|
||||
peer_keys['public_key'],
|
||||
'192.168.1.100',
|
||||
'172.20.0.0/16',
|
||||
'',
|
||||
'10.0.0.2/32',
|
||||
25
|
||||
)
|
||||
|
||||
|
||||
self.assertTrue(success)
|
||||
|
||||
# Check if config file was created
|
||||
config_file = os.path.join(self.wg_manager.wireguard_dir, 'wg0.conf')
|
||||
|
||||
config_file = self.wg_manager._config_file()
|
||||
self.assertTrue(os.path.exists(config_file))
|
||||
|
||||
# Check config content
|
||||
|
||||
with open(config_file, 'r') as f:
|
||||
config = f.read()
|
||||
self.assertIn('[Peer]', config)
|
||||
self.assertIn(peer_keys['public_key'], config)
|
||||
self.assertIn('AllowedIPs = 172.20.0.0/16', config)
|
||||
self.assertIn('AllowedIPs = 10.0.0.2/32', config)
|
||||
self.assertIn('PersistentKeepalive = 25', config)
|
||||
|
||||
def test_remove_peer(self):
|
||||
"""Test removing a peer from WireGuard configuration"""
|
||||
# Add a peer first
|
||||
peer_keys = self.wg_manager.generate_peer_keys('testpeer')
|
||||
self.wg_manager.add_peer('testpeer', peer_keys['public_key'], '192.168.1.100')
|
||||
self.wg_manager.add_peer('testpeer', peer_keys['public_key'], '', '10.0.0.2/32')
|
||||
|
||||
# Remove the peer
|
||||
success = self.wg_manager.remove_peer(peer_keys['public_key'])
|
||||
self.assertTrue(success)
|
||||
|
||||
# Check if peer was removed
|
||||
config_file = os.path.join(self.wg_manager.wireguard_dir, 'wg0.conf')
|
||||
config_file = self.wg_manager._config_file()
|
||||
with open(config_file, 'r') as f:
|
||||
config = f.read()
|
||||
self.assertNotIn(peer_keys['public_key'], config)
|
||||
@@ -156,7 +157,7 @@ class TestWireGuardManager(unittest.TestCase):
|
||||
"""Test getting list of configured peers"""
|
||||
# Add a peer first
|
||||
peer_keys = self.wg_manager.generate_peer_keys('testpeer')
|
||||
self.wg_manager.add_peer('testpeer', peer_keys['public_key'], '192.168.1.100')
|
||||
self.wg_manager.add_peer('testpeer', peer_keys['public_key'], '', '10.0.0.2/32')
|
||||
|
||||
peers = self.wg_manager.get_peers()
|
||||
|
||||
@@ -221,46 +222,40 @@ class TestWireGuardManager(unittest.TestCase):
|
||||
|
||||
def test_update_peer_ip(self):
|
||||
"""Test updating peer IP address"""
|
||||
# Add a peer first
|
||||
peer_keys = self.wg_manager.generate_peer_keys('testpeer')
|
||||
self.wg_manager.add_peer('testpeer', peer_keys['public_key'], '192.168.1.100')
|
||||
|
||||
# Update peer IP
|
||||
success = self.wg_manager.update_peer_ip(peer_keys['public_key'], '192.168.1.200')
|
||||
self.wg_manager.add_peer('testpeer', peer_keys['public_key'], '', '10.0.0.2/32')
|
||||
|
||||
success = self.wg_manager.update_peer_ip(peer_keys['public_key'], '10.0.0.9/32')
|
||||
self.assertTrue(success)
|
||||
|
||||
# Check if IP was updated in config
|
||||
config_file = os.path.join(self.wg_manager.wireguard_dir, 'wg0.conf')
|
||||
with open(config_file, 'r') as f:
|
||||
|
||||
with open(self.wg_manager._config_file(), 'r') as f:
|
||||
config = f.read()
|
||||
self.assertIn('192.168.1.200', config)
|
||||
self.assertIn('10.0.0.9/32', config)
|
||||
|
||||
def test_get_peer_config(self):
|
||||
"""Test generating peer configuration"""
|
||||
"""Test generating peer client configuration."""
|
||||
peer_keys = self.wg_manager.generate_peer_keys('testpeer')
|
||||
keys = self.wg_manager.get_keys()
|
||||
|
||||
config = self.wg_manager.get_peer_config('testpeer', '192.168.1.100', peer_keys['private_key'])
|
||||
|
||||
|
||||
config = self.wg_manager.get_peer_config('testpeer', '10.0.0.2', peer_keys['private_key'])
|
||||
|
||||
self.assertIsInstance(config, str)
|
||||
self.assertIn('[Interface]', config)
|
||||
self.assertIn('[Peer]', config)
|
||||
self.assertIn('PrivateKey', config)
|
||||
self.assertIn('Address = 192.168.1.100/32', config)
|
||||
self.assertIn('DNS = 172.20.0.2', config)
|
||||
self.assertIn('Address = 10.0.0.2/32', config)
|
||||
self.assertIn('DNS = 172.20.0.3', config)
|
||||
self.assertIn(keys['public_key'], config)
|
||||
self.assertIn('AllowedIPs = 172.20.0.0/16', config)
|
||||
self.assertIn('AllowedIPs', config)
|
||||
|
||||
def test_multiple_peers(self):
|
||||
"""Test managing multiple peers"""
|
||||
# Add first peer
|
||||
peer1_keys = self.wg_manager.generate_peer_keys('peer1')
|
||||
success1 = self.wg_manager.add_peer('peer1', peer1_keys['public_key'], '192.168.1.100')
|
||||
success1 = self.wg_manager.add_peer('peer1', peer1_keys['public_key'], '', '10.0.0.2/32')
|
||||
self.assertTrue(success1)
|
||||
|
||||
# Add second peer
|
||||
|
||||
peer2_keys = self.wg_manager.generate_peer_keys('peer2')
|
||||
success2 = self.wg_manager.add_peer('peer2', peer2_keys['public_key'], '192.168.1.101')
|
||||
success2 = self.wg_manager.add_peer('peer2', peer2_keys['public_key'], '', '10.0.0.3/32')
|
||||
self.assertTrue(success2)
|
||||
|
||||
# Get peers
|
||||
@@ -310,18 +305,21 @@ PersistentKeepalive = 30
|
||||
self.assertEqual(peers[1]['persistent_keepalive'], 30)
|
||||
|
||||
def test_error_handling(self):
|
||||
"""Test error handling in WireGuard operations"""
|
||||
# Test with invalid public key
|
||||
success = self.wg_manager.add_peer('testpeer', 'invalid_key', '192.168.1.100')
|
||||
# Should still return True as it writes to config file
|
||||
"""Test error handling in WireGuard operations."""
|
||||
# Wide CIDR rejected — server-side AllowedIPs must be /32
|
||||
success = self.wg_manager.add_peer('testpeer', 'invalid_key', '', '172.20.0.0/16')
|
||||
self.assertFalse(success, "Wide CIDR must be rejected")
|
||||
|
||||
# Valid /32 with any key string is accepted (key format not validated at this layer)
|
||||
success = self.wg_manager.add_peer('testpeer', 'any_key_string=', '', '10.0.0.2/32')
|
||||
self.assertTrue(success)
|
||||
|
||||
# Test removing non-existent peer
|
||||
|
||||
# Removing non-existent peer is a no-op, not an error
|
||||
success = self.wg_manager.remove_peer('non_existent_key')
|
||||
self.assertTrue(success)
|
||||
|
||||
# Test updating non-existent peer IP
|
||||
success = self.wg_manager.update_peer_ip('non_existent_key', '192.168.1.200')
|
||||
|
||||
# Updating IP for peer not in config returns False
|
||||
success = self.wg_manager.update_peer_ip('non_existent_key', '10.0.0.9/32')
|
||||
self.assertFalse(success)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
Reference in New Issue
Block a user