68c27b4521
The PostUp rule appended `iptables -A FORWARD -i wg0 -j ACCEPT` which allowed any WireGuard-connected client full internet access regardless of per-peer rules, even when no peers were configured in wg0.conf. Fix: change PostUp/PostDown to use DROP as the catch-all. Per-peer and per-cell rules use -I (insert at top) so they take precedence; unknown or unconfigured WG traffic hits the DROP at the bottom. Also add reconcile_stale_peer_rules() called on startup to remove FORWARD rules for peer IPs that no longer exist in the registry, preventing deleted peers from retaining firewall access across container restarts. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
546 lines
24 KiB
Python
546 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tests for WireGuard VPN routing: internet access and DNS resolution through tunnel.
|
|
|
|
Scenarios covered:
|
|
1. generate_config() produces PostUp/PostDown rules that enable internet forwarding
|
|
(MASQUERADE on eth0 plus a catch-all FORWARD DROP on the WireGuard interface:
|
|
MASQUERADE NATs packets from 10.0.0.x out of eth0, while the DROP blocks any client without a per-peer rule inserted above it).
|
|
2. get_peer_config() sets DNS = <cell-dns-ip> so clients resolve domain names
|
|
through the PIC DNS container, not their local ISP resolver.
|
|
3. apply_config() bootstrap path (empty wg0.conf) restores all active peers from
|
|
peers.json so clients can reconnect after an API restart that regenerated the file.
|
|
4. _load_registered_peers() correctly filters peers.json.
|
|
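5. add_peer() writes a /32 AllowedIPs entry so routing targets only that client.
|
|
6. _sync_keys_from_conf() adopts the PrivateKey found in wg0.conf so peer configs embed the matching server public key.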
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import shutil
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
api_dir = Path(__file__).parent.parent / 'api'
|
|
sys.path.insert(0, str(api_dir))
|
|
|
|
from wireguard_manager import WireGuardManager, _resolve_peer_dns
|
|
|
|
|
|
# Syntactically-valid WireGuard base64 public keys (44 chars, ending with '=').
# They are placeholder peer keys for the tests below; two distinct values are
# provided so tests can tell one peer's [Peer] block from another's.
FAKE_PUBKEY = 'O35JY6nc8sb9zEarZYZVl70jno/J9dRyiB37YSYy4nA='
FAKE_PUBKEY2 = 'AbCdEfGhIjKlMnOpQrStUvWxYz0123456789ABCDEFG='
|
|
|
|
|
|
def _make_wg(tmp: str) -> WireGuardManager:
    """Create a WireGuardManager that receives *tmp* for both constructor paths.

    ``WireGuardManager._syncconf`` is swapped for a stub returning ``None``
    while the constructor runs; the stub is removed again once construction
    has finished.
    """
    syncconf_stub = patch.object(WireGuardManager, '_syncconf', return_value=None)
    with syncconf_stub:
        manager = WireGuardManager(tmp, tmp)
    return manager
|
|
|
|
|
|
# ── 1. Internet forwarding rules in generate_config() ─────────────────────────
|
|
|
|
class TestInternetForwardingRules(unittest.TestCase):
    """
    Checks on the iptables commands generate_config() emits for 'internet
    through VPN': an outbound-NAT MASQUERADE on eth0 and a catch-all
    FORWARD DROP on the wg0 interface.

    The catch-all has to be DROP rather than ACCEPT, so that traffic is only
    forwarded for peers whose rules apply_peer_rules() inserts at chain
    position 1.  With an ACCEPT catch-all, any WireGuard-connected client
    would get full internet access even without an entry in peers.json.
    """

    # Rule fragments searched for in the generated config text.
    MASQUERADE_RULE = 'POSTROUTING -o eth0 -j MASQUERADE'
    DROP_RULE = 'FORWARD -i %i -j DROP'
    ACCEPT_RULE = 'FORWARD -i %i -j ACCEPT'

    def setUp(self):
        # Fresh scratch directory per test, removed again on cleanup.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.wg = _make_wg(self.tmp)

    @staticmethod
    def _after_first_postdown(config_text):
        """Return the text between the first and second 'PostDown' markers
        (or up to the end of the config when there is only one marker)."""
        return config_text.split('PostDown')[1]

    def test_postup_has_masquerade_on_eth0(self):
        """MASQUERADE on eth0 NATs VPN-subnet packets so internet routers see the host IP."""
        self.assertIn(self.MASQUERADE_RULE, self.wg.generate_config())

    def test_postup_has_forward_drop_on_wg_interface(self):
        """Catch-all DROP blocks unconfigured WG clients; per-peer rules inserted above it allow known peers."""
        generated = self.wg.generate_config()
        self.assertIn(self.DROP_RULE, generated)
        self.assertNotIn(self.ACCEPT_RULE, generated)

    def test_postdown_removes_masquerade_rule(self):
        """PostDown must mirror PostUp so rules are cleaned up when the tunnel goes down."""
        postdown_part = self._after_first_postdown(self.wg.generate_config())
        self.assertIn(self.MASQUERADE_RULE, postdown_part)

    def test_postdown_removes_forward_rule(self):
        """The FORWARD DROP catch-all must also show up after the PostDown marker."""
        postdown_part = self._after_first_postdown(self.wg.generate_config())
        self.assertIn(self.DROP_RULE, postdown_part)

    def test_postup_and_postdown_are_present(self):
        """Both PostUp and PostDown must exist — PostUp without PostDown leaks rules."""
        generated = self.wg.generate_config()
        for directive in ('PostUp', 'PostDown'):
            self.assertIn(directive, generated)

    def test_masquerade_is_in_postup_not_only_postdown(self):
        """MASQUERADE must appear in PostUp (adding the rule), not only PostDown."""
        generated = self.wg.generate_config()
        after_postup = generated.split('PostUp')[1]
        # Cut at the first PostDown so only the PostUp text is inspected.
        postup_only = after_postup.split('PostDown')[0]
        self.assertIn('MASQUERADE', postup_only)
|
|
|
|
|
|
# ── 2. DNS resolution: get_peer_config() sets DNS field ───────────────────────
|
|
|
|
class TestPeerConfigDns(unittest.TestCase):
    """
    Verify that peer client configs include a ``DNS = <ip>`` line pointing to
    the PIC DNS container.  Without DNS, the client tunnel has no
    internet-accessible domain resolution even though packets are forwarded
    correctly.
    """

    # Address the tests expect when the cell-dns hostname cannot be resolved.
    FALLBACK_DNS = '172.20.0.3'
    # Deliberately different from FALLBACK_DNS: a test that mocks the lookup to
    # return the fallback address cannot tell "resolved" from "fell back".
    RESOLVED_DNS = '172.20.0.53'

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        # _syncconf is stubbed out only while the manager is being constructed.
        with patch.object(WireGuardManager, '_syncconf', return_value=None):
            self.wg = WireGuardManager(self.tmp, self.tmp)

    def _peer_config(self, name, ip):
        """Generate keys for peer *name* and return its client config text."""
        keys = self.wg.generate_peer_keys(name)
        return self.wg.get_peer_config(name, ip, keys['private_key'])

    def test_peer_config_contains_dns_line(self):
        """The client config must contain a DNS entry at all."""
        self.assertIn('DNS =', self._peer_config('testpeer', '10.0.0.2'))

    def test_peer_config_dns_is_valid_ip(self):
        """The value on the DNS line must parse as an IPv4 address."""
        import ipaddress

        cfg = self._peer_config('testpeer', '10.0.0.2')
        dns_line = next(
            (line for line in cfg.splitlines() if line.startswith('DNS =')),
            None,
        )
        # Report a missing line as a test failure instead of a StopIteration error.
        self.assertIsNotNone(dns_line, 'peer config has no "DNS =" line')
        dns_ip = dns_line.split('=', 1)[1].strip()
        try:
            ipaddress.IPv4Address(dns_ip)
        except ValueError:  # AddressValueError is a ValueError subclass
            self.fail(f'DNS value {dns_ip!r} is not a valid IPv4 address')

    def test_peer_config_dns_defaults_to_cell_dns_ip(self):
        """When cell-dns hostname can't be resolved, falls back to 172.20.0.3."""
        with patch('wireguard_manager.socket.gethostbyname', side_effect=OSError):
            cfg = self._peer_config('p1', '10.0.0.5')
        self.assertIn(f'DNS = {self.FALLBACK_DNS}', cfg)

    def test_peer_config_dns_uses_resolved_hostname(self):
        """When cell-dns resolves, its IP is used as the DNS server."""
        with patch('wireguard_manager.socket.gethostbyname',
                   return_value=self.RESOLVED_DNS):
            cfg = self._peer_config('p2', '10.0.0.6')
        # The mocked address differs from the fallback, so this only passes
        # when the resolved value is actually written into the config.
        self.assertIn(f'DNS = {self.RESOLVED_DNS}', cfg)

    def test_resolve_peer_dns_fallback(self):
        """_resolve_peer_dns() always returns a string even when DNS lookup fails."""
        with patch('wireguard_manager.socket.gethostbyname', side_effect=OSError):
            result = _resolve_peer_dns()
        self.assertIsInstance(result, str)
        self.assertEqual(result, self.FALLBACK_DNS)

    def test_peer_config_allowed_ips_default_full_tunnel(self):
        """Default AllowedIPs = 0.0.0.0/0 routes all traffic (including internet) through VPN."""
        cfg = self._peer_config('p3', '10.0.0.7')
        # Full tunnel: 0.0.0.0/0 means all traffic goes through the VPN
        self.assertIn('0.0.0.0/0', cfg)
|
|
|
|
|
|
# ── 3. Bootstrap restores peers from peers.json ───────────────────────────────
|
|
|
|
class TestApplyConfigBootstrapRestoresPeers(unittest.TestCase):
    """
    apply_config() is called when the WireGuard port changes.  If wg0.conf is
    empty or missing [Interface], it bootstraps from generate_config() — which
    only generates the [Interface] section and loses all [Peer] blocks.

    The fix: after bootstrap, load active peers from peers.json and restore
    their [Peer] blocks so clients can reconnect without manual intervention.
    """

    # Warning text the tests look for in apply_config()'s result['warnings'].
    BOOTSTRAP_WARNING = 'wg0.conf was empty — regenerated from keys'

    def _make_wg_with_conf(self, conf_content: str = '') -> tuple:
        """Build a manager in a fresh temp dir and pre-write its config file.

        Returns ``(wg, config_path, tmp_dir)``.  The temp dir is registered for
        removal via addCleanup immediately after creation, so it is cleaned up
        even when a later setup step or the test body raises.
        """
        tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmp)
        with patch.object(WireGuardManager, '_syncconf', return_value=None):
            wg = WireGuardManager(tmp, tmp)

        # Ensure the config file's directory exists and write the file
        cf = wg._config_file()
        os.makedirs(os.path.dirname(cf), exist_ok=True)
        with open(cf, 'w') as f:
            f.write(conf_content)
        return wg, cf, tmp

    def _write_peers_json(self, wg: WireGuardManager, peers: list):
        """Serialize *peers* to peers.json inside the manager's data_dir."""
        peers_file = os.path.join(wg.data_dir, 'peers.json')
        with open(peers_file, 'w') as f:
            json.dump(peers, f)

    @staticmethod
    def _apply(wg, config):
        """Run apply_config() with get_external_ip and subprocess.run mocked out."""
        with patch.object(wg, 'get_external_ip', return_value=None), \
                patch('subprocess.run'):
            return wg.apply_config(config)

    @staticmethod
    def _read(path):
        """Return the file's text, closing the handle deterministically."""
        with open(path) as f:
            return f.read()

    def test_empty_conf_triggers_bootstrap(self):
        """An empty wg0.conf must make apply_config() report the bootstrap warning."""
        wg, _cf, _tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [])
        result = self._apply(wg, {'port': 51820})
        self.assertIn(self.BOOTSTRAP_WARNING, result['warnings'])

    def test_bootstrap_restores_active_peer(self):
        """After bootstrap on empty conf, active peer from peers.json appears in wg0.conf."""
        wg, cf, _tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [{
            'peer': 'user1',
            'ip': '10.0.0.2',
            'public_key': FAKE_PUBKEY,
            'active': True,
        }])
        self._apply(wg, {'port': 51820})
        content = self._read(cf)
        self.assertIn('[Peer]', content)
        self.assertIn(FAKE_PUBKEY, content)
        self.assertIn('AllowedIPs = 10.0.0.2/32', content)

    def test_bootstrap_restores_multiple_peers(self):
        """Every active peer in peers.json gets its own [Peer] block."""
        wg, cf, _tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [
            {'peer': 'peer1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True},
            {'peer': 'peer2', 'ip': '10.0.0.3', 'public_key': FAKE_PUBKEY2, 'active': True},
        ])
        self._apply(wg, {'port': 51820})
        content = self._read(cf)
        self.assertIn(FAKE_PUBKEY, content)
        self.assertIn(FAKE_PUBKEY2, content)
        self.assertEqual(content.count('[Peer]'), 2)

    def test_bootstrap_skips_inactive_peers(self):
        """Inactive peers (active=False) must NOT be restored to wg0.conf."""
        wg, cf, _tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [
            {'peer': 'active', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True},
            {'peer': 'inactive', 'ip': '10.0.0.3', 'public_key': FAKE_PUBKEY2, 'active': False},
        ])
        self._apply(wg, {'port': 51820})
        content = self._read(cf)
        self.assertIn(FAKE_PUBKEY, content)
        self.assertNotIn(FAKE_PUBKEY2, content)

    def test_bootstrap_skips_peer_missing_public_key(self):
        """A peers.json entry without public_key must not produce a [Peer] block."""
        wg, cf, _tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [
            {'peer': 'nok', 'ip': '10.0.0.2', 'active': True},  # no public_key
        ])
        self._apply(wg, {'port': 51820})
        self.assertEqual(self._read(cf).count('[Peer]'), 0)

    def test_bootstrap_skips_peer_missing_ip(self):
        """A peers.json entry without ip must not be written to wg0.conf."""
        wg, cf, _tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [
            {'peer': 'nok', 'public_key': FAKE_PUBKEY, 'active': True},  # no ip
        ])
        self._apply(wg, {'port': 51820})
        self.assertNotIn(FAKE_PUBKEY, self._read(cf))

    def test_existing_conf_with_interface_not_bootstrapped(self):
        """If [Interface] is present, bootstrap must NOT run — existing peers are preserved."""
        wg, cf, _tmp = self._make_wg_with_conf(
            '[Interface]\nListenPort = 51820\nPrivateKey = dummykey\n'
            '\n[Peer]\n# existing\nPublicKey = ' + FAKE_PUBKEY + '\nAllowedIPs = 10.0.0.2/32\n'
        )
        result = self._apply(wg, {'port': 51821})
        self.assertNotIn(self.BOOTSTRAP_WARNING, result['warnings'])
        # Original peer must still be there after port-only change
        self.assertIn(FAKE_PUBKEY, self._read(cf))

    def test_restored_peers_have_slash32_allowed_ips(self):
        """/32 is mandatory: a wider mask would route internet traffic to the wrong peer."""
        wg, cf, _tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [
            {'peer': 'user1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True},
        ])
        self._apply(wg, {'port': 51820})
        content = self._read(cf)
        # Must be /32, not /24 or /0
        self.assertIn('AllowedIPs = 10.0.0.2/32', content)
        self.assertNotIn('AllowedIPs = 10.0.0.2/24', content)
|
|
|
|
|
|
# ── 4. _load_registered_peers() ───────────────────────────────────────────────
|
|
|
|
class TestLoadRegisteredPeers(unittest.TestCase):
    """Exercise _load_registered_peers() against different peers.json contents."""

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.wg = _make_wg(self.tmp)

    def _peers_path(self):
        """Location of peers.json inside the manager's data_dir."""
        return os.path.join(self.wg.data_dir, 'peers.json')

    def _write_peers(self, peers: list):
        """Store *peers* as JSON in peers.json."""
        with open(self._peers_path(), 'w') as handle:
            handle.write(json.dumps(peers))

    def test_returns_empty_list_when_file_missing(self):
        """With no peers.json on disk the loader yields an empty list."""
        loaded = self.wg._load_registered_peers()
        self.assertEqual(loaded, [])

    def test_returns_empty_list_on_malformed_json(self):
        """Unparseable peers.json content yields an empty list."""
        with open(self._peers_path(), 'w') as handle:
            handle.write('not json {{{')
        loaded = self.wg._load_registered_peers()
        self.assertEqual(loaded, [])

    def test_returns_active_peers(self):
        """A complete entry flagged active=True is returned."""
        entry = {'peer': 'u1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True}
        self._write_peers([entry])
        loaded = self.wg._load_registered_peers()
        self.assertEqual(len(loaded), 1)
        self.assertEqual(loaded[0]['public_key'], FAKE_PUBKEY)

    def test_filters_out_inactive_peers(self):
        """Entries with active=False are dropped; active ones are kept."""
        kept = {'peer': 'u1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True}
        dropped = {'peer': 'u2', 'ip': '10.0.0.3', 'public_key': FAKE_PUBKEY2, 'active': False}
        self._write_peers([kept, dropped])
        loaded = self.wg._load_registered_peers()
        self.assertEqual(len(loaded), 1)
        self.assertEqual(loaded[0]['public_key'], FAKE_PUBKEY)

    def test_filters_out_peers_without_public_key(self):
        """An entry lacking public_key is not returned."""
        keyless = {'peer': 'u1', 'ip': '10.0.0.2', 'active': True}
        self._write_peers([keyless])
        self.assertEqual(self.wg._load_registered_peers(), [])

    def test_filters_out_peers_without_ip(self):
        """An entry lacking ip is not returned."""
        addressless = {'peer': 'u1', 'public_key': FAKE_PUBKEY, 'active': True}
        self._write_peers([addressless])
        self.assertEqual(self.wg._load_registered_peers(), [])

    def test_treats_missing_active_field_as_active(self):
        """Peers without 'active' key should be treated as active (default True)."""
        unflagged = {'peer': 'u1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY}
        self._write_peers([unflagged])
        loaded = self.wg._load_registered_peers()
        self.assertEqual(len(loaded), 1)

    def test_skips_non_dict_entries(self):
        """A non-dict item in the list is ignored; the dict next to it is kept."""
        valid = {'peer': 'u1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True}
        self._write_peers(['not_a_dict', valid])
        loaded = self.wg._load_registered_peers()
        self.assertEqual(len(loaded), 1)

    def test_returns_all_required_fields(self):
        """Returned entries expose both 'ip' and 'public_key'."""
        entry = {'peer': 'u1', 'ip': '10.0.0.5', 'public_key': FAKE_PUBKEY, 'active': True}
        self._write_peers([entry])
        first = self.wg._load_registered_peers()[0]
        for required in ('ip', 'public_key'):
            self.assertIn(required, first)
|
|
|
|
|
|
# ── 5. add_peer() writes correct server-side AllowedIPs ───────────────────────
|
|
|
|
class TestAddPeerServerSideAllowedIps(unittest.TestCase):
    """
    Server-side AllowedIPs must be a /32 host address matching the peer's VPN IP.
    Wider masks (e.g. 0.0.0.0/0) would route internet traffic from all other
    clients to that single peer, breaking internet access for everyone else.
    """

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        # _syncconf is stubbed out only while the manager is being constructed.
        with patch.object(WireGuardManager, '_syncconf', return_value=None):
            self.wg = WireGuardManager(self.tmp, self.tmp)

    def _conf_text(self):
        """Return the current config file text, closing the handle afterwards."""
        with open(self.wg._config_file()) as f:
            return f.read()

    def test_add_peer_writes_slash32_allowed_ips(self):
        """A /32 AllowedIPs value is accepted and written to the config."""
        ok = self.wg.add_peer('peer1', FAKE_PUBKEY, '', '10.0.0.2/32')
        self.assertTrue(ok)
        self.assertIn('AllowedIPs = 10.0.0.2/32', self._conf_text())

    def test_add_peer_rejects_full_tunnel_allowed_ips(self):
        """0.0.0.0/0 as server AllowedIPs is invalid and must be rejected."""
        ok = self.wg.add_peer('peer1', FAKE_PUBKEY, '', '0.0.0.0/0')
        self.assertFalse(ok)

    def test_add_peer_rejects_subnet_allowed_ips(self):
        """10.0.0.0/24 as server AllowedIPs is invalid and must be rejected."""
        ok = self.wg.add_peer('peer1', FAKE_PUBKEY, '', '10.0.0.0/24')
        self.assertFalse(ok)

    def test_add_peer_does_not_write_peer_on_rejection(self):
        """A rejected add_peer() call must leave no trace of the peer's key."""
        # Add a valid peer first so the conf file exists, then attempt bad add.
        # Checking the precondition makes a failure here point at the setup
        # step instead of surfacing later as a missing-file error.
        self.assertTrue(self.wg.add_peer('valid', FAKE_PUBKEY2, '', '10.0.0.99/32'))
        ok = self.wg.add_peer('peer1', FAKE_PUBKEY, '', '0.0.0.0/0')
        self.assertFalse(ok)
        # The bad peer's key must not appear; the valid one may
        self.assertNotIn(FAKE_PUBKEY, self._conf_text())

    def test_add_peer_writes_public_key(self):
        """The peer's public key is written as a PublicKey line."""
        self.wg.add_peer('peer1', FAKE_PUBKEY, '', '10.0.0.2/32')
        self.assertIn(f'PublicKey = {FAKE_PUBKEY}', self._conf_text())

    def test_add_peer_writes_peer_name_as_comment(self):
        """The peer name is recorded in the config as a '# <name>' comment."""
        self.wg.add_peer('user1', FAKE_PUBKEY, '', '10.0.0.2/32')
        self.assertIn('# user1', self._conf_text())

    def test_add_peer_writes_persistent_keepalive(self):
        """A keepalive value passed as the fifth argument is written out."""
        self.wg.add_peer('peer1', FAKE_PUBKEY, '', '10.0.0.2/32', 25)
        self.assertIn('PersistentKeepalive = 25', self._conf_text())
|
|
|
|
|
|
# ── 6. Key sync: _sync_keys_from_conf() ──────────────────────────────────────
|
|
|
|
class TestSyncKeysFromConf(unittest.TestCase):
    """
    linuxserver/wireguard auto-generates its own PrivateKey on first container
    start.  The PIC API generates a separate key independently.
    _sync_keys_from_conf() must detect the mismatch and update the API
    key-store so get_peer_config() embeds the correct server public key —
    otherwise the WireGuard handshake fails silently.
    """

    def _make_wg(self, tmp: str) -> WireGuardManager:
        """Construct a manager rooted in *tmp* with _syncconf stubbed during __init__."""
        with patch.object(WireGuardManager, '_syncconf', return_value=None):
            return WireGuardManager(tmp, tmp)

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        # Reuse the helper instead of repeating the patch/construct sequence.
        self.wg = self._make_wg(self.tmp)

    def _write_conf_with_key(self, priv_b64: str):
        """Write a minimal wg0.conf with the given PrivateKey."""
        cf = self.wg._config_file()
        os.makedirs(os.path.dirname(cf), exist_ok=True)
        with open(cf, 'w') as f:
            f.write(f'[Interface]\nPrivateKey = {priv_b64}\nListenPort = 51820\nAddress = 10.0.0.1/24\n')

    def _generate_key_pair(self):
        """Return a fresh X25519 key pair as ``(private_b64, public_b64)`` strings."""
        from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
        import base64 as _b64

        priv = X25519PrivateKey.generate()
        priv_bytes = priv.private_bytes_raw()
        pub_bytes = priv.public_key().public_bytes_raw()
        return _b64.b64encode(priv_bytes).decode(), _b64.b64encode(pub_bytes).decode()

    def test_sync_updates_api_key_when_conf_differs(self):
        """When wg0.conf has a different PrivateKey, the API key-store must be updated."""
        new_priv, new_pub = self._generate_key_pair()
        self._write_conf_with_key(new_priv)
        self.wg._sync_keys_from_conf()
        api_keys = self.wg.get_keys()
        self.assertEqual(api_keys['private_key'], new_priv)
        self.assertEqual(api_keys['public_key'], new_pub)

    def test_sync_no_op_when_keys_match(self):
        """If wg0.conf already has the same key as the API store, nothing changes."""
        api_keys = self.wg.get_keys()
        self._write_conf_with_key(api_keys['private_key'])
        self.wg._sync_keys_from_conf()  # should not raise or change anything
        after = self.wg.get_keys()
        self.assertEqual(api_keys['public_key'], after['public_key'])

    def test_sync_makes_get_peer_config_use_correct_server_pubkey(self):
        """After sync, get_peer_config() must embed the updated server public key."""
        new_priv, new_pub = self._generate_key_pair()
        self._write_conf_with_key(new_priv)
        self.wg._sync_keys_from_conf()
        peer_keys = self.wg.generate_peer_keys('testpeer')
        cfg = self.wg.get_peer_config('testpeer', '10.0.0.2', peer_keys['private_key'])
        self.assertIn(new_pub, cfg)

    def test_sync_is_noop_when_conf_missing(self):
        """_sync_keys_from_conf() must not raise when wg0.conf doesn't exist."""
        # Don't create the conf file
        self.wg._sync_keys_from_conf()  # should not raise

    def test_apply_config_calls_sync_before_bootstrap(self):
        """apply_config() must call _sync_keys_from_conf() so bootstrap uses the live key.

        NOTE(review): wg0.conf is written empty here and the key is placed in
        the key-store files, so this only demonstrates that the bootstrapped
        config embeds the key-store private key.  It does not observe a call
        to _sync_keys_from_conf() directly — confirm whether a stricter check
        is wanted.
        """
        import base64

        new_priv, new_pub = self._generate_key_pair()
        cf = self.wg._config_file()
        os.makedirs(os.path.dirname(cf), exist_ok=True)
        with open(cf, 'w') as f:
            f.write('')  # empty conf triggers bootstrap

        # Write the "new" key to the API key store as if the container
        # auto-generated it.  The raw bytes are simply the decoded base64 of
        # the generated pair; no need to rebuild the key object to get them.
        raw_keys = {
            'private.key': base64.b64decode(new_priv),
            'public.key': base64.b64decode(new_pub),
        }
        for filename, raw in raw_keys.items():
            with open(os.path.join(self.wg.keys_dir, filename), 'wb') as f:
                f.write(raw)

        peers_file = os.path.join(self.wg.data_dir, 'peers.json')
        with open(peers_file, 'w') as f:
            json.dump([], f)

        with patch.object(self.wg, 'get_external_ip', return_value=None), \
                patch('subprocess.run'):
            self.wg.apply_config({'port': 51820})

        # Read through a context manager so the handle is closed promptly.
        with open(cf) as f:
            content = f.read()
        self.assertIn(new_priv, content)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run every test case in this module when the file is executed as a script.
    unittest.main()
|