Files
pic/tests/test_api_endpoints.py
roof a43f9fbf0d fix: full security audit remediation — P0/P1/P2/P3 fixes + 1020 passing tests
P0 — Broken functionality:
- Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing)
- Fix email_manager.delete_email_user() missing domain arg
- Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now
  accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file)
- Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR)
- Fix _next_peer_ip exhaustion returning 500 instead of 409
- Fix ConfigManager Caddyfile path (/app/config-caddy/)
- Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx
- Remove hardcoded credentials from Dashboard.jsx

P1 — Security:
- CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern)
- enforce_auth: 503 only when users file readable but empty; never bypass on IOError
- WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes
- DNS add_cell_dns_forward: validate IP and domain; reject injection chars
- DNS zone write: realpath containment + record content validation
- iptables comment /32 suffix prevents substring match deleting wrong peer rules
- is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge)
- POST /api/containers: volume allow-list prevents arbitrary host mounts
- file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user
- email/calendar: stop persisting plaintext passwords in user records
- routing_manager: validate IPs, networks, and interface names
- peer_registry: write peers.json at mode 0o600
- vault_manager: Fernet key file at mode 0o600
- CORS: lock down to explicit origin list
- domain/cell_name validation: reject newline, brace, semicolon injection chars

P2 — Architecture:
- Peer add: rollback registry entry if firewall rules fail post-add
- restart_service(): base class now calls _restart_container(); email and calendar
  managers call cell-mail / cell-radicale respectively
- email/calendar managers sync user list (no passwords) to cell_config.json
- Pending-restart flag cleared only after helper subprocess exits with code 0
- docker-compose.yml: add config-caddy volume to API container

P3 — Tests (854 → 1020):
- Fill test_email_endpoints.py, test_calendar_endpoints.py,
  test_network_endpoints.py, test_routing_endpoints.py
- New: test_peer_management_update.py, test_peer_management_edge_cases.py,
  test_input_validation.py, test_enforce_auth_configured.py,
  test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py,
  test_is_local_request_per_endpoint.py, test_caddy_routing.py
- E2E conftest: skip WireGuard suite when wg-quick absent
- Update existing tests to match fixed signatures and comment formats

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:30:21 -04:00

851 lines
49 KiB
Python

#!/usr/bin/env python3
"""
Unit tests for Flask API endpoints
"""
import sys
from pathlib import Path
# Add api directory to path
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
import unittest
import tempfile
import os
import json
import shutil
from unittest.mock import patch, MagicMock
from datetime import datetime
# Add parent directory to path for imports
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app import app, CellManager
class TestAPIEndpoints(unittest.TestCase):
"""Test cases for API endpoints"""
def setUp(self):
"""Set up test environment"""
self.test_dir = tempfile.mkdtemp()
self.data_dir = os.path.join(self.test_dir, 'data')
self.config_dir = os.path.join(self.test_dir, 'config')
os.makedirs(self.data_dir, exist_ok=True)
os.makedirs(self.config_dir, exist_ok=True)
# Mock environment variables
self.env_patcher = patch.dict(os.environ, {
'CELL_NAME': 'testcell',
'DATA_DIR': self.data_dir,
'CONFIG_DIR': self.config_dir
})
self.env_patcher.start()
# Create test client
app.config['TESTING'] = True
self.client = app.test_client()
def tearDown(self):
"""Clean up test environment"""
self.env_patcher.stop()
shutil.rmtree(self.test_dir)
def test_health_endpoint(self):
"""Test health check endpoint"""
response = self.client.get('/health')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertEqual(data['status'], 'healthy')
self.assertIn('timestamp', data)
def test_status_endpoint(self):
"""Test status endpoint"""
response = self.client.get('/api/status')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIn('cell_name', data)
self.assertIn('domain', data)
self.assertIn('peers_count', data)
self.assertIn('services', data)
self.assertIn('uptime', data)
def test_get_config_endpoint(self):
"""Test get config endpoint"""
response = self.client.get('/api/config')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIn('cell_name', data)
self.assertIn('domain', data)
self.assertIn('ip_range', data)
self.assertIn('wireguard_port', data)
def test_update_config_endpoint(self):
"""Test update config endpoint"""
update_data = {'cell_name': 'newcell'}
response = self.client.put('/api/config',
data=json.dumps(update_data),
content_type='application/json')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIn('message', data)
self.assertIn('updated', data['message'])
def test_update_config_no_data(self):
"""Test update config with no data"""
response = self.client.put('/api/config')
self.assertEqual(response.status_code, 400)
data = json.loads(response.data)
self.assertIn('error', data)
@patch('app.network_manager')
def test_dns_records_endpoints(self, mock_network):
# Mock get_dns_records
mock_network.get_dns_records.return_value = [{'name': 'test', 'type': 'A', 'value': '1.2.3.4'}]
response = self.client.get('/api/dns/records')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIsInstance(data, list)
# Mock add_dns_record
mock_network.add_dns_record.return_value = True
response = self.client.post('/api/dns/records', data=json.dumps({'name': 'test', 'type': 'A', 'value': '1.2.3.4'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
# Simulate error
mock_network.add_dns_record.side_effect = Exception('fail')
response = self.client.post('/api/dns/records', data=json.dumps({'name': 'test'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
# Mock remove_dns_record
mock_network.remove_dns_record.return_value = True
response = self.client.delete('/api/dns/records', data=json.dumps({'name': 'test', 'type': 'A'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
# Simulate error
mock_network.remove_dns_record.side_effect = Exception('fail')
response = self.client.delete('/api/dns/records', data=json.dumps({'name': 'test'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
@patch('app.network_manager')
def test_dhcp_endpoints(self, mock_network):
# Mock get_dhcp_leases
mock_network.get_dhcp_leases.return_value = [{'ip': '10.0.0.2', 'mac': '00:11:22:33:44:55'}]
response = self.client.get('/api/dhcp/leases')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIsInstance(data, list)
# Mock add_dhcp_reservation
mock_network.add_dhcp_reservation.return_value = True
response = self.client.post('/api/dhcp/reservations', data=json.dumps({'ip': '10.0.0.2', 'mac': '00:11:22:33:44:55'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
# Missing mac field → 400, not 500
response = self.client.post('/api/dhcp/reservations', data=json.dumps({'ip': '10.0.0.2'}), content_type='application/json')
self.assertEqual(response.status_code, 400)
# Simulate manager error
mock_network.add_dhcp_reservation.side_effect = Exception('fail')
response = self.client.post('/api/dhcp/reservations', data=json.dumps({'ip': '10.0.0.2', 'mac': '00:11:22:33:44:55'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
# Mock remove_dhcp_reservation
mock_network.remove_dhcp_reservation.return_value = True
response = self.client.delete('/api/dhcp/reservations', data=json.dumps({'mac': '00:11:22:33:44:55'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
# Missing mac → 400
response = self.client.delete('/api/dhcp/reservations', data=json.dumps({'ip': '10.0.0.2'}), content_type='application/json')
self.assertEqual(response.status_code, 400)
# Simulate manager error
mock_network.remove_dhcp_reservation.side_effect = Exception('fail')
response = self.client.delete('/api/dhcp/reservations', data=json.dumps({'mac': '00:11:22:33:44:55'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
@patch('app.network_manager')
def test_ntp_status_endpoint(self, mock_network):
# Mock get_ntp_status
mock_network.get_ntp_status.return_value = {'running': True, 'stats': {}}
response = self.client.get('/api/ntp/status')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIn('running', data)
# Simulate error
mock_network.get_ntp_status.side_effect = Exception('fail')
response = self.client.get('/api/ntp/status')
self.assertEqual(response.status_code, 500)
@patch('app.network_manager')
def test_network_test_endpoint(self, mock_network):
# Mock test_connectivity
mock_network.test_connectivity.return_value = {'success': True, 'output': 'ok'}
response = self.client.post('/api/network/test', data=json.dumps({'target': '8.8.8.8'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIn('success', data)
# Simulate error
mock_network.test_connectivity.side_effect = Exception('fail')
response = self.client.post('/api/network/test', data=json.dumps({'target': '8.8.8.8'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
@patch('app.wireguard_manager')
def test_wireguard_endpoints(self, mock_wg):
# /api/wireguard/keys (GET)
mock_wg.get_keys.return_value = {'public_key': 'pub', 'private_key': 'priv'}
response = self.client.get('/api/wireguard/keys')
self.assertEqual(response.status_code, 200)
self.assertIn('public_key', json.loads(response.data))
# Simulate error
mock_wg.get_keys.side_effect = Exception('fail')
response = self.client.get('/api/wireguard/keys')
self.assertEqual(response.status_code, 500)
mock_wg.get_keys.side_effect = None
# /api/wireguard/keys/peer (POST)
mock_wg.generate_peer_keys.return_value = {'peer_key': 'peer'}
response = self.client.post('/api/wireguard/keys/peer', data=json.dumps({'name': 'peer'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
# Simulate error
mock_wg.generate_peer_keys.side_effect = Exception('fail')
response = self.client.post('/api/wireguard/keys/peer', data=json.dumps({'name': 'peer'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_wg.generate_peer_keys.side_effect = None
# /api/wireguard/config (GET)
mock_wg.get_config.return_value = {'config': 'wg0'}
response = self.client.get('/api/wireguard/config')
self.assertEqual(response.status_code, 200)
# Simulate error
mock_wg.get_config.side_effect = Exception('fail')
response = self.client.get('/api/wireguard/config')
self.assertEqual(response.status_code, 500)
mock_wg.get_config.side_effect = None
# /api/wireguard/peers (GET)
mock_wg.get_peers.return_value = [{'peer': 'peer1'}]
response = self.client.get('/api/wireguard/peers')
self.assertEqual(response.status_code, 200)
# Simulate error
mock_wg.get_peers.side_effect = Exception('fail')
response = self.client.get('/api/wireguard/peers')
self.assertEqual(response.status_code, 500)
mock_wg.get_peers.side_effect = None
# /api/wireguard/peers (POST)
mock_wg.add_peer.return_value = {'result': 'ok'}
response = self.client.post('/api/wireguard/peers', data=json.dumps({'peer': 'peer1'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
# Simulate error
mock_wg.add_peer.side_effect = Exception('fail')
response = self.client.post('/api/wireguard/peers', data=json.dumps({'peer': 'peer1'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_wg.add_peer.side_effect = None
# /api/wireguard/peers (DELETE)
mock_wg.remove_peer.return_value = {'result': 'ok'}
response = self.client.delete('/api/wireguard/peers', data=json.dumps({'peer': 'peer1'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
# Simulate error
mock_wg.remove_peer.side_effect = Exception('fail')
response = self.client.delete('/api/wireguard/peers', data=json.dumps({'peer': 'peer1'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_wg.remove_peer.side_effect = None
# /api/wireguard/status (GET)
mock_wg.get_status.return_value = {'status': 'ok'}
response = self.client.get('/api/wireguard/status')
self.assertEqual(response.status_code, 200)
# Simulate error
mock_wg.get_status.side_effect = Exception('fail')
response = self.client.get('/api/wireguard/status')
self.assertEqual(response.status_code, 500)
mock_wg.get_status.side_effect = None
# /api/wireguard/connectivity (POST)
mock_wg.test_connectivity.return_value = {'success': True}
response = self.client.post('/api/wireguard/connectivity', data=json.dumps({'target': 'peer1'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
# Simulate error
mock_wg.test_connectivity.side_effect = Exception('fail')
response = self.client.post('/api/wireguard/connectivity', data=json.dumps({'target': 'peer1'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_wg.test_connectivity.side_effect = None
# /api/wireguard/peers/ip (PUT)
mock_wg.update_peer_ip.return_value = {'success': True}
response = self.client.put('/api/wireguard/peers/ip', data=json.dumps({'peer': 'peer1', 'ip': '10.0.0.2'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
# Simulate error
mock_wg.update_peer_ip.side_effect = Exception('fail')
response = self.client.put('/api/wireguard/peers/ip', data=json.dumps({'peer': 'peer1', 'ip': '10.0.0.2'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_wg.update_peer_ip.side_effect = None
# /api/wireguard/peers/config (POST)
mock_wg.get_peer_config.return_value = {'config': 'peer1'}
response = self.client.post('/api/wireguard/peers/config', data=json.dumps({'peer': 'peer1'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
# Simulate error
mock_wg.get_peer_config.side_effect = Exception('fail')
response = self.client.post('/api/wireguard/peers/config', data=json.dumps({'peer': 'peer1'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_wg.get_peer_config.side_effect = None
@patch('app.file_manager')
@patch('app.calendar_manager')
@patch('app.email_manager')
@patch('app.auth_manager')
@patch('app.peer_registry')
def test_peer_registry_endpoints(self, mock_peers, mock_auth, mock_email, mock_cal, mock_files):
# Stub out service provisioning so POST /api/peers can succeed
mock_auth.create_user.return_value = True
mock_auth.delete_user.return_value = True
mock_auth.list_users.return_value = [] # keep auth hook inactive
mock_email.create_email_user.return_value = True
mock_email.delete_email_user.return_value = True
mock_cal.create_calendar_user.return_value = True
mock_cal.delete_calendar_user.return_value = True
mock_files.create_user.return_value = True
mock_files.delete_user.return_value = True
# /api/peers (GET)
mock_peers.list_peers.return_value = [{'peer': 'peer1', 'ip': '10.0.0.2'}]
response = self.client.get('/api/peers')
self.assertEqual(response.status_code, 200)
self.assertIsInstance(json.loads(response.data), list)
# Simulate error
mock_peers.list_peers.side_effect = Exception('fail')
response = self.client.get('/api/peers')
self.assertEqual(response.status_code, 500)
mock_peers.list_peers.side_effect = None
# /api/peers (POST) — password now required
mock_peers.add_peer.return_value = True
response = self.client.post('/api/peers', data=json.dumps({'name': 'peer1', 'ip': '10.0.0.2', 'public_key': 'key', 'password': 'PeerPass123!'}), content_type='application/json')
self.assertEqual(response.status_code, 201)
# Duplicate
mock_peers.add_peer.return_value = False
response = self.client.post('/api/peers', data=json.dumps({'name': 'peer1', 'ip': '10.0.0.2', 'public_key': 'key', 'password': 'PeerPass123!'}), content_type='application/json')
self.assertEqual(response.status_code, 400)
# Missing field
response = self.client.post('/api/peers', data=json.dumps({'ip': '10.0.0.2', 'public_key': 'key'}), content_type='application/json')
self.assertEqual(response.status_code, 400)
# Simulate error from peer_registry
mock_peers.add_peer.return_value = True
mock_peers.add_peer.side_effect = Exception('fail')
response = self.client.post('/api/peers', data=json.dumps({'name': 'peer2', 'ip': '10.0.0.3', 'public_key': 'key', 'password': 'PeerPass123!'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_peers.add_peer.side_effect = None
# /api/peers/<peer_name> (DELETE)
mock_peers.remove_peer.return_value = True
response = self.client.delete('/api/peers/peer1')
self.assertEqual(response.status_code, 200)
mock_peers.remove_peer.return_value = False
response = self.client.delete('/api/peers/peer1')
self.assertEqual(response.status_code, 200)
mock_peers.remove_peer.side_effect = Exception('fail')
response = self.client.delete('/api/peers/peer1')
self.assertEqual(response.status_code, 500)
mock_peers.remove_peer.side_effect = None
# /api/peers/register (POST)
mock_peers.register_peer.return_value = {'result': 'ok'}
response = self.client.post('/api/peers/register', data=json.dumps({'peer': 'peer1'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_peers.register_peer.side_effect = Exception('fail')
response = self.client.post('/api/peers/register', data=json.dumps({'peer': 'peer1'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_peers.register_peer.side_effect = None
# /api/peers/<peer_name>/unregister (DELETE)
mock_peers.unregister_peer.return_value = {'result': 'ok'}
response = self.client.delete('/api/peers/peer1/unregister')
self.assertEqual(response.status_code, 200)
mock_peers.unregister_peer.side_effect = Exception('fail')
response = self.client.delete('/api/peers/peer1/unregister')
self.assertEqual(response.status_code, 500)
mock_peers.unregister_peer.side_effect = None
# /api/peers/<peer_name>/update-ip (PUT)
mock_peers.update_peer_ip.return_value = True
response = self.client.put('/api/peers/peer1/update-ip', data=json.dumps({'ip': '10.0.0.3'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_peers.update_peer_ip.return_value = False
response = self.client.put('/api/peers/peer1/update-ip', data=json.dumps({'ip': '10.0.0.3'}), content_type='application/json')
self.assertEqual(response.status_code, 404)
mock_peers.update_peer_ip.side_effect = Exception('fail')
response = self.client.put('/api/peers/peer1/update-ip', data=json.dumps({'ip': '10.0.0.3'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_peers.update_peer_ip.side_effect = None
@patch('app.email_manager')
def test_email_endpoints(self, mock_email):
# Ensure all relevant mock methods return JSON-serializable values
mock_email.get_users.return_value = [{'username': 'user1', 'domain': 'cell', 'email': 'user1@cell'}]
mock_email.create_email_user.return_value = True
mock_email.delete_email_user.return_value = True
mock_email.get_status.return_value = {'postfix_running': True, 'dovecot_running': True, 'total_users': 1, 'total_size_bytes': 0, 'total_size_mb': 0.0, 'users': [{'username': 'user1', 'domain': 'cell', 'email': 'user1@cell'}]}
mock_email.test_connectivity.return_value = {'smtp': {'success': True, 'message': 'SMTP server responding'}}
mock_email.send_email.return_value = True
mock_email.get_mailbox_info.return_value = {'username': 'user1', 'domain': 'cell', 'email': 'user1@cell', 'total_messages': 0, 'total_size_bytes': 0, 'total_size_mb': 0.0, 'folders': {}}
# /api/email/users (GET)
response = self.client.get('/api/email/users')
self.assertEqual(response.status_code, 200)
self.assertIsInstance(json.loads(response.data), list)
mock_email.get_users.side_effect = Exception('fail')
response = self.client.get('/api/email/users')
self.assertEqual(response.status_code, 500)
mock_email.get_users.side_effect = None
# /api/email/users (POST)
response = self.client.post('/api/email/users', data=json.dumps({'username': 'user1', 'domain': 'cell', 'password': 'pw'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_email.create_email_user.side_effect = Exception('fail')
response = self.client.post('/api/email/users', data=json.dumps({'username': 'user1', 'domain': 'cell', 'password': 'pw'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_email.create_email_user.side_effect = None
# /api/email/users/<username> (DELETE)
response = self.client.delete('/api/email/users/user1')
self.assertEqual(response.status_code, 200)
mock_email.delete_email_user.side_effect = Exception('fail')
response = self.client.delete('/api/email/users/user1')
self.assertEqual(response.status_code, 500)
mock_email.delete_email_user.side_effect = None
# /api/email/status (GET)
response = self.client.get('/api/email/status')
self.assertEqual(response.status_code, 200)
mock_email.get_status.side_effect = Exception('fail')
response = self.client.get('/api/email/status')
self.assertEqual(response.status_code, 500)
mock_email.get_status.side_effect = None
# /api/email/connectivity (GET)
response = self.client.get('/api/email/connectivity')
self.assertEqual(response.status_code, 200)
mock_email.test_connectivity.side_effect = Exception('fail')
response = self.client.get('/api/email/connectivity')
self.assertEqual(response.status_code, 500)
mock_email.test_connectivity.side_effect = None
# /api/email/send (POST)
response = self.client.post('/api/email/send', data=json.dumps({'from': 'a', 'to': 'b', 'subject': 's', 'body': 'b'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_email.send_email.side_effect = Exception('fail')
response = self.client.post('/api/email/send', data=json.dumps({'from': 'a', 'to': 'b', 'subject': 's', 'body': 'b'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_email.send_email.side_effect = None
# /api/email/mailbox/<username> (GET)
response = self.client.get('/api/email/mailbox/user1')
self.assertEqual(response.status_code, 200)
mock_email.get_mailbox_info.side_effect = Exception('fail')
response = self.client.get('/api/email/mailbox/user1')
self.assertEqual(response.status_code, 500)
mock_email.get_mailbox_info.side_effect = None
@patch('app.calendar_manager')
def test_calendar_endpoints(self, mock_calendar):
# Mock return values for all relevant calendar_manager methods
mock_calendar.get_users.return_value = [{'username': 'user1', 'collections': {'calendars': ['cal1'], 'contacts': ['c1']}}]
mock_calendar.create_calendar_user.return_value = True
mock_calendar.delete_calendar_user.return_value = True
mock_calendar.create_calendar.return_value = {'calendar': 'cal1'}
mock_calendar.add_event.return_value = {'event': 'event1'}
mock_calendar.get_events.return_value = [{'event': 'event1'}]
mock_calendar.get_status.return_value = {'radicale_running': True, 'total_users': 1, 'total_calendars': 1, 'total_contacts': 1, 'users': [{'username': 'user1', 'collections': {'calendars': ['cal1'], 'contacts': ['c1']}}]}
mock_calendar.test_connectivity.return_value = {'success': True}
# /api/calendar/users (GET)
response = self.client.get('/api/calendar/users')
self.assertEqual(response.status_code, 200)
self.assertIsInstance(json.loads(response.data), list)
mock_calendar.get_users.side_effect = Exception('fail')
response = self.client.get('/api/calendar/users')
self.assertEqual(response.status_code, 500)
mock_calendar.get_users.side_effect = None
# /api/calendar/users (POST)
response = self.client.post('/api/calendar/users', data=json.dumps({'username': 'user1', 'password': 'pw'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_calendar.create_calendar_user.side_effect = Exception('fail')
response = self.client.post('/api/calendar/users', data=json.dumps({'username': 'user1', 'password': 'pw'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_calendar.create_calendar_user.side_effect = None
# /api/calendar/users/<username> (DELETE)
response = self.client.delete('/api/calendar/users/user1')
self.assertEqual(response.status_code, 200)
mock_calendar.delete_calendar_user.side_effect = Exception('fail')
response = self.client.delete('/api/calendar/users/user1')
self.assertEqual(response.status_code, 500)
mock_calendar.delete_calendar_user.side_effect = None
# /api/calendar/calendars (POST)
response = self.client.post('/api/calendar/calendars', data=json.dumps({'username': 'user1', 'calendar_name': 'cal1'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_calendar.create_calendar.side_effect = Exception('fail')
response = self.client.post('/api/calendar/calendars', data=json.dumps({'username': 'user1', 'calendar_name': 'cal1'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_calendar.create_calendar.side_effect = None
# /api/calendar/events (POST)
response = self.client.post('/api/calendar/events', data=json.dumps({'username': 'user1', 'calendar_name': 'cal1', 'event': 'event1'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_calendar.add_event.side_effect = Exception('fail')
response = self.client.post('/api/calendar/events', data=json.dumps({'username': 'user1', 'calendar_name': 'cal1', 'event': 'event1'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_calendar.add_event.side_effect = None
# /api/calendar/events/<username>/<calendar_name> (GET)
response = self.client.get('/api/calendar/events/user1/cal1')
self.assertEqual(response.status_code, 200)
mock_calendar.get_events.side_effect = Exception('fail')
response = self.client.get('/api/calendar/events/user1/cal1')
self.assertEqual(response.status_code, 500)
mock_calendar.get_events.side_effect = None
# /api/calendar/status (GET)
response = self.client.get('/api/calendar/status')
self.assertEqual(response.status_code, 200)
mock_calendar.get_status.side_effect = Exception('fail')
response = self.client.get('/api/calendar/status')
self.assertEqual(response.status_code, 500)
mock_calendar.get_status.side_effect = None
# /api/calendar/connectivity (GET)
response = self.client.get('/api/calendar/connectivity')
self.assertEqual(response.status_code, 200)
mock_calendar.test_connectivity.side_effect = Exception('fail')
response = self.client.get('/api/calendar/connectivity')
self.assertEqual(response.status_code, 500)
mock_calendar.test_connectivity.side_effect = None
@patch('app.file_manager')
def test_file_endpoints(self, mock_file):
# Mock return values for all relevant file_manager methods
mock_file.get_users.return_value = [{'username': 'user1', 'storage_info': {'total_files': 1, 'total_size_bytes': 1000}}]
mock_file.create_user.return_value = True
mock_file.delete_user.return_value = True
mock_file.get_status.return_value = {'webdav_running': True, 'total_users': 1, 'total_files': 1, 'total_size_bytes': 1000, 'total_size_mb': 1.0, 'users': [{'username': 'user1', 'storage_info': {'total_files': 1, 'total_size_bytes': 1000}}]}
mock_file.test_connectivity.return_value = {'success': True}
# /api/files/users (GET)
response = self.client.get('/api/files/users')
self.assertEqual(response.status_code, 200)
self.assertIsInstance(json.loads(response.data), list)
mock_file.get_users.side_effect = Exception('fail')
response = self.client.get('/api/files/users')
self.assertEqual(response.status_code, 500)
mock_file.get_users.side_effect = None
# /api/files/users (POST)
response = self.client.post('/api/files/users', data=json.dumps({'username': 'user1', 'password': 'pw'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_file.create_user.side_effect = Exception('fail')
response = self.client.post('/api/files/users', data=json.dumps({'username': 'user1', 'password': 'pw'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_file.create_user.side_effect = None
# /api/files/users/<username> (DELETE)
response = self.client.delete('/api/files/users/user1')
self.assertEqual(response.status_code, 200)
mock_file.delete_user.side_effect = Exception('fail')
response = self.client.delete('/api/files/users/user1')
self.assertEqual(response.status_code, 500)
mock_file.delete_user.side_effect = None
# /api/files/status (GET)
response = self.client.get('/api/files/status')
self.assertEqual(response.status_code, 200)
mock_file.get_status.side_effect = Exception('fail')
response = self.client.get('/api/files/status')
self.assertEqual(response.status_code, 500)
mock_file.get_status.side_effect = None
# /api/files/connectivity (GET)
response = self.client.get('/api/files/connectivity')
self.assertEqual(response.status_code, 200)
mock_file.test_connectivity.side_effect = Exception('fail')
response = self.client.get('/api/files/connectivity')
self.assertEqual(response.status_code, 500)
mock_file.test_connectivity.side_effect = None
@patch('app.routing_manager')
def test_routing_endpoints(self, mock_routing):
# Mock return values for all relevant routing_manager methods
mock_routing.get_status.return_value = {'routing_running': True, 'routes': []}
mock_routing.add_nat_rule.return_value = {'result': 'ok'}
mock_routing.get_nat_rules.return_value = [{'id': 1, 'rule': 'nat'}]
mock_routing.remove_nat_rule.return_value = {'result': 'ok'}
mock_routing.add_firewall_rule.return_value = {'result': 'ok'}
mock_routing.get_firewall_rules.return_value = [{'id': 1, 'rule': 'fw'}]
mock_routing.add_peer_route.return_value = {'result': 'ok'}
mock_routing.get_peer_routes.return_value = [{'peer': 'peer1', 'route': '10.0.0.2'}]
mock_routing.remove_peer_route.return_value = {'result': 'ok'}
mock_routing.add_exit_node.return_value = {'result': 'ok'}
mock_routing.add_bridge_route.return_value = {'result': 'ok'}
mock_routing.add_split_route.return_value = {'result': 'ok'}
mock_routing.test_routing_connectivity.return_value = {'ping': {'success': True, 'output': '', 'error': ''}}
mock_routing.remove_firewall_rule.return_value = True
mock_routing.get_live_iptables.return_value = {'filter': '', 'nat': ''}
mock_routing.get_routing_logs.return_value = {'logs': 'logdata'}
# /api/routing/status (GET)
response = self.client.get('/api/routing/status')
self.assertEqual(response.status_code, 200)
mock_routing.get_status.side_effect = Exception('fail')
response = self.client.get('/api/routing/status')
self.assertEqual(response.status_code, 500)
mock_routing.get_status.side_effect = None
# /api/routing/nat (POST)
response = self.client.post('/api/routing/nat', data=json.dumps({'rule': 'nat'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_routing.add_nat_rule.side_effect = Exception('fail')
response = self.client.post('/api/routing/nat', data=json.dumps({'rule': 'nat'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_routing.add_nat_rule.side_effect = None
# /api/routing/nat (GET)
response = self.client.get('/api/routing/nat')
self.assertEqual(response.status_code, 200)
mock_routing.get_nat_rules.side_effect = Exception('fail')
response = self.client.get('/api/routing/nat')
self.assertEqual(response.status_code, 500)
mock_routing.get_nat_rules.side_effect = None
# /api/routing/nat/<rule_id> (DELETE)
response = self.client.delete('/api/routing/nat/1')
self.assertEqual(response.status_code, 200)
mock_routing.remove_nat_rule.side_effect = Exception('fail')
response = self.client.delete('/api/routing/nat/1')
self.assertEqual(response.status_code, 500)
mock_routing.remove_nat_rule.side_effect = None
# /api/routing/firewall (POST)
response = self.client.post('/api/routing/firewall', data=json.dumps({'rule': 'fw'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_routing.add_firewall_rule.side_effect = Exception('fail')
response = self.client.post('/api/routing/firewall', data=json.dumps({'rule': 'fw'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_routing.add_firewall_rule.side_effect = None
# /api/routing/firewall (GET)
response = self.client.get('/api/routing/firewall')
self.assertEqual(response.status_code, 200)
mock_routing.get_firewall_rules.side_effect = Exception('fail')
response = self.client.get('/api/routing/firewall')
self.assertEqual(response.status_code, 500)
mock_routing.get_firewall_rules.side_effect = None
# /api/routing/peers (POST)
response = self.client.post('/api/routing/peers', data=json.dumps({'peer_name': 'peer1', 'peer_ip': '10.0.0.2'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_routing.add_peer_route.side_effect = Exception('fail')
response = self.client.post('/api/routing/peers', data=json.dumps({'peer_name': 'peer1', 'peer_ip': '10.0.0.2'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_routing.add_peer_route.side_effect = None
# /api/routing/peers (GET)
response = self.client.get('/api/routing/peers')
self.assertEqual(response.status_code, 200)
mock_routing.get_peer_routes.side_effect = Exception('fail')
response = self.client.get('/api/routing/peers')
self.assertEqual(response.status_code, 500)
mock_routing.get_peer_routes.side_effect = None
# /api/routing/peers/<peer_name> (DELETE)
response = self.client.delete('/api/routing/peers/peer1')
self.assertEqual(response.status_code, 200)
mock_routing.remove_peer_route.side_effect = Exception('fail')
response = self.client.delete('/api/routing/peers/peer1')
self.assertEqual(response.status_code, 500)
mock_routing.remove_peer_route.side_effect = None
# /api/routing/exit-nodes (POST)
response = self.client.post('/api/routing/exit-nodes', data=json.dumps({'peer_name': 'exit1', 'peer_ip': '10.0.0.5'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_routing.add_exit_node.side_effect = Exception('fail')
response = self.client.post('/api/routing/exit-nodes', data=json.dumps({'peer_name': 'exit1', 'peer_ip': '10.0.0.5'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_routing.add_exit_node.side_effect = None
# /api/routing/bridge (POST)
response = self.client.post('/api/routing/bridge', data=json.dumps({'source_peer': 'peer1', 'target_peer': 'peer2'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_routing.add_bridge_route.side_effect = Exception('fail')
response = self.client.post('/api/routing/bridge', data=json.dumps({'source_peer': 'peer1', 'target_peer': 'peer2'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_routing.add_bridge_route.side_effect = None
# /api/routing/split (POST)
response = self.client.post('/api/routing/split', data=json.dumps({'network': '10.0.0.0/24', 'exit_peer': '10.0.0.5'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_routing.add_split_route.side_effect = Exception('fail')
response = self.client.post('/api/routing/split', data=json.dumps({'network': '10.0.0.0/24', 'exit_peer': '10.0.0.5'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_routing.add_split_route.side_effect = None
# /api/routing/connectivity (POST)
response = self.client.post('/api/routing/connectivity', data=json.dumps({'target_ip': '8.8.8.8'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_routing.test_routing_connectivity.side_effect = Exception('fail')
response = self.client.post('/api/routing/connectivity', data=json.dumps({'target_ip': '8.8.8.8'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_routing.test_routing_connectivity.side_effect = None
# /api/routing/firewall/<id> (DELETE)
response = self.client.delete('/api/routing/firewall/fw_1')
self.assertEqual(response.status_code, 200)
mock_routing.remove_firewall_rule.return_value = False
response = self.client.delete('/api/routing/firewall/fw_999')
self.assertEqual(response.status_code, 404)
mock_routing.remove_firewall_rule.return_value = True
# /api/routing/live-iptables (GET)
response = self.client.get('/api/routing/live-iptables')
self.assertEqual(response.status_code, 200)
mock_routing.get_live_iptables.side_effect = Exception('fail')
response = self.client.get('/api/routing/live-iptables')
self.assertEqual(response.status_code, 500)
mock_routing.get_live_iptables.side_effect = None
# /api/routing/logs (GET)
mock_routing.get_logs.return_value = {
'iptables': 'iptables log data',
'routing': 'routing log data',
'routes': 'route log data'
}
response = self.client.get('/api/routing/logs')
self.assertEqual(response.status_code, 200)
mock_routing.get_logs.side_effect = Exception('fail')
response = self.client.get('/api/routing/logs')
self.assertEqual(response.status_code, 500)
mock_routing.get_logs.side_effect = None
@patch('app.app.vault_manager')
def test_vault_endpoints(self, mock_vault):
# Mock return values for all relevant vault_manager methods
mock_vault.get_status = MagicMock(return_value={'vault_running': True, 'certs': 2})
mock_vault.list_certificates = MagicMock(return_value=[{'common_name': 'test', 'valid': True}])
mock_vault.generate_certificate = MagicMock(return_value={'certificate': 'certdata'})
mock_vault.revoke_certificate = MagicMock(return_value=True)
mock_vault.get_ca_certificate = MagicMock(return_value='ca_cert_data')
mock_vault.get_age_public_key = MagicMock(return_value='age_pubkey')
mock_vault.get_trusted_keys = MagicMock(return_value=[{'name': 'key1', 'public_key': 'pk1'}])
mock_vault.add_trusted_key = MagicMock(return_value=True)
mock_vault.remove_trusted_key = MagicMock(return_value=True)
mock_vault.verify_trust_chain = MagicMock(return_value=True)
mock_vault.get_trust_chains = MagicMock(return_value=[{'chain': 'chain1'}])
# /api/vault/status (GET)
response = self.client.get('/api/vault/status')
self.assertEqual(response.status_code, 200)
mock_vault.get_status.side_effect = Exception('fail')
response = self.client.get('/api/vault/status')
self.assertEqual(response.status_code, 500)
mock_vault.get_status.side_effect = None
# /api/vault/certificates (GET)
response = self.client.get('/api/vault/certificates')
self.assertEqual(response.status_code, 200)
mock_vault.list_certificates.side_effect = Exception('fail')
response = self.client.get('/api/vault/certificates')
self.assertEqual(response.status_code, 500)
mock_vault.list_certificates.side_effect = None
# /api/vault/certificates (POST)
response = self.client.post('/api/vault/certificates', data=json.dumps({'common_name': 'test'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_vault.generate_certificate.side_effect = Exception('fail')
response = self.client.post('/api/vault/certificates', data=json.dumps({'common_name': 'test'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_vault.generate_certificate.side_effect = None
# /api/vault/certificates/<common_name> (DELETE)
response = self.client.delete('/api/vault/certificates/test')
self.assertEqual(response.status_code, 200)
mock_vault.revoke_certificate.side_effect = Exception('fail')
response = self.client.delete('/api/vault/certificates/test')
self.assertEqual(response.status_code, 500)
mock_vault.revoke_certificate.side_effect = None
# /api/vault/ca/certificate (GET)
response = self.client.get('/api/vault/ca/certificate')
self.assertEqual(response.status_code, 200)
mock_vault.get_ca_certificate.side_effect = Exception('fail')
response = self.client.get('/api/vault/ca/certificate')
self.assertEqual(response.status_code, 500)
mock_vault.get_ca_certificate.side_effect = None
# /api/vault/age/public-key (GET)
response = self.client.get('/api/vault/age/public-key')
self.assertEqual(response.status_code, 200)
mock_vault.get_age_public_key.side_effect = Exception('fail')
response = self.client.get('/api/vault/age/public-key')
self.assertEqual(response.status_code, 500)
mock_vault.get_age_public_key.side_effect = None
# /api/vault/trust/keys (GET)
response = self.client.get('/api/vault/trust/keys')
self.assertEqual(response.status_code, 200)
mock_vault.get_trusted_keys.side_effect = Exception('fail')
response = self.client.get('/api/vault/trust/keys')
self.assertEqual(response.status_code, 500)
mock_vault.get_trusted_keys.side_effect = None
# /api/vault/trust/keys (POST)
response = self.client.post('/api/vault/trust/keys', data=json.dumps({'name': 'key1', 'public_key': 'pk1'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_vault.add_trusted_key.side_effect = Exception('fail')
response = self.client.post('/api/vault/trust/keys', data=json.dumps({'name': 'key1', 'public_key': 'pk1'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_vault.add_trusted_key.side_effect = None
# /api/vault/trust/keys/<name> (DELETE)
response = self.client.delete('/api/vault/trust/keys/key1')
self.assertEqual(response.status_code, 200)
mock_vault.remove_trusted_key.side_effect = Exception('fail')
response = self.client.delete('/api/vault/trust/keys/key1')
self.assertEqual(response.status_code, 500)
mock_vault.remove_trusted_key.side_effect = None
# /api/vault/trust/verify (POST)
response = self.client.post('/api/vault/trust/verify', data=json.dumps({'peer_name': 'peer1', 'signature': 'sig', 'data': 'data'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_vault.verify_trust_chain.side_effect = Exception('fail')
response = self.client.post('/api/vault/trust/verify', data=json.dumps({'peer_name': 'peer1', 'signature': 'sig', 'data': 'data'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_vault.verify_trust_chain.side_effect = None
# /api/vault/trust/chains (GET)
response = self.client.get('/api/vault/trust/chains')
self.assertEqual(response.status_code, 200)
mock_vault.get_trust_chains.side_effect = Exception('fail')
response = self.client.get('/api/vault/trust/chains')
self.assertEqual(response.status_code, 500)
mock_vault.get_trust_chains.side_effect = None
@patch('app.app.vault_manager')
def test_secrets_api_endpoints(self, mock_vault):
mock_vault.list_secrets.return_value = ['API_KEY']
mock_vault.store_secret.return_value = True
mock_vault.get_secret.return_value = 'supersecret'
mock_vault.delete_secret.return_value = True
# List secrets
response = self.client.get('/api/vault/secrets')
self.assertEqual(response.status_code, 200)
self.assertIn('API_KEY', json.loads(response.data)['secrets'])
# Store secret
response = self.client.post('/api/vault/secrets', data=json.dumps({'name': 'API_KEY', 'value': 'supersecret'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
# Get secret
response = self.client.get('/api/vault/secrets/API_KEY')
self.assertEqual(response.status_code, 200)
self.assertEqual(json.loads(response.data)['value'], 'supersecret')
# Delete secret
response = self.client.delete('/api/vault/secrets/API_KEY')
self.assertEqual(response.status_code, 200)
# Container creation with secrets
mock_vault.get_secret.side_effect = lambda name: 'supersecret' if name == 'API_KEY' else None
with patch('app.container_manager') as mock_container:
mock_container.create_container.return_value = {'id': 'cid', 'name': 'cname'}
data = {'image': 'nginx', 'secrets': ['API_KEY']}
response = self.client.post('/api/containers', data=json.dumps(data), content_type='application/json')
self.assertEqual(response.status_code, 200)
args, kwargs = mock_container.create_container.call_args
self.assertIn('API_KEY', kwargs['env'])
self.assertEqual(kwargs['env']['API_KEY'], 'supersecret')
@patch('app.container_manager')
def test_container_endpoints(self, mock_container):
# Simulate local request
with self.client as c:
c.environ_base['REMOTE_ADDR'] = '127.0.0.1'
# List containers
mock_container.list_containers.return_value = [{'id': 'abc', 'name': 'test', 'status': 'running', 'image': ['img'], 'labels': {}}]
response = c.get('/api/containers')
self.assertEqual(response.status_code, 200)
self.assertIsInstance(json.loads(response.data), list)
mock_container.list_containers.side_effect = Exception('fail')
response = c.get('/api/containers')
self.assertEqual(response.status_code, 500)
mock_container.list_containers.side_effect = None
# Start container
mock_container.start_container.return_value = True
response = c.post('/api/containers/test/start')
self.assertEqual(response.status_code, 200)
mock_container.start_container.side_effect = Exception('fail')
response = c.post('/api/containers/test/start')
self.assertEqual(response.status_code, 500)
mock_container.start_container.side_effect = None
# Stop container
mock_container.stop_container.return_value = True
response = c.post('/api/containers/test/stop')
self.assertEqual(response.status_code, 200)
mock_container.stop_container.side_effect = Exception('fail')
response = c.post('/api/containers/test/stop')
self.assertEqual(response.status_code, 500)
mock_container.stop_container.side_effect = None
# Restart container
mock_container.restart_container.return_value = True
response = c.post('/api/containers/test/restart')
self.assertEqual(response.status_code, 200)
mock_container.restart_container.side_effect = Exception('fail')
response = c.post('/api/containers/test/restart')
self.assertEqual(response.status_code, 500)
mock_container.restart_container.side_effect = None
# Simulate non-local request
with self.client as c:
c.environ_base['REMOTE_ADDR'] = '8.8.8.8'
response = c.get('/api/containers')
self.assertEqual(response.status_code, 403)
response = c.post('/api/containers/test/start')
self.assertEqual(response.status_code, 403)
response = c.post('/api/containers/test/stop')
self.assertEqual(response.status_code, 403)
response = c.post('/api/containers/test/restart')
self.assertEqual(response.status_code, 403)
if __name__ == '__main__':
unittest.main()