a338836bb8
Security fixes: - Replace debug=True with env-driven FLASK_DEBUG in app.py - Add _safe_path helper and path-traversal protection to all 6 file routes in file_manager.py - Add peer_name regex and input validation (public_key, name, endpoint_ip) in wireguard_manager.py - Stop returning private key from GET /api/wireguard/keys; return only public_key + has_private_key boolean - Fix is_local_request() XFF bypass by checking remote_addr only, ignoring X-Forwarded-For - Remove duplicate get_all_configs / get_config_summary methods from config_manager.py DevOps: - Bind 6 internal service ports to 127.0.0.1 in docker-compose.yml (radicale, webdav, api, webui, rainloop, filegator) - Move WebDAV credentials to env vars (WEBDAV_USER, WEBDAV_PASS) - Pin flask, flask-cors, requests, cryptography, docker to secure minimum versions in requirements.txt QA (560 tests, 0 failures): - tests/test_wireguard_endpoints.py: 18 new endpoint tests - tests/test_file_endpoints.py: 24 new endpoint tests incl. path traversal - tests/test_container_manager.py: expanded from 2 to 30 tests - tests/test_config_backup_restore_http.py: 25 new tests (new file) - tests/test_config_apply.py: 9 new tests (new file) Docs: - Rewrite README.md with accurate architecture, ports, env vars, security notes - Rewrite QUICKSTART.md with verified commands Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
232 lines
9.9 KiB
Python
232 lines
9.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Unit tests for WireGuard-specific Flask endpoints in api/app.py.
|
|
|
|
Covers routes that were not already tested in test_api_endpoints.py:
|
|
POST /api/wireguard/check-port
|
|
GET /api/wireguard/server-config
|
|
POST /api/wireguard/refresh-ip
|
|
GET /api/wireguard/peers/statuses
|
|
POST /api/wireguard/apply-enforcement
|
|
POST /api/wireguard/network/setup
|
|
GET /api/wireguard/network/status
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
api_dir = Path(__file__).parent.parent / 'api'
|
|
sys.path.insert(0, str(api_dir))
|
|
|
|
from app import app
|
|
|
|
|
|
class TestWireGuardEndpoints(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
app.config['TESTING'] = True
|
|
self.client = app.test_client()
|
|
|
|
# ── POST /api/wireguard/check-port ─────────────────────────────────────
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_check_port_returns_port_open_true(self, mock_wg):
|
|
mock_wg.check_port_open.return_value = True
|
|
mock_wg._get_configured_port.return_value = 51820
|
|
r = self.client.post('/api/wireguard/check-port')
|
|
self.assertEqual(r.status_code, 200)
|
|
data = json.loads(r.data)
|
|
self.assertIn('port_open', data)
|
|
self.assertIn('port', data)
|
|
self.assertTrue(data['port_open'])
|
|
self.assertEqual(data['port'], 51820)
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_check_port_returns_port_open_false(self, mock_wg):
|
|
mock_wg.check_port_open.return_value = False
|
|
mock_wg._get_configured_port.return_value = 51820
|
|
r = self.client.post('/api/wireguard/check-port')
|
|
self.assertEqual(r.status_code, 200)
|
|
data = json.loads(r.data)
|
|
self.assertFalse(data['port_open'])
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_check_port_returns_500_on_exception(self, mock_wg):
|
|
mock_wg.check_port_open.side_effect = Exception('socket error')
|
|
r = self.client.post('/api/wireguard/check-port')
|
|
self.assertEqual(r.status_code, 500)
|
|
self.assertIn('error', json.loads(r.data))
|
|
|
|
# ── GET /api/wireguard/server-config ───────────────────────────────────
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_server_config_returns_config_dict(self, mock_wg):
|
|
mock_wg.get_server_config.return_value = {
|
|
'public_key': 'PUBKEY==',
|
|
'endpoint': '1.2.3.4:51820',
|
|
'port': 51820,
|
|
}
|
|
r = self.client.get('/api/wireguard/server-config')
|
|
self.assertEqual(r.status_code, 200)
|
|
data = json.loads(r.data)
|
|
self.assertIn('public_key', data)
|
|
self.assertIn('endpoint', data)
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_server_config_returns_500_on_exception(self, mock_wg):
|
|
mock_wg.get_server_config.side_effect = RuntimeError('wg not running')
|
|
r = self.client.get('/api/wireguard/server-config')
|
|
self.assertEqual(r.status_code, 500)
|
|
self.assertIn('error', json.loads(r.data))
|
|
|
|
# ── POST /api/wireguard/refresh-ip ─────────────────────────────────────
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_refresh_ip_returns_external_ip_and_endpoint(self, mock_wg):
|
|
mock_wg.get_external_ip.return_value = '203.0.113.10'
|
|
mock_wg._get_configured_port.return_value = 51820
|
|
r = self.client.post('/api/wireguard/refresh-ip')
|
|
self.assertEqual(r.status_code, 200)
|
|
data = json.loads(r.data)
|
|
self.assertEqual(data['external_ip'], '203.0.113.10')
|
|
self.assertEqual(data['port'], 51820)
|
|
self.assertEqual(data['endpoint'], '203.0.113.10:51820')
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_refresh_ip_endpoint_is_none_when_ip_unavailable(self, mock_wg):
|
|
mock_wg.get_external_ip.return_value = None
|
|
mock_wg._get_configured_port.return_value = 51820
|
|
r = self.client.post('/api/wireguard/refresh-ip')
|
|
self.assertEqual(r.status_code, 200)
|
|
data = json.loads(r.data)
|
|
self.assertIsNone(data['endpoint'])
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_refresh_ip_passes_force_refresh_true(self, mock_wg):
|
|
mock_wg.get_external_ip.return_value = '1.2.3.4'
|
|
mock_wg._get_configured_port.return_value = 51820
|
|
self.client.post('/api/wireguard/refresh-ip')
|
|
mock_wg.get_external_ip.assert_called_once_with(force_refresh=True)
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_refresh_ip_returns_500_on_exception(self, mock_wg):
|
|
mock_wg.get_external_ip.side_effect = Exception('network error')
|
|
r = self.client.post('/api/wireguard/refresh-ip')
|
|
self.assertEqual(r.status_code, 500)
|
|
self.assertIn('error', json.loads(r.data))
|
|
|
|
# ── GET /api/wireguard/peers/statuses ──────────────────────────────────
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_peer_statuses_returns_dict_keyed_by_public_key(self, mock_wg):
|
|
mock_wg.get_all_peer_statuses.return_value = {
|
|
'KEY1==': {'latest_handshake': 1700000000, 'transfer_rx': 1024},
|
|
'KEY2==': {'latest_handshake': 1700000100, 'transfer_rx': 2048},
|
|
}
|
|
r = self.client.get('/api/wireguard/peers/statuses')
|
|
self.assertEqual(r.status_code, 200)
|
|
data = json.loads(r.data)
|
|
self.assertIsInstance(data, dict)
|
|
self.assertIn('KEY1==', data)
|
|
self.assertIn('KEY2==', data)
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_peer_statuses_returns_empty_dict_when_no_peers(self, mock_wg):
|
|
mock_wg.get_all_peer_statuses.return_value = {}
|
|
r = self.client.get('/api/wireguard/peers/statuses')
|
|
self.assertEqual(r.status_code, 200)
|
|
self.assertEqual(json.loads(r.data), {})
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_peer_statuses_returns_500_on_exception(self, mock_wg):
|
|
mock_wg.get_all_peer_statuses.side_effect = Exception('wg show failed')
|
|
r = self.client.get('/api/wireguard/peers/statuses')
|
|
self.assertEqual(r.status_code, 500)
|
|
self.assertIn('error', json.loads(r.data))
|
|
|
|
# ── POST /api/wireguard/apply-enforcement ──────────────────────────────
|
|
|
|
@patch('app.firewall_manager')
|
|
@patch('app.peer_registry')
|
|
def test_apply_enforcement_returns_ok_and_peer_count(self, mock_reg, mock_fw):
|
|
mock_reg.list_peers.return_value = [
|
|
{'name': 'peer1', 'public_key': 'K1=='},
|
|
{'name': 'peer2', 'public_key': 'K2=='},
|
|
]
|
|
mock_fw.apply_all_peer_rules.return_value = None
|
|
mock_fw.apply_all_dns_rules.return_value = None
|
|
r = self.client.post('/api/wireguard/apply-enforcement')
|
|
self.assertEqual(r.status_code, 200)
|
|
data = json.loads(r.data)
|
|
self.assertTrue(data['ok'])
|
|
self.assertEqual(data['peers'], 2)
|
|
|
|
@patch('app.firewall_manager')
|
|
@patch('app.peer_registry')
|
|
def test_apply_enforcement_calls_both_rule_functions(self, mock_reg, mock_fw):
|
|
mock_reg.list_peers.return_value = []
|
|
mock_fw.apply_all_peer_rules.return_value = None
|
|
mock_fw.apply_all_dns_rules.return_value = None
|
|
self.client.post('/api/wireguard/apply-enforcement')
|
|
mock_fw.apply_all_peer_rules.assert_called_once()
|
|
mock_fw.apply_all_dns_rules.assert_called_once()
|
|
|
|
@patch('app.peer_registry')
|
|
def test_apply_enforcement_returns_500_on_exception(self, mock_reg):
|
|
mock_reg.list_peers.side_effect = Exception('registry error')
|
|
r = self.client.post('/api/wireguard/apply-enforcement')
|
|
self.assertEqual(r.status_code, 500)
|
|
self.assertIn('error', json.loads(r.data))
|
|
|
|
# ── POST /api/wireguard/network/setup ──────────────────────────────────
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_network_setup_returns_200_on_success(self, mock_wg):
|
|
mock_wg.setup_network_configuration.return_value = True
|
|
r = self.client.post('/api/wireguard/network/setup')
|
|
self.assertEqual(r.status_code, 200)
|
|
data = json.loads(r.data)
|
|
self.assertIn('message', data)
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_network_setup_returns_500_when_manager_returns_false(self, mock_wg):
|
|
mock_wg.setup_network_configuration.return_value = False
|
|
r = self.client.post('/api/wireguard/network/setup')
|
|
self.assertEqual(r.status_code, 500)
|
|
self.assertIn('error', json.loads(r.data))
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_network_setup_returns_500_on_exception(self, mock_wg):
|
|
mock_wg.setup_network_configuration.side_effect = Exception('iptables fail')
|
|
r = self.client.post('/api/wireguard/network/setup')
|
|
self.assertEqual(r.status_code, 500)
|
|
self.assertIn('error', json.loads(r.data))
|
|
|
|
# ── GET /api/wireguard/network/status ──────────────────────────────────
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_network_status_returns_200_with_status_dict(self, mock_wg):
|
|
mock_wg.get_network_status.return_value = {
|
|
'ip_forwarding': True,
|
|
'nat_active': True,
|
|
'interface': 'wg0',
|
|
}
|
|
r = self.client.get('/api/wireguard/network/status')
|
|
self.assertEqual(r.status_code, 200)
|
|
data = json.loads(r.data)
|
|
self.assertIn('ip_forwarding', data)
|
|
|
|
@patch('app.wireguard_manager')
|
|
def test_network_status_returns_500_on_exception(self, mock_wg):
|
|
mock_wg.get_network_status.side_effect = Exception('iproute error')
|
|
r = self.client.get('/api/wireguard/network/status')
|
|
self.assertEqual(r.status_code, 500)
|
|
self.assertIn('error', json.loads(r.data))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|