#!/usr/bin/env python3
"""
Tests for WireGuard VPN routing: internet access and DNS resolution through tunnel.

Scenarios covered:
  1. generate_config() produces PostUp/PostDown rules that enable internet forwarding
     (MASQUERADE + FORWARD ACCEPT are the two iptables rules that make "internet through
     VPN" work — without them, packets from 10.0.0.x are not NATted to eth0).
  2. get_peer_config() sets DNS = <ip> so clients resolve domain names through the PIC
     DNS container, not their local ISP resolver.
  3. apply_config() bootstrap path (empty wg0.conf) restores all active peers from
     peers.json so clients can reconnect after an API restart that regenerated the file.
  4. _load_registered_peers() correctly filters peers.json.
  5. add_peer() writes a /32 AllowedIPs entry so routing targets only that client.
"""

import sys
import os
import json
import shutil
import tempfile
import unittest
import ipaddress
from pathlib import Path
from unittest.mock import patch, MagicMock

# The module under test lives in ../api relative to this tests directory; it
# must be on sys.path before the import below.
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))

from wireguard_manager import WireGuardManager, _resolve_peer_dns

# A syntactically-valid WireGuard base64 public key (44 chars, ends with =).
FAKE_PUBKEY = 'O35JY6nc8sb9zEarZYZVl70jno/J9dRyiB37YSYy4nA='
FAKE_PUBKEY2 = 'AbCdEfGhIjKlMnOpQrStUvWxYz0123456789ABCDEFG='


def _make_wg(tmp: str) -> WireGuardManager:
    """Build a WireGuardManager that uses *tmp* for both constructor arguments.

    ``_syncconf`` is patched to a no-op only while the constructor runs; the
    patch is lifted again before the instance is returned.
    """
    with patch.object(WireGuardManager, '_syncconf', return_value=None):
        wg = WireGuardManager(tmp, tmp)
    return wg


def _read_text(path) -> str:
    """Return the full contents of *path*, closing the handle deterministically.

    Replaces bare ``open(path).read()`` calls, which leave the file object to
    the garbage collector and emit ResourceWarning under unittest.
    """
    with open(path) as f:
        return f.read()


# ── 1. Internet forwarding rules in generate_config() ─────────────────────────

class TestInternetForwardingRules(unittest.TestCase):
    """
    Verify that generate_config() emits the exact iptables rules required for
    'internet through VPN': MASQUERADE on eth0 (outbound NAT) and FORWARD ACCEPT
    on the wg0 interface. Missing either rule means VPN clients get no internet.
    """

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.wg = _make_wg(self.tmp)

    def _postdown_section(self, cfg: str) -> str:
        """Return the text following the first 'PostDown' marker in *cfg*.

        Asserts the marker exists first so a missing PostDown shows up as a
        clear assertion failure rather than an IndexError.
        """
        self.assertIn('PostDown', cfg)
        return cfg.split('PostDown')[1]

    def test_postup_has_masquerade_on_eth0(self):
        """MASQUERADE on eth0 NATs VPN-subnet packets so internet routers see the host IP."""
        cfg = self.wg.generate_config()
        self.assertIn('POSTROUTING -o eth0 -j MASQUERADE', cfg)

    def test_postup_has_forward_accept_on_wg_interface(self):
        """FORWARD ACCEPT allows packets from the WireGuard interface through the kernel."""
        cfg = self.wg.generate_config()
        self.assertIn('FORWARD -i %i -j ACCEPT', cfg)

    def test_postdown_removes_masquerade_rule(self):
        """PostDown must mirror PostUp so rules are cleaned up when the tunnel goes down."""
        cfg = self.wg.generate_config()
        self.assertIn('POSTROUTING -o eth0 -j MASQUERADE', self._postdown_section(cfg))

    def test_postdown_removes_forward_rule(self):
        """The FORWARD ACCEPT rule must also appear in the PostDown section."""
        cfg = self.wg.generate_config()
        self.assertIn('FORWARD -i %i -j ACCEPT', self._postdown_section(cfg))

    def test_postup_and_postdown_are_present(self):
        """Both PostUp and PostDown must exist — PostUp without PostDown leaks rules."""
        cfg = self.wg.generate_config()
        self.assertIn('PostUp', cfg)
        self.assertIn('PostDown', cfg)

    def test_masquerade_is_in_postup_not_only_postdown(self):
        """MASQUERADE must appear in PostUp (adding the rule), not only PostDown."""
        cfg = self.wg.generate_config()
        self.assertIn('PostUp', cfg)
        # Text between the first 'PostUp' marker and the next 'PostDown' marker.
        postup_section = cfg.split('PostUp')[1].split('PostDown')[0]
        self.assertIn('MASQUERADE', postup_section)


# ── 2. DNS resolution: get_peer_config() sets DNS field ───────────────────────

class TestPeerConfigDns(unittest.TestCase):
    """
    Verify that peer client configs include a DNS = line pointing to the PIC DNS
    container. Without DNS, the client tunnel has no internet-accessible domain
    resolution even though packets are forwarded correctly.
    """

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.wg = _make_wg(self.tmp)

    def _peer_config(self, name: str, ip: str) -> str:
        """Generate keys for *name* and return its client config for *ip*."""
        keys = self.wg.generate_peer_keys(name)
        return self.wg.get_peer_config(name, ip, keys['private_key'])

    def test_peer_config_contains_dns_line(self):
        cfg = self._peer_config('testpeer', '10.0.0.2')
        self.assertIn('DNS =', cfg)

    def test_peer_config_dns_is_valid_ip(self):
        cfg = self._peer_config('testpeer', '10.0.0.2')
        dns_line = next(line for line in cfg.splitlines() if line.startswith('DNS ='))
        dns_ip = dns_line.split('=', 1)[1].strip()
        # Must be a parseable IPv4 address; IPv4Address raises on anything else.
        ipaddress.IPv4Address(dns_ip)

    def test_peer_config_dns_defaults_to_cell_dns_ip(self):
        """When cell-dns hostname can't be resolved, falls back to 172.20.0.3."""
        with patch('wireguard_manager.socket.gethostbyname', side_effect=OSError):
            cfg = self._peer_config('p1', '10.0.0.5')
        self.assertIn('DNS = 172.20.0.3', cfg)

    def test_peer_config_dns_uses_resolved_hostname(self):
        """When cell-dns resolves, its IP is used as the DNS server."""
        with patch('wireguard_manager.socket.gethostbyname', return_value='172.20.0.3'):
            cfg = self._peer_config('p2', '10.0.0.6')
        self.assertIn('DNS = 172.20.0.3', cfg)

    def test_resolve_peer_dns_fallback(self):
        """_resolve_peer_dns() always returns a string even when DNS lookup fails."""
        with patch('wireguard_manager.socket.gethostbyname', side_effect=OSError):
            result = _resolve_peer_dns()
        self.assertIsInstance(result, str)
        self.assertEqual(result, '172.20.0.3')

    def test_peer_config_allowed_ips_default_full_tunnel(self):
        """Default AllowedIPs = 0.0.0.0/0 routes all traffic (including internet) through VPN."""
        cfg = self._peer_config('p3', '10.0.0.7')
        # Full tunnel: 0.0.0.0/0 means all traffic goes through the VPN
        self.assertIn('0.0.0.0/0', cfg)


# ── 3. Bootstrap restores peers from peers.json ───────────────────────────────

class TestApplyConfigBootstrapRestoresPeers(unittest.TestCase):
    """
    apply_config() is called when the WireGuard port changes. If wg0.conf is
    empty or missing [Interface], it bootstraps from generate_config() — which
    only generates the [Interface] section and loses all [Peer] blocks.

    The fix: after bootstrap, load active peers from peers.json and restore
    their [Peer] blocks so clients can reconnect without manual intervention.
    """

    # Warning text the tests expect in result['warnings'] after a bootstrap.
    BOOTSTRAP_WARNING = 'wg0.conf was empty — regenerated from keys'

    def _make_wg_with_conf(self, conf_content: str = '') -> tuple:
        """Create a manager in a fresh temp dir whose config file holds *conf_content*.

        The temp dir is registered for removal via addCleanup right after it is
        created, so it is cleaned up even when a later step in this helper or
        in the calling test raises.

        Returns:
            (wg, cf, tmp): the manager, the config file path, and the temp dir.
        """
        tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmp)
        wg = _make_wg(tmp)
        # Ensure the config file's parent dir exists, then write the file
        cf = wg._config_file()
        os.makedirs(os.path.dirname(cf), exist_ok=True)
        with open(cf, 'w') as f:
            f.write(conf_content)
        return wg, cf, tmp

    def _write_peers_json(self, wg: WireGuardManager, peers: list):
        """Serialise *peers* to peers.json inside the manager's data_dir."""
        peers_file = os.path.join(wg.data_dir, 'peers.json')
        with open(peers_file, 'w') as f:
            json.dump(peers, f)

    def _apply(self, wg: WireGuardManager, port: int = 51820):
        """Run apply_config({'port': port}) with external side effects patched out.

        get_external_ip is stubbed to return None and subprocess.run is mocked
        for the duration of the call.
        """
        with patch.object(wg, 'get_external_ip', return_value=None), \
                patch('subprocess.run'):
            return wg.apply_config({'port': port})

    def test_empty_conf_triggers_bootstrap(self):
        wg, cf, tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [])
        result = self._apply(wg)
        self.assertIn(self.BOOTSTRAP_WARNING, result['warnings'])

    def test_bootstrap_restores_active_peer(self):
        """After bootstrap on empty conf, active peer from peers.json appears in wg0.conf."""
        wg, cf, tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [{
            'peer': 'user1',
            'ip': '10.0.0.2',
            'public_key': FAKE_PUBKEY,
            'active': True,
        }])
        self._apply(wg)
        content = _read_text(cf)
        self.assertIn('[Peer]', content)
        self.assertIn(FAKE_PUBKEY, content)
        self.assertIn('AllowedIPs = 10.0.0.2/32', content)

    def test_bootstrap_restores_multiple_peers(self):
        wg, cf, tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [
            {'peer': 'peer1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True},
            {'peer': 'peer2', 'ip': '10.0.0.3', 'public_key': FAKE_PUBKEY2, 'active': True},
        ])
        self._apply(wg)
        content = _read_text(cf)
        self.assertIn(FAKE_PUBKEY, content)
        self.assertIn(FAKE_PUBKEY2, content)
        self.assertEqual(content.count('[Peer]'), 2)

    def test_bootstrap_skips_inactive_peers(self):
        """Inactive peers (active=False) must NOT be restored to wg0.conf."""
        wg, cf, tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [
            {'peer': 'active', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True},
            {'peer': 'inactive', 'ip': '10.0.0.3', 'public_key': FAKE_PUBKEY2, 'active': False},
        ])
        self._apply(wg)
        content = _read_text(cf)
        self.assertIn(FAKE_PUBKEY, content)
        self.assertNotIn(FAKE_PUBKEY2, content)

    def test_bootstrap_skips_peer_missing_public_key(self):
        wg, cf, tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [
            {'peer': 'nok', 'ip': '10.0.0.2', 'active': True},  # no public_key
        ])
        self._apply(wg)
        content = _read_text(cf)
        self.assertEqual(content.count('[Peer]'), 0)

    def test_bootstrap_skips_peer_missing_ip(self):
        wg, cf, tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [
            {'peer': 'nok', 'public_key': FAKE_PUBKEY, 'active': True},  # no ip
        ])
        self._apply(wg)
        content = _read_text(cf)
        self.assertNotIn(FAKE_PUBKEY, content)

    def test_existing_conf_with_interface_not_bootstrapped(self):
        """If [Interface] is present, bootstrap must NOT run — existing peers are preserved."""
        wg, cf, tmp = self._make_wg_with_conf(
            '[Interface]\nListenPort = 51820\nPrivateKey = dummykey\n'
            '\n[Peer]\n# existing\nPublicKey = ' + FAKE_PUBKEY + '\nAllowedIPs = 10.0.0.2/32\n'
        )
        result = self._apply(wg, port=51821)
        self.assertNotIn(self.BOOTSTRAP_WARNING, result['warnings'])
        # Original peer must still be there after port-only change
        self.assertIn(FAKE_PUBKEY, _read_text(cf))

    def test_restored_peers_have_slash32_allowed_ips(self):
        """/32 is mandatory: a wider mask would route internet traffic to the wrong peer."""
        wg, cf, tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [
            {'peer': 'user1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True},
        ])
        self._apply(wg)
        content = _read_text(cf)
        # Must be /32, not /24 or /0
        self.assertIn('AllowedIPs = 10.0.0.2/32', content)
        self.assertNotIn('AllowedIPs = 10.0.0.2/24', content)


# ── 4. _load_registered_peers() ───────────────────────────────────────────────

class TestLoadRegisteredPeers(unittest.TestCase):
    """Check how _load_registered_peers() reads and filters peers.json."""

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.wg = _make_wg(self.tmp)

    def _write_peers(self, peers: list):
        """Serialise *peers* to peers.json inside the manager's data_dir."""
        path = os.path.join(self.wg.data_dir, 'peers.json')
        with open(path, 'w') as f:
            json.dump(peers, f)

    def test_returns_empty_list_when_file_missing(self):
        self.assertEqual(self.wg._load_registered_peers(), [])

    def test_returns_empty_list_on_malformed_json(self):
        path = os.path.join(self.wg.data_dir, 'peers.json')
        with open(path, 'w') as f:
            f.write('not json {{{')
        self.assertEqual(self.wg._load_registered_peers(), [])

    def test_returns_active_peers(self):
        self._write_peers([
            {'peer': 'u1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True},
        ])
        result = self.wg._load_registered_peers()
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]['public_key'], FAKE_PUBKEY)

    def test_filters_out_inactive_peers(self):
        self._write_peers([
            {'peer': 'u1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True},
            {'peer': 'u2', 'ip': '10.0.0.3', 'public_key': FAKE_PUBKEY2, 'active': False},
        ])
        result = self.wg._load_registered_peers()
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]['public_key'], FAKE_PUBKEY)

    def test_filters_out_peers_without_public_key(self):
        self._write_peers([
            {'peer': 'u1', 'ip': '10.0.0.2', 'active': True},
        ])
        self.assertEqual(self.wg._load_registered_peers(), [])

    def test_filters_out_peers_without_ip(self):
        self._write_peers([
            {'peer': 'u1', 'public_key': FAKE_PUBKEY, 'active': True},
        ])
        self.assertEqual(self.wg._load_registered_peers(), [])

    def test_treats_missing_active_field_as_active(self):
        """Peers without 'active' key should be treated as active (default True)."""
        self._write_peers([
            {'peer': 'u1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY},
        ])
        result = self.wg._load_registered_peers()
        self.assertEqual(len(result), 1)

    def test_skips_non_dict_entries(self):
        self._write_peers([
            'not_a_dict',
            {'peer': 'u1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True},
        ])
        result = self.wg._load_registered_peers()
        self.assertEqual(len(result), 1)

    def test_returns_all_required_fields(self):
        self._write_peers([
            {'peer': 'u1', 'ip': '10.0.0.5', 'public_key': FAKE_PUBKEY, 'active': True},
        ])
        result = self.wg._load_registered_peers()
        self.assertIn('ip', result[0])
        self.assertIn('public_key', result[0])


# ── 5. add_peer() writes correct server-side AllowedIPs ───────────────────────

class TestAddPeerServerSideAllowedIps(unittest.TestCase):
    """
    Server-side AllowedIPs must be a /32 host address matching the peer's VPN IP.
    Wider masks (e.g. 0.0.0.0/0) would route internet traffic from all other
    clients to that single peer, breaking internet access for everyone else.
    """

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.wg = _make_wg(self.tmp)

    def _conf_text(self) -> str:
        """Return the current contents of the manager's config file."""
        return _read_text(self.wg._config_file())

    def test_add_peer_writes_slash32_allowed_ips(self):
        ok = self.wg.add_peer('peer1', FAKE_PUBKEY, '', '10.0.0.2/32')
        self.assertTrue(ok)
        self.assertIn('AllowedIPs = 10.0.0.2/32', self._conf_text())

    def test_add_peer_rejects_full_tunnel_allowed_ips(self):
        """0.0.0.0/0 as server AllowedIPs is invalid and must be rejected."""
        ok = self.wg.add_peer('peer1', FAKE_PUBKEY, '', '0.0.0.0/0')
        self.assertFalse(ok)

    def test_add_peer_rejects_subnet_allowed_ips(self):
        """10.0.0.0/24 as server AllowedIPs is invalid and must be rejected."""
        ok = self.wg.add_peer('peer1', FAKE_PUBKEY, '', '10.0.0.0/24')
        self.assertFalse(ok)

    def test_add_peer_does_not_write_peer_on_rejection(self):
        # Add a valid peer first so the conf file exists, then attempt bad add
        self.wg.add_peer('valid', FAKE_PUBKEY2, '', '10.0.0.99/32')
        ok = self.wg.add_peer('peer1', FAKE_PUBKEY, '', '0.0.0.0/0')
        self.assertFalse(ok)
        # The bad peer's key must not appear; the valid one may
        self.assertNotIn(FAKE_PUBKEY, self._conf_text())

    def test_add_peer_writes_public_key(self):
        self.wg.add_peer('peer1', FAKE_PUBKEY, '', '10.0.0.2/32')
        self.assertIn(f'PublicKey = {FAKE_PUBKEY}', self._conf_text())

    def test_add_peer_writes_peer_name_as_comment(self):
        self.wg.add_peer('user1', FAKE_PUBKEY, '', '10.0.0.2/32')
        self.assertIn('# user1', self._conf_text())

    def test_add_peer_writes_persistent_keepalive(self):
        self.wg.add_peer('peer1', FAKE_PUBKEY, '', '10.0.0.2/32', 25)
        self.assertIn('PersistentKeepalive = 25', self._conf_text())


if __name__ == '__main__':
    unittest.main()