Files
pic/tests/test_wireguard_vpn_routing.py
T
roof 9a800e3b6b feat: fix cross-cell service access — DNS DNAT, service DNAT, Caddy routing
DNS A records now return the WireGuard server IP (10.0.0.1) instead of
Docker bridge VIPs so cross-cell peers resolve service names correctly
regardless of their bridge subnet. DNAT rules (wg0:53→cell-dns:53 and
wg0:80→cell-caddy:80) are applied at startup. Caddy routes by Host header,
eliminating the Docker bridge subnet conflict. Firewall cell rules allow
DNS and service (Caddy) traffic from linked cell subnets. Split-tunnel
AllowedIPs now dynamically includes connected-cell VPN subnets and drops
the 172.20.0.0/16 range. Peers with route_via set now receive full-tunnel
config (0.0.0.0/0) so all their traffic exits via the remote cell.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 03:12:09 -04:00

550 lines
24 KiB
Python

#!/usr/bin/env python3
"""
Tests for WireGuard VPN routing: internet access and DNS resolution through tunnel.
Scenarios covered:
1. generate_config() produces PostUp/PostDown rules for internet forwarding:
MASQUERADE on eth0 (outbound NAT for packets from 10.0.0.x) plus a catch-all
FORWARD DROP on the WireGuard interface. Per-peer rules inserted above the DROP
by apply_peer_rules() are what allow known peers to forward traffic.
2. get_peer_config() sets DNS = <wg-server-ip> (e.g. 10.0.0.1) so clients resolve
domain names through the PIC DNS container (reached via the wg0:53 → cell-dns
DNAT set up by ensure_dns_dnat()), not their local ISP resolver.
3. apply_config() bootstrap path (empty wg0.conf) restores all active peers from
peers.json so clients can reconnect after an API restart that regenerated the file.
4. _load_registered_peers() correctly filters peers.json.
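5. add_peer() writes a /32 AllowedIPs entry so routing targets only that client.
6. _sync_keys_from_conf() adopts the PrivateKey found in wg0.conf so that
get_peer_config() embeds the matching server public key.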
"""
import sys
import os
import json
import shutil
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from wireguard_manager import WireGuardManager, _resolve_peer_dns
# A syntactically-valid WireGuard base64 public key (44 chars, ends with =).
FAKE_PUBKEY = 'O35JY6nc8sb9zEarZYZVl70jno/J9dRyiB37YSYy4nA='
FAKE_PUBKEY2 = 'AbCdEfGhIjKlMnOpQrStUvWxYz0123456789ABCDEFG='
def _make_wg(tmp: str) -> WireGuardManager:
    """Return a WireGuardManager rooted in *tmp*.

    ``WireGuardManager._syncconf`` is replaced by a stub returning ``None``
    while the constructor runs; the patch is lifted again before returning.
    """
    with patch.object(WireGuardManager, '_syncconf', return_value=None):
        return WireGuardManager(tmp, tmp)
# ── 1. Internet forwarding rules in generate_config() ─────────────────────────
class TestInternetForwardingRules(unittest.TestCase):
    """
    Pin the iptables rules generate_config() emits for 'internet through VPN'.

    Two rules are expected: MASQUERADE on eth0 (outbound NAT) and a catch-all
    FORWARD DROP on the wg0 interface.

    The catch-all is deliberately DROP rather than ACCEPT: only the per-peer
    rules that apply_peer_rules() inserts at chain position 1 may forward
    traffic. With an ACCEPT catch-all, any WireGuard-connected client would get
    full internet access even without an entry in peers.json.
    """

    def setUp(self):
        # Fresh temp dir per test, removed again by the registered cleanup.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.wg = _make_wg(self.tmp)

    def test_postup_has_masquerade_on_eth0(self):
        """MASQUERADE on eth0 NATs VPN-subnet packets so internet routers see the host IP."""
        masquerade_rule = 'POSTROUTING -o eth0 -j MASQUERADE'
        self.assertIn(masquerade_rule, self.wg.generate_config())

    def test_postup_has_forward_drop_on_wg_interface(self):
        """Catch-all DROP blocks unconfigured WG clients; per-peer rules inserted above it allow known peers."""
        config = self.wg.generate_config()
        self.assertIn('FORWARD -i %i -j DROP', config)
        self.assertNotIn('FORWARD -i %i -j ACCEPT', config)

    def test_postdown_removes_masquerade_rule(self):
        """PostDown must mirror PostUp so rules are cleaned up when the tunnel goes down."""
        config = self.wg.generate_config()
        # Text that follows the first 'PostDown' marker.
        after_postdown = config.split('PostDown')[1]
        self.assertIn('POSTROUTING -o eth0 -j MASQUERADE', after_postdown)

    def test_postdown_removes_forward_rule(self):
        """The catch-all FORWARD DROP must also appear after the PostDown marker."""
        config = self.wg.generate_config()
        after_postdown = config.split('PostDown')[1]
        self.assertIn('FORWARD -i %i -j DROP', after_postdown)

    def test_postup_and_postdown_are_present(self):
        """Both PostUp and PostDown must exist — PostUp without PostDown leaks rules."""
        config = self.wg.generate_config()
        for directive in ('PostUp', 'PostDown'):
            self.assertIn(directive, config)

    def test_masquerade_is_in_postup_not_only_postdown(self):
        """MASQUERADE must appear in PostUp (adding the rule), not only PostDown."""
        config = self.wg.generate_config()
        # Slice out what lies between the first 'PostUp' and the next 'PostDown'.
        after_postup = config.split('PostUp')[1]
        postup_section = after_postup.split('PostDown')[0]
        self.assertIn('MASQUERADE', postup_section)
# ── 2. DNS resolution: get_peer_config() sets DNS field ───────────────────────
class TestPeerConfigDns(unittest.TestCase):
    """
    Check the ``DNS = <wg_server_ip>`` line in generated peer client configs.

    DNS points at the WG server IP (e.g. 10.0.0.1), not at the Docker cell-dns
    container IP. ensure_dns_dnat() routes wg0:53 → cell-dns, so peers reach
    CoreDNS via the WG server IP — which works for both split-tunnel
    (10.0.x.x in AllowedIPs) and cross-cell peers.
    """

    def setUp(self):
        # Fresh temp dir per test, removed again by the registered cleanup.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.wg = _make_wg(self.tmp)

    def _client_config(self, peer_name, peer_ip):
        """Generate keys for *peer_name* and return its client config text."""
        private_key = self.wg.generate_peer_keys(peer_name)['private_key']
        return self.wg.get_peer_config(peer_name, peer_ip, private_key)

    def test_peer_config_contains_dns_line(self):
        """A generated peer config carries a DNS setting at all."""
        self.assertIn('DNS =', self._client_config('testpeer', '10.0.0.2'))

    def test_peer_config_dns_is_valid_ip(self):
        """The DNS value must parse as an IPv4 address."""
        import ipaddress
        config = self._client_config('testpeer', '10.0.0.2')
        dns_line = next(
            line for line in config.splitlines() if line.startswith('DNS =')
        )
        _, _, raw_value = dns_line.partition('=')
        # IPv4Address raises if the value is not a parseable IPv4 address.
        ipaddress.IPv4Address(raw_value.strip())

    def test_peer_config_dns_uses_wg_server_ip(self):
        """DNS in peer config is the WG server IP; ensure_dns_dnat() routes wg0:53 → cell-dns."""
        config = self._client_config('p1', '10.0.0.5')
        # Default WG server address is 10.0.0.1/24 when no wg0.conf exists
        self.assertIn('DNS = 10.0.0.1', config)

    def test_peer_config_dns_fallback_to_resolve_on_error(self):
        """If WG address parsing fails, _resolve_peer_dns() is used as fallback."""
        bad_address = patch.object(
            self.wg, '_get_configured_address', return_value='invalid'
        )
        fake_lookup = patch(
            'wireguard_manager.socket.gethostbyname', return_value='172.20.0.9'
        )
        with bad_address, fake_lookup:
            config = self._client_config('p2', '10.0.0.6')
        self.assertIn('DNS = 172.20.0.9', config)

    def test_resolve_peer_dns_fallback(self):
        """_resolve_peer_dns() always returns a string even when DNS lookup fails."""
        failing_lookup = patch(
            'wireguard_manager.socket.gethostbyname', side_effect=OSError
        )
        with failing_lookup:
            resolved = _resolve_peer_dns()
        self.assertIsInstance(resolved, str)
        self.assertEqual(resolved, '172.20.0.3')

    def test_peer_config_allowed_ips_default_full_tunnel(self):
        """Default AllowedIPs = 0.0.0.0/0 routes all traffic (including internet) through VPN."""
        config = self._client_config('p3', '10.0.0.7')
        # Full tunnel: 0.0.0.0/0 means all traffic goes through the VPN
        self.assertIn('0.0.0.0/0', config)
# ── 3. Bootstrap restores peers from peers.json ───────────────────────────────
class TestApplyConfigBootstrapRestoresPeers(unittest.TestCase):
    """
    apply_config() is called when the WireGuard port changes. If wg0.conf is
    empty or missing [Interface], it bootstraps from generate_config() — which
    only generates the [Interface] section and loses all [Peer] blocks.

    The fix: after bootstrap, load active peers from peers.json and restore their
    [Peer] blocks so clients can reconnect without manual intervention.

    Each test builds its own manager in a private temp dir through
    _make_wg_with_conf(), which also registers the dir for removal.
    """

    # Warning apply_config() reports when it had to regenerate wg0.conf.
    EMPTY_CONF_WARNING = 'wg0.conf was empty — regenerated from keys'

    def _make_wg_with_conf(self, conf_content: str = '') -> tuple:
        """Create a manager in a fresh temp dir and write *conf_content* to its config file.

        Returns ``(wg, config_path, tmp_dir)``. The temp dir is registered with
        addCleanup() right after creation, so it is removed even when a later
        step in this helper or in the calling test raises.
        """
        tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmp, ignore_errors=True)
        with patch.object(WireGuardManager, '_syncconf', return_value=None):
            wg = WireGuardManager(tmp, tmp)
        # Make sure the config file's parent directory exists before writing.
        cf = wg._config_file()
        os.makedirs(os.path.dirname(cf), exist_ok=True)
        with open(cf, 'w') as f:
            f.write(conf_content)
        return wg, cf, tmp

    def _write_peers_json(self, wg: WireGuardManager, peers: list):
        """Serialize *peers* to peers.json inside the manager's data_dir."""
        peers_file = os.path.join(wg.data_dir, 'peers.json')
        with open(peers_file, 'w') as f:
            json.dump(peers, f)

    def _apply_port(self, wg: WireGuardManager, port: int):
        """Run apply_config({'port': port}) with get_external_ip and subprocess.run stubbed."""
        with patch.object(wg, 'get_external_ip', return_value=None), \
                patch('subprocess.run'):
            return wg.apply_config({'port': port})

    @staticmethod
    def _read(path: str) -> str:
        """Return the text of *path*, closing the handle deterministically."""
        with open(path) as f:
            return f.read()

    def _bootstrap_with_peers(self, peers: list) -> str:
        """Start from an empty conf, register *peers*, apply port 51820, return wg0.conf text."""
        wg, cf, _tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, peers)
        self._apply_port(wg, 51820)
        return self._read(cf)

    def test_empty_conf_triggers_bootstrap(self):
        """An empty wg0.conf makes apply_config() report that it regenerated the file."""
        wg, _cf, _tmp = self._make_wg_with_conf('')
        self._write_peers_json(wg, [])
        result = self._apply_port(wg, 51820)
        self.assertIn(self.EMPTY_CONF_WARNING, result['warnings'])

    def test_bootstrap_restores_active_peer(self):
        """After bootstrap on empty conf, active peer from peers.json appears in wg0.conf."""
        content = self._bootstrap_with_peers([{
            'peer': 'user1',
            'ip': '10.0.0.2',
            'public_key': FAKE_PUBKEY,
            'active': True,
        }])
        self.assertIn('[Peer]', content)
        self.assertIn(FAKE_PUBKEY, content)
        self.assertIn('AllowedIPs = 10.0.0.2/32', content)

    def test_bootstrap_restores_multiple_peers(self):
        """Every active peer in peers.json gets its own [Peer] block."""
        content = self._bootstrap_with_peers([
            {'peer': 'peer1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True},
            {'peer': 'peer2', 'ip': '10.0.0.3', 'public_key': FAKE_PUBKEY2, 'active': True},
        ])
        self.assertIn(FAKE_PUBKEY, content)
        self.assertIn(FAKE_PUBKEY2, content)
        self.assertEqual(content.count('[Peer]'), 2)

    def test_bootstrap_skips_inactive_peers(self):
        """Inactive peers (active=False) must NOT be restored to wg0.conf."""
        content = self._bootstrap_with_peers([
            {'peer': 'active', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True},
            {'peer': 'inactive', 'ip': '10.0.0.3', 'public_key': FAKE_PUBKEY2, 'active': False},
        ])
        self.assertIn(FAKE_PUBKEY, content)
        self.assertNotIn(FAKE_PUBKEY2, content)

    def test_bootstrap_skips_peer_missing_public_key(self):
        """A peers.json entry without public_key produces no [Peer] block."""
        content = self._bootstrap_with_peers([
            {'peer': 'nok', 'ip': '10.0.0.2', 'active': True},  # no public_key
        ])
        self.assertEqual(content.count('[Peer]'), 0)

    def test_bootstrap_skips_peer_missing_ip(self):
        """A peers.json entry without ip is not written to wg0.conf."""
        content = self._bootstrap_with_peers([
            {'peer': 'nok', 'public_key': FAKE_PUBKEY, 'active': True},  # no ip
        ])
        self.assertNotIn(FAKE_PUBKEY, content)

    def test_existing_conf_with_interface_not_bootstrapped(self):
        """If [Interface] is present, bootstrap must NOT run — existing peers are preserved."""
        wg, cf, _tmp = self._make_wg_with_conf(
            '[Interface]\nListenPort = 51820\nPrivateKey = dummykey\n'
            '\n[Peer]\n# existing\nPublicKey = ' + FAKE_PUBKEY + '\nAllowedIPs = 10.0.0.2/32\n'
        )
        result = self._apply_port(wg, 51821)
        self.assertNotIn(self.EMPTY_CONF_WARNING, result['warnings'])
        # Original peer must still be there after port-only change
        self.assertIn(FAKE_PUBKEY, self._read(cf))

    def test_restored_peers_have_slash32_allowed_ips(self):
        """/32 is mandatory: a wider mask would route internet traffic to the wrong peer."""
        content = self._bootstrap_with_peers([
            {'peer': 'user1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True},
        ])
        # Must be /32, not /24 or /0
        self.assertIn('AllowedIPs = 10.0.0.2/32', content)
        self.assertNotIn('AllowedIPs = 10.0.0.2/24', content)
# ── 4. _load_registered_peers() ───────────────────────────────────────────────
class TestLoadRegisteredPeers(unittest.TestCase):
    """Check which peers.json entries _load_registered_peers() returns."""

    def setUp(self):
        # Fresh temp dir per test, removed again by the registered cleanup.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.wg = _make_wg(self.tmp)

    def _peers_path(self):
        """Location of peers.json inside the manager's data_dir."""
        return os.path.join(self.wg.data_dir, 'peers.json')

    def _write_peers(self, peers: list):
        """Dump *peers* as JSON into peers.json."""
        with open(self._peers_path(), 'w') as handle:
            json.dump(peers, handle)

    def test_returns_empty_list_when_file_missing(self):
        """Without a peers.json the loader yields an empty list."""
        loaded = self.wg._load_registered_peers()
        self.assertEqual(loaded, [])

    def test_returns_empty_list_on_malformed_json(self):
        """Unparseable peers.json content yields an empty list."""
        with open(self._peers_path(), 'w') as handle:
            handle.write('not json {{{')
        loaded = self.wg._load_registered_peers()
        self.assertEqual(loaded, [])

    def test_returns_active_peers(self):
        """A complete, active entry is returned."""
        entry = {'peer': 'u1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True}
        self._write_peers([entry])
        loaded = self.wg._load_registered_peers()
        self.assertEqual(len(loaded), 1)
        self.assertEqual(loaded[0]['public_key'], FAKE_PUBKEY)

    def test_filters_out_inactive_peers(self):
        """Entries with active=False are dropped; the active one remains."""
        active_entry = {'peer': 'u1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True}
        inactive_entry = {'peer': 'u2', 'ip': '10.0.0.3', 'public_key': FAKE_PUBKEY2, 'active': False}
        self._write_peers([active_entry, inactive_entry])
        loaded = self.wg._load_registered_peers()
        self.assertEqual(len(loaded), 1)
        self.assertEqual(loaded[0]['public_key'], FAKE_PUBKEY)

    def test_filters_out_peers_without_public_key(self):
        """An entry lacking public_key is dropped."""
        self._write_peers([
            {'peer': 'u1', 'ip': '10.0.0.2', 'active': True},
        ])
        loaded = self.wg._load_registered_peers()
        self.assertEqual(loaded, [])

    def test_filters_out_peers_without_ip(self):
        """An entry lacking ip is dropped."""
        self._write_peers([
            {'peer': 'u1', 'public_key': FAKE_PUBKEY, 'active': True},
        ])
        loaded = self.wg._load_registered_peers()
        self.assertEqual(loaded, [])

    def test_treats_missing_active_field_as_active(self):
        """Peers without 'active' key should be treated as active (default True)."""
        self._write_peers([
            {'peer': 'u1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY},
        ])
        loaded = self.wg._load_registered_peers()
        self.assertEqual(len(loaded), 1)

    def test_skips_non_dict_entries(self):
        """Non-dict items in the list are ignored rather than breaking the load."""
        valid_entry = {'peer': 'u1', 'ip': '10.0.0.2', 'public_key': FAKE_PUBKEY, 'active': True}
        self._write_peers(['not_a_dict', valid_entry])
        loaded = self.wg._load_registered_peers()
        self.assertEqual(len(loaded), 1)

    def test_returns_all_required_fields(self):
        """Returned entries expose both 'ip' and 'public_key'."""
        self._write_peers([
            {'peer': 'u1', 'ip': '10.0.0.5', 'public_key': FAKE_PUBKEY, 'active': True},
        ])
        first = self.wg._load_registered_peers()[0]
        self.assertIn('ip', first)
        self.assertIn('public_key', first)
# ── 5. add_peer() writes correct server-side AllowedIPs ───────────────────────
class TestAddPeerServerSideAllowedIps(unittest.TestCase):
    """
    Server-side AllowedIPs must be a /32 host address matching the peer's VPN IP.
    Wider masks (e.g. 0.0.0.0/0) would route internet traffic from all other
    clients to that single peer, breaking internet access for everyone else.
    """

    def setUp(self):
        # Fresh temp dir per test, removed again by the registered cleanup.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        with patch.object(WireGuardManager, '_syncconf', return_value=None):
            self.wg = WireGuardManager(self.tmp, self.tmp)

    def _read_conf(self) -> str:
        """Return the current config file text, closing the handle deterministically."""
        with open(self.wg._config_file()) as f:
            return f.read()

    def test_add_peer_writes_slash32_allowed_ips(self):
        """A /32 AllowedIPs value is accepted and written verbatim."""
        ok = self.wg.add_peer('peer1', FAKE_PUBKEY, '', '10.0.0.2/32')
        self.assertTrue(ok)
        self.assertIn('AllowedIPs = 10.0.0.2/32', self._read_conf())

    def test_add_peer_rejects_full_tunnel_allowed_ips(self):
        """0.0.0.0/0 as server AllowedIPs is invalid and must be rejected."""
        ok = self.wg.add_peer('peer1', FAKE_PUBKEY, '', '0.0.0.0/0')
        self.assertFalse(ok)

    def test_add_peer_rejects_subnet_allowed_ips(self):
        """10.0.0.0/24 as server AllowedIPs is invalid and must be rejected."""
        ok = self.wg.add_peer('peer1', FAKE_PUBKEY, '', '10.0.0.0/24')
        self.assertFalse(ok)

    def test_add_peer_does_not_write_peer_on_rejection(self):
        """A rejected add_peer() call leaves no trace of that peer's key in the conf."""
        # Add a valid peer first so the conf file exists, then attempt bad add
        self.wg.add_peer('valid', FAKE_PUBKEY2, '', '10.0.0.99/32')
        ok = self.wg.add_peer('peer1', FAKE_PUBKEY, '', '0.0.0.0/0')
        self.assertFalse(ok)
        # The bad peer's key must not appear; the valid one may
        self.assertNotIn(FAKE_PUBKEY, self._read_conf())

    def test_add_peer_writes_public_key(self):
        """The peer's public key is written as a PublicKey line."""
        self.wg.add_peer('peer1', FAKE_PUBKEY, '', '10.0.0.2/32')
        self.assertIn(f'PublicKey = {FAKE_PUBKEY}', self._read_conf())

    def test_add_peer_writes_peer_name_as_comment(self):
        """The peer name is recorded as a '# <name>' comment."""
        self.wg.add_peer('user1', FAKE_PUBKEY, '', '10.0.0.2/32')
        self.assertIn('# user1', self._read_conf())

    def test_add_peer_writes_persistent_keepalive(self):
        """A keepalive passed as the fifth argument is written to the conf."""
        self.wg.add_peer('peer1', FAKE_PUBKEY, '', '10.0.0.2/32', 25)
        self.assertIn('PersistentKeepalive = 25', self._read_conf())
# ── 6. Key sync: _sync_keys_from_conf() ──────────────────────────────────────
class TestSyncKeysFromConf(unittest.TestCase):
    """
    linuxserver/wireguard auto-generates its own PrivateKey on first container start.
    The PIC API generates a separate key independently. _sync_keys_from_conf() must
    detect the mismatch and update the API key-store so get_peer_config() embeds
    the correct server public key — otherwise the WireGuard handshake fails silently.
    """

    def _make_wg(self, tmp: str) -> WireGuardManager:
        """Build a WireGuardManager rooted in *tmp* with _syncconf patched out."""
        with patch.object(WireGuardManager, '_syncconf', return_value=None):
            return WireGuardManager(tmp, tmp)

    def setUp(self):
        # Fresh temp dir per test, removed again by the registered cleanup.
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        self.wg = self._make_wg(self.tmp)

    def _write_conf_with_key(self, priv_b64: str):
        """Write a minimal wg0.conf with the given PrivateKey."""
        cf = self.wg._config_file()
        os.makedirs(os.path.dirname(cf), exist_ok=True)
        with open(cf, 'w') as f:
            f.write(f'[Interface]\nPrivateKey = {priv_b64}\nListenPort = 51820\nAddress = 10.0.0.1/24\n')

    def _generate_key_pair(self):
        """Return a fresh X25519 key pair as ``(private_b64, public_b64)`` strings."""
        from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
        import base64 as _b64
        priv = X25519PrivateKey.generate()
        priv_bytes = priv.private_bytes_raw()
        pub_bytes = priv.public_key().public_bytes_raw()
        return _b64.b64encode(priv_bytes).decode(), _b64.b64encode(pub_bytes).decode()

    def test_sync_updates_api_key_when_conf_differs(self):
        """When wg0.conf has a different PrivateKey, the API key-store must be updated."""
        new_priv, new_pub = self._generate_key_pair()
        self._write_conf_with_key(new_priv)
        self.wg._sync_keys_from_conf()
        api_keys = self.wg.get_keys()
        self.assertEqual(api_keys['private_key'], new_priv)
        self.assertEqual(api_keys['public_key'], new_pub)

    def test_sync_no_op_when_keys_match(self):
        """If wg0.conf already has the same key as the API store, nothing changes."""
        api_keys = self.wg.get_keys()
        self._write_conf_with_key(api_keys['private_key'])
        self.wg._sync_keys_from_conf()  # should not raise or change anything
        after = self.wg.get_keys()
        self.assertEqual(api_keys['public_key'], after['public_key'])

    def test_sync_makes_get_peer_config_use_correct_server_pubkey(self):
        """After sync, get_peer_config() must embed the updated server public key."""
        new_priv, new_pub = self._generate_key_pair()
        self._write_conf_with_key(new_priv)
        self.wg._sync_keys_from_conf()
        peer_keys = self.wg.generate_peer_keys('testpeer')
        cfg = self.wg.get_peer_config('testpeer', '10.0.0.2', peer_keys['private_key'])
        self.assertIn(new_pub, cfg)

    def test_sync_is_noop_when_conf_missing(self):
        """_sync_keys_from_conf() must not raise when wg0.conf doesn't exist."""
        # Don't create the conf file
        self.wg._sync_keys_from_conf()  # should not raise

    def test_apply_config_calls_sync_before_bootstrap(self):
        """apply_config() must call _sync_keys_from_conf() so bootstrap uses the live key."""
        import base64 as _b64

        new_priv, new_pub = self._generate_key_pair()
        cf = self.wg._config_file()
        os.makedirs(os.path.dirname(cf), exist_ok=True)
        with open(cf, 'w') as f:
            f.write('')  # empty conf triggers bootstrap
        # Write the "new" key to the API key store as if the container auto-generated it.
        # The raw key bytes are the base64-decoded halves of the generated pair.
        # NOTE(review): wg0.conf is empty here, so what this pins is that the
        # bootstrapped conf contains the key-store key — confirm that matches
        # the intent expressed in the test name.
        for filename, key_b64 in (('private.key', new_priv), ('public.key', new_pub)):
            with open(os.path.join(self.wg.keys_dir, filename), 'wb') as f:
                f.write(_b64.b64decode(key_b64))
        peers_file = os.path.join(self.wg.data_dir, 'peers.json')
        with open(peers_file, 'w') as f:
            json.dump([], f)
        with patch.object(self.wg, 'get_external_ip', return_value=None), \
                patch('subprocess.run'):
            self.wg.apply_config({'port': 51820})
        with open(cf) as f:
            content = f.read()
        self.assertIn(new_priv, content)
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()