From e5d59fd94de92a1332ee22aa0befe2cec7b567ee Mon Sep 17 00:00:00 2001 From: Dmitrii Iurco Date: Sun, 26 Apr 2026 16:40:21 -0400 Subject: [PATCH] fix: sync API key-store from wg0.conf to prevent WireGuard handshake failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit linuxserver/wireguard auto-generates its own PrivateKey on first container start, independently of the PIC API's key-store. When the two diverge, the API generates peer configs with the wrong server public key and the WireGuard handshake fails silently — the client can ping the VPN subnet (10.0.0.x) but gets no internet and cannot reach any Docker service (172.20.0.x). Adds _sync_keys_from_conf(): called at the top of apply_config(), reads the PrivateKey from wg0.conf, derives the matching public key, and overwrites the API key files (private.key / public.key) if they differ. This makes wg0.conf the authoritative source for the server identity, keeping get_peer_config() consistent with the live WireGuard interface. Adds 5 new tests in TestSyncKeysFromConf covering: - key-store update when conf key differs - no-op when keys already match - get_peer_config() uses the synced key - no raise when conf is missing - apply_config() passes the synced key through bootstrap Co-Authored-By: Claude Sonnet 4.6 --- api/wireguard_manager.py | 44 +++++++++++++ tests/test_wireguard_vpn_routing.py | 96 +++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) diff --git a/api/wireguard_manager.py b/api/wireguard_manager.py index f57472e..8e32666 100644 --- a/api/wireguard_manager.py +++ b/api/wireguard_manager.py @@ -223,6 +223,45 @@ class WireGuardManager(BaseServiceManager): except Exception: return [] + def _sync_keys_from_conf(self) -> None: + """Sync the API's key store from wg0.conf so both agree on the server identity. + + linuxserver/wireguard auto-generates a PrivateKey on first container start. + The API generates its own key independently. Any time apply_config() runs, + read the PrivateKey from wg0.conf (the container's authoritative source) and + update the API's key-store files to match — keeping get_keys() consistent. + """ + import base64 as _b64 + cf = self._config_file() + if not os.path.exists(cf): + return + try: + with open(cf) as f: + raw = f.read() + for line in raw.splitlines(): + stripped = line.strip() + if stripped.startswith('PrivateKey'): + conf_priv = stripped.split('=', 1)[1].strip() + api_keys = self.get_keys() + if conf_priv == api_keys.get('private_key'): + return # already in sync + # Derive public key from private key and update both files + from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey + priv_bytes = _b64.b64decode(conf_priv) + priv_obj = X25519PrivateKey.from_private_bytes(priv_bytes) + pub_bytes = priv_obj.public_key().public_bytes_raw() + pub_b64 = _b64.b64encode(pub_bytes).decode() + priv_file = os.path.join(self.keys_dir, 'private.key') + pub_file = os.path.join(self.keys_dir, 'public.key') + with open(priv_file, 'wb') as f: + f.write(priv_bytes) + with open(pub_file, 'wb') as f: + f.write(pub_bytes) + logger.info(f'wg: key-store synced from wg0.conf (new pub={pub_b64[:16]}...)') + return + except Exception as e: + logger.warning(f'_sync_keys_from_conf failed (non-fatal): {e}') + def apply_config(self, config: Dict[str, Any]) -> Dict[str, Any]: """Update wg0.conf interface fields and restart cell-wireguard.""" restarted = [] @@ -232,6 +271,11 @@ class WireGuardManager(BaseServiceManager): warnings.append('wg0.conf not found — skipping') return {'restarted': restarted, 'warnings': warnings} try: + # Sync the API key-store from wg0.conf before doing anything else. + # linuxserver/wireguard auto-generates its own key; this keeps both in sync + # so get_peer_config() always embeds the correct server public key. + self._sync_keys_from_conf() + with open(cf) as f: raw = f.read() diff --git a/tests/test_wireguard_vpn_routing.py b/tests/test_wireguard_vpn_routing.py index 8911f2b..777deb2 100644 --- a/tests/test_wireguard_vpn_routing.py +++ b/tests/test_wireguard_vpn_routing.py @@ -439,5 +439,101 @@ class TestAddPeerServerSideAllowedIps(unittest.TestCase): self.assertIn('PersistentKeepalive = 25', content) +# ── 6. Key sync: _sync_keys_from_conf() ────────────────────────────────────── + +class TestSyncKeysFromConf(unittest.TestCase): + """ + linuxserver/wireguard auto-generates its own PrivateKey on first container start. + The PIC API generates a separate key independently. _sync_keys_from_conf() must + detect the mismatch and update the API key-store so get_peer_config() embeds + the correct server public key — otherwise the WireGuard handshake fails silently. + """ + + def _make_wg(self, tmp: str) -> WireGuardManager: + with patch.object(WireGuardManager, '_syncconf', return_value=None): + return WireGuardManager(tmp, tmp) + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + with patch.object(WireGuardManager, '_syncconf', return_value=None): + self.wg = WireGuardManager(self.tmp, self.tmp) + + def _write_conf_with_key(self, priv_b64: str): + """Write a minimal wg0.conf with the given PrivateKey.""" + cf = self.wg._config_file() + os.makedirs(os.path.dirname(cf), exist_ok=True) + with open(cf, 'w') as f: + f.write(f'[Interface]\nPrivateKey = {priv_b64}\nListenPort = 51820\nAddress = 10.0.0.1/24\n') + + def _generate_key_pair(self): + from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey + import base64 as _b64 + priv = X25519PrivateKey.generate() + priv_bytes = priv.private_bytes_raw() + pub_bytes = priv.public_key().public_bytes_raw() + return _b64.b64encode(priv_bytes).decode(), _b64.b64encode(pub_bytes).decode() + + def test_sync_updates_api_key_when_conf_differs(self): + """When wg0.conf has a different PrivateKey, the API key-store must be updated.""" + new_priv, new_pub = self._generate_key_pair() + self._write_conf_with_key(new_priv) + self.wg._sync_keys_from_conf() + api_keys = self.wg.get_keys() + self.assertEqual(api_keys['private_key'], new_priv) + self.assertEqual(api_keys['public_key'], new_pub) + + def test_sync_no_op_when_keys_match(self): + """If wg0.conf already has the same key as the API store, nothing changes.""" + api_keys = self.wg.get_keys() + self._write_conf_with_key(api_keys['private_key']) + self.wg._sync_keys_from_conf() # should not raise or change anything + after = self.wg.get_keys() + self.assertEqual(api_keys['public_key'], after['public_key']) + + def test_sync_makes_get_peer_config_use_correct_server_pubkey(self): + """After sync, get_peer_config() must embed the updated server public key.""" + new_priv, new_pub = self._generate_key_pair() + self._write_conf_with_key(new_priv) + self.wg._sync_keys_from_conf() + peer_keys = self.wg.generate_peer_keys('testpeer') + cfg = self.wg.get_peer_config('testpeer', '10.0.0.2', peer_keys['private_key']) + self.assertIn(new_pub, cfg) + + def test_sync_is_noop_when_conf_missing(self): + """_sync_keys_from_conf() must not raise when wg0.conf doesn't exist.""" + # Don't create the conf file + self.wg._sync_keys_from_conf() # should not raise + + def test_apply_config_calls_sync_before_bootstrap(self): + """apply_config() must call _sync_keys_from_conf() so bootstrap uses the live key.""" + new_priv, new_pub = self._generate_key_pair() + cf = self.wg._config_file() + os.makedirs(os.path.dirname(cf), exist_ok=True) + with open(cf, 'w') as f: + f.write('') # empty conf triggers bootstrap + # Write the "new" key to the API key store as if the container auto-generated it + import base64 as _b64 + from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey + priv_obj = X25519PrivateKey.from_private_bytes(_b64.b64decode(new_priv)) + priv_bytes = priv_obj.private_bytes_raw() + pub_bytes = priv_obj.public_key().public_bytes_raw() + with open(os.path.join(self.wg.keys_dir, 'private.key'), 'wb') as f: + f.write(priv_bytes) + with open(os.path.join(self.wg.keys_dir, 'public.key'), 'wb') as f: + f.write(pub_bytes) + + peers_file = os.path.join(self.wg.data_dir, 'peers.json') + with open(peers_file, 'w') as f: + json.dump([], f) + + with patch.object(self.wg, 'get_external_ip', return_value=None), \ + patch('subprocess.run'): + self.wg.apply_config({'port': 51820}) + + content = open(cf).read() + self.assertIn(new_priv, content) + + if __name__ == '__main__': unittest.main()