Files
pic/tests/test_wireguard_endpoints.py
T
roof a338836bb8 add security fixes, port hardening, and expanded QA coverage
Security fixes:
- Replace debug=True with env-driven FLASK_DEBUG in app.py
- Add _safe_path helper and path-traversal protection to all 6 file routes
  in file_manager.py
- Add peer_name regex and input validation (public_key, name, endpoint_ip)
  in wireguard_manager.py
- Stop returning private key from GET /api/wireguard/keys; return only
  public_key + has_private_key boolean
- Fix is_local_request() XFF bypass by checking remote_addr only, ignoring
  X-Forwarded-For
- Remove duplicate get_all_configs / get_config_summary methods from
  config_manager.py

DevOps:
- Bind 6 internal service ports to 127.0.0.1 in docker-compose.yml
  (radicale, webdav, api, webui, rainloop, filegator)
- Move WebDAV credentials to env vars (WEBDAV_USER, WEBDAV_PASS)
- Pin flask, flask-cors, requests, cryptography, docker to secure minimum
  versions in requirements.txt

QA (560 tests, 0 failures):
- tests/test_wireguard_endpoints.py: 18 new endpoint tests
- tests/test_file_endpoints.py: 24 new endpoint tests incl. path traversal
- tests/test_container_manager.py: expanded from 2 to 30 tests
- tests/test_config_backup_restore_http.py: 25 new tests (new file)
- tests/test_config_apply.py: 9 new tests (new file)

Docs:
- Rewrite README.md with accurate architecture, ports, env vars, security notes
- Rewrite QUICKSTART.md with verified commands

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-25 13:08:24 -04:00

232 lines
9.9 KiB
Python

#!/usr/bin/env python3
"""
Unit tests for WireGuard-specific Flask endpoints in api/app.py.
Covers routes that were not already tested in test_api_endpoints.py:
POST /api/wireguard/check-port
GET /api/wireguard/server-config
POST /api/wireguard/refresh-ip
GET /api/wireguard/peers/statuses
POST /api/wireguard/apply-enforcement
POST /api/wireguard/network/setup
GET /api/wireguard/network/status
"""
import sys
import json
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
# Put the sibling ``api`` directory (two levels up from this file, then
# ``api``) at the front of sys.path so the ``from app import app`` line that
# follows can be resolved.
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from app import app
class TestWireGuardEndpoints(unittest.TestCase):
    """Exercise the WireGuard HTTP routes through Flask's test client.

    Backend collaborators (``app.wireguard_manager``, ``app.peer_registry``,
    ``app.firewall_manager``) are patched per test with mocks, so only the
    status codes and JSON bodies produced by the routes are checked.
    """

    def setUp(self):
        """Enable Flask's TESTING flag and build a test client for each test."""
        app.config['TESTING'] = True
        self.client = app.test_client()

    @staticmethod
    def _payload(response):
        """Decode the JSON document carried in the body of *response*."""
        return json.loads(response.data)

    def _assert_server_error(self, response):
        """Assert *response* is an HTTP 500 whose JSON body has an ``error`` key."""
        self.assertEqual(response.status_code, 500)
        self.assertIn('error', self._payload(response))

    # ── POST /api/wireguard/check-port ─────────────────────────────────────
    @patch('app.wireguard_manager')
    def test_check_port_returns_port_open_true(self, mock_wg):
        """An open port is reported along with the configured port number."""
        mock_wg.check_port_open.return_value = True
        mock_wg._get_configured_port.return_value = 51820

        response = self.client.post('/api/wireguard/check-port')

        self.assertEqual(response.status_code, 200)
        payload = self._payload(response)
        for key in ('port_open', 'port'):
            self.assertIn(key, payload)
        self.assertTrue(payload['port_open'])
        self.assertEqual(payload['port'], 51820)

    @patch('app.wireguard_manager')
    def test_check_port_returns_port_open_false(self, mock_wg):
        """A closed port still yields 200, with ``port_open`` falsy."""
        mock_wg.check_port_open.return_value = False
        mock_wg._get_configured_port.return_value = 51820

        response = self.client.post('/api/wireguard/check-port')

        self.assertEqual(response.status_code, 200)
        payload = self._payload(response)
        self.assertFalse(payload['port_open'])

    @patch('app.wireguard_manager')
    def test_check_port_returns_500_on_exception(self, mock_wg):
        """An exception from ``check_port_open`` becomes a 500 error response."""
        mock_wg.check_port_open.side_effect = Exception('socket error')

        response = self.client.post('/api/wireguard/check-port')

        self._assert_server_error(response)

    # ── GET /api/wireguard/server-config ───────────────────────────────────
    @patch('app.wireguard_manager')
    def test_server_config_returns_config_dict(self, mock_wg):
        """The response exposes the ``public_key`` and ``endpoint`` fields."""
        server_config = {
            'public_key': 'PUBKEY==',
            'endpoint': '1.2.3.4:51820',
            'port': 51820,
        }
        mock_wg.get_server_config.return_value = server_config

        response = self.client.get('/api/wireguard/server-config')

        self.assertEqual(response.status_code, 200)
        payload = self._payload(response)
        for key in ('public_key', 'endpoint'):
            self.assertIn(key, payload)

    @patch('app.wireguard_manager')
    def test_server_config_returns_500_on_exception(self, mock_wg):
        """A ``RuntimeError`` from ``get_server_config`` becomes a 500 response."""
        mock_wg.get_server_config.side_effect = RuntimeError('wg not running')

        response = self.client.get('/api/wireguard/server-config')

        self._assert_server_error(response)

    # ── POST /api/wireguard/refresh-ip ─────────────────────────────────────
    @patch('app.wireguard_manager')
    def test_refresh_ip_returns_external_ip_and_endpoint(self, mock_wg):
        """The external IP, the port and the combined ``ip:port`` endpoint are returned."""
        mock_wg.get_external_ip.return_value = '203.0.113.10'
        mock_wg._get_configured_port.return_value = 51820

        response = self.client.post('/api/wireguard/refresh-ip')

        self.assertEqual(response.status_code, 200)
        payload = self._payload(response)
        self.assertEqual(payload['external_ip'], '203.0.113.10')
        self.assertEqual(payload['port'], 51820)
        self.assertEqual(payload['endpoint'], '203.0.113.10:51820')

    @patch('app.wireguard_manager')
    def test_refresh_ip_endpoint_is_none_when_ip_unavailable(self, mock_wg):
        """With no external IP available, ``endpoint`` comes back as null."""
        mock_wg.get_external_ip.return_value = None
        mock_wg._get_configured_port.return_value = 51820

        response = self.client.post('/api/wireguard/refresh-ip')

        self.assertEqual(response.status_code, 200)
        payload = self._payload(response)
        self.assertIsNone(payload['endpoint'])

    @patch('app.wireguard_manager')
    def test_refresh_ip_passes_force_refresh_true(self, mock_wg):
        """The route calls ``get_external_ip`` exactly once with ``force_refresh=True``."""
        mock_wg.get_external_ip.return_value = '1.2.3.4'
        mock_wg._get_configured_port.return_value = 51820

        # Only the call made on the manager matters here; the response is ignored.
        self.client.post('/api/wireguard/refresh-ip')

        mock_wg.get_external_ip.assert_called_once_with(force_refresh=True)

    @patch('app.wireguard_manager')
    def test_refresh_ip_returns_500_on_exception(self, mock_wg):
        """An exception from ``get_external_ip`` becomes a 500 error response."""
        mock_wg.get_external_ip.side_effect = Exception('network error')

        response = self.client.post('/api/wireguard/refresh-ip')

        self._assert_server_error(response)

    # ── GET /api/wireguard/peers/statuses ──────────────────────────────────
    @patch('app.wireguard_manager')
    def test_peer_statuses_returns_dict_keyed_by_public_key(self, mock_wg):
        """The status mapping is returned as a JSON object keyed by public key."""
        statuses = {
            'KEY1==': {'latest_handshake': 1700000000, 'transfer_rx': 1024},
            'KEY2==': {'latest_handshake': 1700000100, 'transfer_rx': 2048},
        }
        mock_wg.get_all_peer_statuses.return_value = statuses

        response = self.client.get('/api/wireguard/peers/statuses')

        self.assertEqual(response.status_code, 200)
        payload = self._payload(response)
        self.assertIsInstance(payload, dict)
        for public_key in ('KEY1==', 'KEY2=='):
            self.assertIn(public_key, payload)

    @patch('app.wireguard_manager')
    def test_peer_statuses_returns_empty_dict_when_no_peers(self, mock_wg):
        """An empty status mapping is returned as an empty JSON object."""
        mock_wg.get_all_peer_statuses.return_value = {}

        response = self.client.get('/api/wireguard/peers/statuses')

        self.assertEqual(response.status_code, 200)
        self.assertEqual(self._payload(response), {})

    @patch('app.wireguard_manager')
    def test_peer_statuses_returns_500_on_exception(self, mock_wg):
        """An exception from ``get_all_peer_statuses`` becomes a 500 response."""
        mock_wg.get_all_peer_statuses.side_effect = Exception('wg show failed')

        response = self.client.get('/api/wireguard/peers/statuses')

        self._assert_server_error(response)

    # ── POST /api/wireguard/apply-enforcement ──────────────────────────────
    # Stacked @patch decorators inject mocks bottom-up: the decorator closest
    # to the function (peer_registry) supplies the first mock argument.
    @patch('app.firewall_manager')
    @patch('app.peer_registry')
    def test_apply_enforcement_returns_ok_and_peer_count(self, mock_reg, mock_fw):
        """The response reports ``ok`` and the number of registered peers."""
        registered_peers = [
            {'name': 'peer1', 'public_key': 'K1=='},
            {'name': 'peer2', 'public_key': 'K2=='},
        ]
        mock_reg.list_peers.return_value = registered_peers
        mock_fw.apply_all_peer_rules.return_value = None
        mock_fw.apply_all_dns_rules.return_value = None

        response = self.client.post('/api/wireguard/apply-enforcement')

        self.assertEqual(response.status_code, 200)
        payload = self._payload(response)
        self.assertTrue(payload['ok'])
        self.assertEqual(payload['peers'], 2)

    @patch('app.firewall_manager')
    @patch('app.peer_registry')
    def test_apply_enforcement_calls_both_rule_functions(self, mock_reg, mock_fw):
        """Both the peer-rule and DNS-rule appliers are invoked exactly once."""
        mock_reg.list_peers.return_value = []
        mock_fw.apply_all_peer_rules.return_value = None
        mock_fw.apply_all_dns_rules.return_value = None

        self.client.post('/api/wireguard/apply-enforcement')

        for rule_applier in (mock_fw.apply_all_peer_rules,
                             mock_fw.apply_all_dns_rules):
            rule_applier.assert_called_once()

    @patch('app.peer_registry')
    def test_apply_enforcement_returns_500_on_exception(self, mock_reg):
        """An exception from ``list_peers`` becomes a 500 error response."""
        mock_reg.list_peers.side_effect = Exception('registry error')

        response = self.client.post('/api/wireguard/apply-enforcement')

        self._assert_server_error(response)

    # ── POST /api/wireguard/network/setup ──────────────────────────────────
    @patch('app.wireguard_manager')
    def test_network_setup_returns_200_on_success(self, mock_wg):
        """A truthy setup result yields 200 and a ``message`` field."""
        mock_wg.setup_network_configuration.return_value = True

        response = self.client.post('/api/wireguard/network/setup')

        self.assertEqual(response.status_code, 200)
        payload = self._payload(response)
        self.assertIn('message', payload)

    @patch('app.wireguard_manager')
    def test_network_setup_returns_500_when_manager_returns_false(self, mock_wg):
        """A ``False`` setup result is reported as a 500 error response."""
        mock_wg.setup_network_configuration.return_value = False

        response = self.client.post('/api/wireguard/network/setup')

        self._assert_server_error(response)

    @patch('app.wireguard_manager')
    def test_network_setup_returns_500_on_exception(self, mock_wg):
        """An exception from ``setup_network_configuration`` becomes a 500 response."""
        mock_wg.setup_network_configuration.side_effect = Exception('iptables fail')

        response = self.client.post('/api/wireguard/network/setup')

        self._assert_server_error(response)

    # ── GET /api/wireguard/network/status ──────────────────────────────────
    @patch('app.wireguard_manager')
    def test_network_status_returns_200_with_status_dict(self, mock_wg):
        """The status mapping is returned with its ``ip_forwarding`` field present."""
        network_status = {
            'ip_forwarding': True,
            'nat_active': True,
            'interface': 'wg0',
        }
        mock_wg.get_network_status.return_value = network_status

        response = self.client.get('/api/wireguard/network/status')

        self.assertEqual(response.status_code, 200)
        payload = self._payload(response)
        self.assertIn('ip_forwarding', payload)

    @patch('app.wireguard_manager')
    def test_network_status_returns_500_on_exception(self, mock_wg):
        """An exception from ``get_network_status`` becomes a 500 error response."""
        mock_wg.get_network_status.side_effect = Exception('iproute error')

        response = self.client.get('/api/wireguard/network/status')

        self._assert_server_error(response)
# Run the tests with unittest's command-line runner when this file is
# executed directly as a script.
if __name__ == '__main__':
    unittest.main()