add security fixes, port hardening, and expanded QA coverage
Security fixes: - Replace debug=True with env-driven FLASK_DEBUG in app.py - Add _safe_path helper and path-traversal protection to all 6 file routes in file_manager.py - Add peer_name regex and input validation (public_key, name, endpoint_ip) in wireguard_manager.py - Stop returning private key from GET /api/wireguard/keys; return only public_key + has_private_key boolean - Fix is_local_request() XFF bypass by checking remote_addr only, ignoring X-Forwarded-For - Remove duplicate get_all_configs / get_config_summary methods from config_manager.py DevOps: - Bind 6 internal service ports to 127.0.0.1 in docker-compose.yml (radicale, webdav, api, webui, rainloop, filegator) - Move WebDAV credentials to env vars (WEBDAV_USER, WEBDAV_PASS) - Pin flask, flask-cors, requests, cryptography, docker to secure minimum versions in requirements.txt QA (560 tests, 0 failures): - tests/test_wireguard_endpoints.py: 18 new endpoint tests - tests/test_file_endpoints.py: 24 new endpoint tests incl. path traversal - tests/test_container_manager.py: expanded from 2 to 30 tests - tests/test_config_backup_restore_http.py: 25 new tests (new file) - tests/test_config_apply.py: 9 new tests (new file) Docs: - Rewrite README.md with accurate architecture, ports, env vars, security notes - Rewrite QUICKSTART.md with verified commands Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,346 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Unit tests for config backup / restore / export / import HTTP routes.
|
||||
|
||||
These tests exercise the Flask layer in api/app.py only.
|
||||
The ConfigManager is mocked throughout.
|
||||
|
||||
Endpoints under test:
|
||||
POST /api/config/backup
|
||||
GET /api/config/backups
|
||||
POST /api/config/restore/<id>
|
||||
GET /api/config/export
|
||||
POST /api/config/import
|
||||
DELETE /api/config/backups/<id>
|
||||
GET /api/config/backups/<id>/download
|
||||
POST /api/config/backup/upload
|
||||
"""
|
||||
|
||||
import sys
|
||||
import io
|
||||
import json
|
||||
import zipfile
|
||||
import tempfile
|
||||
import shutil
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock, PropertyMock
|
||||
|
||||
api_dir = Path(__file__).parent.parent / 'api'
|
||||
sys.path.insert(0, str(api_dir))
|
||||
|
||||
from app import app
|
||||
|
||||
|
||||
class TestCreateConfigBackup(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
app.config['TESTING'] = True
|
||||
self.client = app.test_client()
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_backup_returns_200_with_backup_id(self, mock_cm):
|
||||
mock_cm.backup_config.return_value = 'backup_20260424_120000'
|
||||
r = self.client.post('/api/config/backup')
|
||||
self.assertEqual(r.status_code, 200)
|
||||
data = json.loads(r.data)
|
||||
self.assertIn('backup_id', data)
|
||||
self.assertEqual(data['backup_id'], 'backup_20260424_120000')
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_backup_returns_500_on_exception(self, mock_cm):
|
||||
mock_cm.backup_config.side_effect = Exception('disk full')
|
||||
r = self.client.post('/api/config/backup')
|
||||
self.assertEqual(r.status_code, 500)
|
||||
self.assertIn('error', json.loads(r.data))
|
||||
|
||||
|
||||
class TestListConfigBackups(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
app.config['TESTING'] = True
|
||||
self.client = app.test_client()
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_list_backups_returns_200_with_list(self, mock_cm):
|
||||
mock_cm.list_backups.return_value = [
|
||||
{'backup_id': 'backup_001', 'timestamp': '2026-04-24T12:00:00'},
|
||||
{'backup_id': 'backup_002', 'timestamp': '2026-04-23T08:00:00'},
|
||||
]
|
||||
r = self.client.get('/api/config/backups')
|
||||
self.assertEqual(r.status_code, 200)
|
||||
data = json.loads(r.data)
|
||||
self.assertIsInstance(data, list)
|
||||
self.assertEqual(len(data), 2)
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_list_backups_returns_500_on_exception(self, mock_cm):
|
||||
mock_cm.list_backups.side_effect = Exception('directory error')
|
||||
r = self.client.get('/api/config/backups')
|
||||
self.assertEqual(r.status_code, 500)
|
||||
self.assertIn('error', json.loads(r.data))
|
||||
|
||||
|
||||
class TestRestoreConfigBackup(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
app.config['TESTING'] = True
|
||||
self.client = app.test_client()
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_restore_returns_200_on_success(self, mock_cm):
|
||||
mock_cm.restore_config.return_value = True
|
||||
r = self.client.post('/api/config/restore/backup_001')
|
||||
self.assertEqual(r.status_code, 200)
|
||||
data = json.loads(r.data)
|
||||
self.assertIn('message', data)
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_restore_returns_500_when_manager_returns_false(self, mock_cm):
|
||||
mock_cm.restore_config.return_value = False
|
||||
r = self.client.post('/api/config/restore/backup_missing')
|
||||
self.assertEqual(r.status_code, 500)
|
||||
self.assertIn('error', json.loads(r.data))
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_restore_returns_500_on_exception(self, mock_cm):
|
||||
mock_cm.restore_config.side_effect = Exception('corrupt backup')
|
||||
r = self.client.post('/api/config/restore/backup_bad')
|
||||
self.assertEqual(r.status_code, 500)
|
||||
self.assertIn('error', json.loads(r.data))
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_restore_passes_services_list_to_manager(self, mock_cm):
|
||||
mock_cm.restore_config.return_value = True
|
||||
payload = {'services': ['network', 'wireguard']}
|
||||
self.client.post(
|
||||
'/api/config/restore/backup_001',
|
||||
data=json.dumps(payload),
|
||||
content_type='application/json',
|
||||
)
|
||||
mock_cm.restore_config.assert_called_once_with(
|
||||
'backup_001', services=['network', 'wireguard']
|
||||
)
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_restore_passes_none_services_when_no_body(self, mock_cm):
|
||||
mock_cm.restore_config.return_value = True
|
||||
self.client.post('/api/config/restore/backup_001')
|
||||
mock_cm.restore_config.assert_called_once_with('backup_001', services=None)
|
||||
|
||||
|
||||
class TestExportConfig(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
app.config['TESTING'] = True
|
||||
self.client = app.test_client()
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_export_returns_200_with_config_and_format(self, mock_cm):
|
||||
mock_cm.export_config.return_value = '{"cell_name": "mycell"}'
|
||||
r = self.client.get('/api/config/export')
|
||||
self.assertEqual(r.status_code, 200)
|
||||
data = json.loads(r.data)
|
||||
self.assertIn('config', data)
|
||||
self.assertIn('format', data)
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_export_uses_json_format_by_default(self, mock_cm):
|
||||
mock_cm.export_config.return_value = '{}'
|
||||
self.client.get('/api/config/export')
|
||||
mock_cm.export_config.assert_called_once_with('json')
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_export_passes_format_query_param(self, mock_cm):
|
||||
mock_cm.export_config.return_value = 'yaml: data'
|
||||
self.client.get('/api/config/export?format=yaml')
|
||||
mock_cm.export_config.assert_called_once_with('yaml')
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_export_returns_500_on_exception(self, mock_cm):
|
||||
mock_cm.export_config.side_effect = Exception('serialisation error')
|
||||
r = self.client.get('/api/config/export')
|
||||
self.assertEqual(r.status_code, 500)
|
||||
self.assertIn('error', json.loads(r.data))
|
||||
|
||||
|
||||
class TestImportConfig(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
app.config['TESTING'] = True
|
||||
self.client = app.test_client()
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_import_returns_200_on_success(self, mock_cm):
|
||||
mock_cm.import_config.return_value = True
|
||||
r = self.client.post(
|
||||
'/api/config/import',
|
||||
data=json.dumps({'config': '{"cell_name": "mycell"}', 'format': 'json'}),
|
||||
content_type='application/json',
|
||||
)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
data = json.loads(r.data)
|
||||
self.assertIn('message', data)
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_import_returns_400_when_no_body(self, mock_cm):
|
||||
r = self.client.post('/api/config/import')
|
||||
self.assertEqual(r.status_code, 400)
|
||||
self.assertIn('error', json.loads(r.data))
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_import_returns_500_when_manager_returns_false(self, mock_cm):
|
||||
mock_cm.import_config.return_value = False
|
||||
r = self.client.post(
|
||||
'/api/config/import',
|
||||
data=json.dumps({'config': 'bad data'}),
|
||||
content_type='application/json',
|
||||
)
|
||||
self.assertEqual(r.status_code, 500)
|
||||
self.assertIn('error', json.loads(r.data))
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_import_returns_500_on_exception(self, mock_cm):
|
||||
mock_cm.import_config.side_effect = Exception('parse error')
|
||||
r = self.client.post(
|
||||
'/api/config/import',
|
||||
data=json.dumps({'config': 'something'}),
|
||||
content_type='application/json',
|
||||
)
|
||||
self.assertEqual(r.status_code, 500)
|
||||
self.assertIn('error', json.loads(r.data))
|
||||
|
||||
|
||||
class TestDeleteConfigBackup(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
app.config['TESTING'] = True
|
||||
self.client = app.test_client()
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_delete_backup_returns_200_on_success(self, mock_cm):
|
||||
mock_cm.delete_backup.return_value = True
|
||||
r = self.client.delete('/api/config/backups/backup_001')
|
||||
self.assertEqual(r.status_code, 200)
|
||||
data = json.loads(r.data)
|
||||
self.assertIn('message', data)
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_delete_backup_returns_500_when_manager_returns_false(self, mock_cm):
|
||||
mock_cm.delete_backup.return_value = False
|
||||
r = self.client.delete('/api/config/backups/backup_missing')
|
||||
self.assertEqual(r.status_code, 500)
|
||||
self.assertIn('error', json.loads(r.data))
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_delete_backup_returns_500_on_exception(self, mock_cm):
|
||||
mock_cm.delete_backup.side_effect = Exception('io error')
|
||||
r = self.client.delete('/api/config/backups/backup_001')
|
||||
self.assertEqual(r.status_code, 500)
|
||||
self.assertIn('error', json.loads(r.data))
|
||||
|
||||
|
||||
class TestDownloadBackup(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
app.config['TESTING'] = True
|
||||
self.client = app.test_client()
|
||||
# Create a real temporary backup directory with a manifest so the route
|
||||
# can read it and serve a zip file.
|
||||
self.tmp = tempfile.mkdtemp()
|
||||
self.backup_id = 'backup_test_dl'
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmp, ignore_errors=True)
|
||||
|
||||
def _make_backup_dir(self, backup_id):
|
||||
"""Create a minimal backup directory with manifest.json."""
|
||||
backup_path = Path(self.tmp) / backup_id
|
||||
backup_path.mkdir(parents=True)
|
||||
(backup_path / 'manifest.json').write_text(json.dumps({'backup_id': backup_id}))
|
||||
(backup_path / 'config.json').write_text('{}')
|
||||
return backup_path
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_download_backup_returns_zip_content_type(self, mock_cm):
|
||||
backup_path = self._make_backup_dir(self.backup_id)
|
||||
mock_cm.backup_dir = Path(self.tmp)
|
||||
r = self.client.get(f'/api/config/backups/{self.backup_id}/download')
|
||||
self.assertEqual(r.status_code, 200)
|
||||
self.assertIn('application/zip', r.content_type)
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_download_backup_returns_404_when_not_found(self, mock_cm):
|
||||
mock_cm.backup_dir = Path(self.tmp)
|
||||
r = self.client.get('/api/config/backups/nonexistent_backup/download')
|
||||
self.assertEqual(r.status_code, 404)
|
||||
|
||||
|
||||
class TestUploadBackup(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
app.config['TESTING'] = True
|
||||
self.client = app.test_client()
|
||||
self.tmp = tempfile.mkdtemp()
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmp, ignore_errors=True)
|
||||
|
||||
def _make_valid_zip(self):
|
||||
"""Return BytesIO containing a valid zip with manifest.json."""
|
||||
buf = io.BytesIO()
|
||||
with zipfile.ZipFile(buf, 'w') as zf:
|
||||
zf.writestr('manifest.json', json.dumps({'backup_id': 'upload_test'}))
|
||||
zf.writestr('config.json', '{}')
|
||||
buf.seek(0)
|
||||
return buf
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_upload_returns_400_when_no_file(self, mock_cm):
|
||||
r = self.client.post('/api/config/backup/upload')
|
||||
self.assertEqual(r.status_code, 400)
|
||||
self.assertIn('error', json.loads(r.data))
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_upload_returns_200_on_valid_zip(self, mock_cm):
|
||||
backup_dir = Path(self.tmp)
|
||||
mock_cm.backup_dir = backup_dir
|
||||
zip_data = self._make_valid_zip()
|
||||
r = self.client.post(
|
||||
'/api/config/backup/upload',
|
||||
data={'file': (zip_data, 'mybackup.zip')},
|
||||
content_type='multipart/form-data',
|
||||
)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
data = json.loads(r.data)
|
||||
self.assertIn('backup_id', data)
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_upload_returns_400_on_invalid_zip(self, mock_cm):
|
||||
backup_dir = Path(self.tmp)
|
||||
mock_cm.backup_dir = backup_dir
|
||||
r = self.client.post(
|
||||
'/api/config/backup/upload',
|
||||
data={'file': (io.BytesIO(b'this is not a zip'), 'bad.zip')},
|
||||
content_type='multipart/form-data',
|
||||
)
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
@patch('app.config_manager')
|
||||
def test_upload_returns_400_when_zip_missing_manifest(self, mock_cm):
|
||||
backup_dir = Path(self.tmp)
|
||||
mock_cm.backup_dir = backup_dir
|
||||
buf = io.BytesIO()
|
||||
with zipfile.ZipFile(buf, 'w') as zf:
|
||||
zf.writestr('config.json', '{}') # no manifest.json
|
||||
buf.seek(0)
|
||||
r = self.client.post(
|
||||
'/api/config/backup/upload',
|
||||
data={'file': (buf, 'nomanifest.zip')},
|
||||
content_type='multipart/form-data',
|
||||
)
|
||||
self.assertEqual(r.status_code, 400)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user