fix: full security audit remediation — P0/P1/P2/P3 fixes + 1020 passing tests

P0 — Broken functionality:
- Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing)
- Fix email_manager.delete_email_user() missing domain arg
- Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now
  accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file)
- Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR)
- Fix _next_peer_ip exhaustion returning 500 instead of 409
- Fix ConfigManager Caddyfile path (/app/config-caddy/)
- Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx
- Remove hardcoded credentials from Dashboard.jsx

P1 — Security:
- CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern)
- enforce_auth: 503 only when users file readable but empty; never bypass on IOError
- WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes
- DNS add_cell_dns_forward: validate IP and domain; reject injection chars
- DNS zone write: realpath containment + record content validation
- iptables comment /32 suffix prevents substring match deleting wrong peer rules
- is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge)
- POST /api/containers: volume allow-list prevents arbitrary host mounts
- file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user
- email/calendar: stop persisting plaintext passwords in user records
- routing_manager: validate IPs, networks, and interface names
- peer_registry: write peers.json at mode 0o600
- vault_manager: Fernet key file at mode 0o600
- CORS: lock down to explicit origin list
- domain/cell_name validation: reject newline, brace, semicolon injection chars

P2 — Architecture:
- Peer add: rollback registry entry if firewall rules fail post-add
- restart_service(): base class now calls _restart_container(); email and calendar
  managers call cell-mail / cell-radicale respectively
- email/calendar managers sync user list (no passwords) to cell_config.json
- Pending-restart flag cleared only after helper subprocess exits with code 0
- docker-compose.yml: add config-caddy volume to API container

P3 — Tests (854 → 1020):
- Fill test_email_endpoints.py, test_calendar_endpoints.py,
  test_network_endpoints.py, test_routing_endpoints.py
- New: test_peer_management_update.py, test_peer_management_edge_cases.py,
  test_input_validation.py, test_enforce_auth_configured.py,
  test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py,
  test_is_local_request_per_endpoint.py, test_caddy_routing.py
- E2E conftest: skip WireGuard suite when wg-quick absent
- Update existing tests to match fixed signatures and comment formats

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-27 11:30:21 -04:00
parent 0c12e3fc97
commit a43f9fbf0d
47 changed files with 4578 additions and 579 deletions
+6
View File
@@ -1,10 +1,16 @@
import os
import shutil
import pytest
import tempfile
import secrets
from helpers.wg_runner import WGInterface, build_wg_config, cleanup_stale_e2e_interfaces
def pytest_configure(config):
if not shutil.which('wg-quick'):
pytest.skip('wg-quick not found — skipping WireGuard E2E tests', allow_module_level=True)
@pytest.fixture(scope='session', autouse=True)
def cleanup_stale_wg_interfaces():
cleanup_stale_e2e_interfaces()
+275
View File
@@ -0,0 +1,275 @@
"""
WireGuard E2E: Caddy per-domain routing correctness.
Scenarios covered:
35. api.<domain> proxies to the API (returns JSON), not the WebUI
36. calendar.<domain> via VIP proxies to Radicale, not the WebUI
37. files.<domain> via VIP proxies to Filegator, not the WebUI
38. mail.<domain> via VIP proxies to Rainloop, not the WebUI
39. webdav.<domain> via VIP proxies to the WebDAV service, not the WebUI
40. Direct VIP requests (by IP) go to the correct service
41. Catch-all :80 serves WebUI for unknown hosts but routes /api/* to API
The WebUI serves a React app — its HTML starts with '<!doctype html>'.
Any service domain that returns that string is incorrectly falling through
to the catch-all :80 block instead of being routed by its Host header.
These tests require a live PIC stack with WireGuard and are marked `wg`.
They run via `make test-e2e-wg` or `pytest tests/e2e/wg/ -m wg`.
"""
import subprocess
import pytest
pytestmark = pytest.mark.wg
_WEBUI_MARKER = '<!doctype html>'
def _config(admin_client) -> dict:
r = admin_client.get('/api/config')
return r.json() if r.status_code == 200 else {}
def _domain(admin_client) -> str:
return _config(admin_client).get('domain') or 'lan'
def _dns_ip(admin_client) -> str:
cfg = _config(admin_client)
return cfg.get('service_ips', {}).get('dns') or '172.20.0.3'
def _curl_host(ip: str, host: str, path: str = '/', timeout: int = 8) -> tuple[int, str]:
"""
Make an HTTP request to `ip` with the given Host header.
Returns (http_code, body_snippet).
"""
result = subprocess.run(
['curl', '-s', '--connect-timeout', '5',
'-H', f'Host: {host}',
'-w', '\n__HTTP_CODE__:%{http_code}',
f'http://{ip}{path}'],
capture_output=True, text=True, timeout=timeout,
)
output = result.stdout
body = ''
code = 0
if '__HTTP_CODE__:' in output:
parts = output.rsplit('__HTTP_CODE__:', 1)
body = parts[0].lower()
try:
code = int(parts[1].strip())
except ValueError:
pass
return code, body
def _curl_domain(host: str, path: str = '/', dns_ip: str = '', timeout: int = 8) -> tuple[int, str]:
"""Make an HTTP request using curl's --dns-servers to resolve via CoreDNS."""
cmd = ['curl', '-s', '--connect-timeout', '5',
'-w', '\n__HTTP_CODE__:%{http_code}',
f'http://{host}{path}']
if dns_ip:
cmd = ['curl', '-s', '--connect-timeout', '5',
'--dns-servers', dns_ip,
'-w', '\n__HTTP_CODE__:%{http_code}',
f'http://{host}{path}']
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
output = result.stdout
body = ''
code = 0
if '__HTTP_CODE__:' in output:
parts = output.rsplit('__HTTP_CODE__:', 1)
body = parts[0].lower()
try:
code = int(parts[1].strip())
except ValueError:
pass
return code, body
# ── Scenario 35: api.<domain> routes to API ───────────────────────────────────
def test_api_domain_returns_json_not_webui(connected_peer, admin_client):
"""api.<domain>/api/status must return JSON, not the React WebUI HTML."""
dom = _domain(admin_client)
dns_ip = _dns_ip(admin_client)
code, body = _curl_domain(f'api.{dom}', '/api/status', dns_ip)
assert code not in (0, 000), f"curl to api.{dom}/api/status failed (code {code})"
assert _WEBUI_MARKER not in body, (
f"api.{dom}/api/status returned WebUI HTML — "
"Caddy is not routing api.<domain> to the API; "
"check that the http://api.<domain> block exists in the Caddyfile "
"and uses the configured domain (not a stale .cell or .dev TLD)"
)
assert '{' in body or '"' in body, (
f"api.{dom}/api/status did not return JSON (body: {body[:100]!r})"
)
def test_api_vip_host_header_routes_to_api(connected_peer, admin_client):
"""Caddy routes api.<domain> by Host header even when accessed via the Caddy VIP."""
dom = _domain(admin_client)
code, body = _curl_host('172.20.0.2', f'api.{dom}', '/api/status')
assert _WEBUI_MARKER not in body, (
f"Host: api.{dom} via 172.20.0.2 returned WebUI HTML — "
"Caddy http://api.<domain> block is missing or uses wrong TLD"
)
# ── Scenario 36: calendar.<domain> routes to Radicale ────────────────────────
def test_calendar_vip_does_not_serve_webui(connected_peer, admin_client):
"""calendar.<domain> (VIP 172.20.0.21) must proxy to Radicale, not the WebUI."""
dom = _domain(admin_client)
dns_ip = _dns_ip(admin_client)
code, body = _curl_domain(f'calendar.{dom}', '/', dns_ip)
assert code not in (0,), f"curl to calendar.{dom} failed completely"
assert _WEBUI_MARKER not in body, (
f"calendar.{dom} returned WebUI HTML — "
"Caddy is not routing calendar.<domain> to Radicale. "
"This happens when Caddy has old (e.g. .cell) domain blocks and all "
"traffic falls through to the catch-all :80 block."
)
def test_calendar_vip_ip_does_not_serve_webui(connected_peer):
"""Direct request to VIP 172.20.0.21 must NOT return the WebUI."""
code, body = _curl_host('172.20.0.21', 'calendar.lan')
assert _WEBUI_MARKER not in body, (
"172.20.0.21 (calendar VIP) returned WebUI HTML — "
"Caddy http://calendar.<domain>, http://172.20.0.21:80 block is missing or stale"
)
# ── Scenario 37: files.<domain> routes to Filegator ──────────────────────────
def test_files_vip_does_not_serve_webui(connected_peer, admin_client):
"""files.<domain> (VIP 172.20.0.22) must proxy to Filegator, not the WebUI."""
dom = _domain(admin_client)
dns_ip = _dns_ip(admin_client)
code, body = _curl_domain(f'files.{dom}', '/', dns_ip)
assert code not in (0,), f"curl to files.{dom} failed completely"
assert _WEBUI_MARKER not in body, (
f"files.{dom} returned WebUI HTML — "
"Caddy is not routing files.<domain> to Filegator. "
"Check the http://files.<domain>, http://172.20.0.22:80 Caddyfile block."
)
def test_files_vip_ip_does_not_serve_webui(connected_peer):
"""Direct request to VIP 172.20.0.22 must NOT return the WebUI."""
code, body = _curl_host('172.20.0.22', 'files.lan')
assert _WEBUI_MARKER not in body, (
"172.20.0.22 (files VIP) returned WebUI HTML — "
"Caddy http://files.<domain>, http://172.20.0.22:80 block is missing or stale"
)
# ── Scenario 38: mail.<domain> routes to Rainloop ────────────────────────────
def test_mail_vip_does_not_serve_webui(connected_peer, admin_client):
"""mail.<domain> (VIP 172.20.0.23) must proxy to Rainloop, not the WebUI."""
dom = _domain(admin_client)
dns_ip = _dns_ip(admin_client)
code, body = _curl_domain(f'mail.{dom}', '/', dns_ip)
assert code not in (0,), f"curl to mail.{dom} failed completely"
assert _WEBUI_MARKER not in body, (
f"mail.{dom} returned WebUI HTML — "
"Caddy is not routing mail.<domain> to Rainloop."
)
def test_webmail_vip_does_not_serve_webui(connected_peer, admin_client):
"""webmail.<domain> (alias, same VIP 172.20.0.23) must NOT return the WebUI."""
dom = _domain(admin_client)
dns_ip = _dns_ip(admin_client)
code, body = _curl_domain(f'webmail.{dom}', '/', dns_ip)
assert _WEBUI_MARKER not in body, (
f"webmail.{dom} returned WebUI HTML — "
"Caddy http://webmail.<domain> block is missing or stale"
)
def test_mail_vip_ip_does_not_serve_webui(connected_peer):
"""Direct request to VIP 172.20.0.23 must NOT return the WebUI."""
code, body = _curl_host('172.20.0.23', 'mail.lan')
assert _WEBUI_MARKER not in body, (
"172.20.0.23 (mail VIP) returned WebUI HTML — "
"Caddy http://mail.<domain>, http://172.20.0.23:80 block is missing or stale"
)
# ── Scenario 39: webdav.<domain> routes to WebDAV ────────────────────────────
def test_webdav_vip_does_not_serve_webui(connected_peer, admin_client):
"""webdav.<domain> (VIP 172.20.0.24) must proxy to the WebDAV service."""
dom = _domain(admin_client)
dns_ip = _dns_ip(admin_client)
code, body = _curl_domain(f'webdav.{dom}', '/', dns_ip)
assert code not in (0,), f"curl to webdav.{dom} failed completely"
assert _WEBUI_MARKER not in body, (
f"webdav.{dom} returned WebUI HTML — "
"Caddy is not routing webdav.<domain> to the WebDAV service."
)
def test_webdav_vip_ip_does_not_serve_webui(connected_peer):
"""Direct request to VIP 172.20.0.24 must NOT return the WebUI."""
code, body = _curl_host('172.20.0.24', 'webdav.lan')
assert _WEBUI_MARKER not in body, (
"172.20.0.24 (webdav VIP) returned WebUI HTML — "
"Caddy http://webdav.<domain>, http://172.20.0.24:80 block is missing or stale"
)
# ── Scenario 40: VIP IPs without Host header ─────────────────────────────────
@pytest.mark.parametrize('vip,expected_not', [
('172.20.0.21', _WEBUI_MARKER),
('172.20.0.22', _WEBUI_MARKER),
('172.20.0.23', _WEBUI_MARKER),
('172.20.0.24', _WEBUI_MARKER),
])
def test_vip_direct_access_not_webui(connected_peer, vip, expected_not):
"""Each service VIP accessed directly (no special Host) must not return WebUI."""
code, body = _curl_host(vip, vip)
assert expected_not not in body, (
f"VIP {vip} returned WebUI HTML — "
"Caddy catch-all :80 is taking over; the per-VIP blocks must listen on port 80"
)
# ── Scenario 41: Catch-all :80 routes API path correctly ─────────────────────
def test_catchall_api_path_returns_json(connected_peer):
"""The catch-all :80 block must route /api/* to the API (not WebUI)."""
code, body = _curl_host('172.20.0.2', 'localhost', '/api/status')
assert _WEBUI_MARKER not in body, (
"Catch-all :80 returned WebUI HTML for /api/status — "
"the `handle /api/*` directive in the :80 block is missing or wrong"
)
assert '{' in body or '"' in body, (
f"/api/status via catch-all did not return JSON (body: {body[:100]!r})"
)
def test_catchall_root_serves_webui(connected_peer):
"""The catch-all :80 block serves the WebUI for the root path."""
code, body = _curl_host('172.20.0.2', 'localhost', '/')
assert _WEBUI_MARKER in body, (
"Catch-all :80 / did not return WebUI HTML — "
"something is broken with the catch-all :80 block"
)
# ── Scenario extra: stale TLD detection ──────────────────────────────────────
def test_caddy_does_not_route_cell_tld(connected_peer):
"""Caddy must NOT have active routing for .cell domains — they are from old config."""
code, body = _curl_host('172.20.0.2', 'calendar.cell', '/')
assert _WEBUI_MARKER in body or code in (0, 404, 502, 503), (
"Caddy is still routing calendar.cell — stale .cell blocks remain in config. "
"Check that write_caddyfile() is writing to the correct path that Caddy reads."
)
+20 -20
View File
@@ -366,8 +366,8 @@ class TestAPIEndpoints(unittest.TestCase):
def test_email_endpoints(self, mock_email):
# Ensure all relevant mock methods return JSON-serializable values
mock_email.get_users.return_value = [{'username': 'user1', 'domain': 'cell', 'email': 'user1@cell'}]
mock_email.create_user.return_value = True
mock_email.delete_user.return_value = True
mock_email.create_email_user.return_value = True
mock_email.delete_email_user.return_value = True
mock_email.get_status.return_value = {'postfix_running': True, 'dovecot_running': True, 'total_users': 1, 'total_size_bytes': 0, 'total_size_mb': 0.0, 'users': [{'username': 'user1', 'domain': 'cell', 'email': 'user1@cell'}]}
mock_email.test_connectivity.return_value = {'smtp': {'success': True, 'message': 'SMTP server responding'}}
mock_email.send_email.return_value = True
@@ -383,17 +383,17 @@ class TestAPIEndpoints(unittest.TestCase):
# /api/email/users (POST)
response = self.client.post('/api/email/users', data=json.dumps({'username': 'user1', 'domain': 'cell', 'password': 'pw'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_email.create_user.side_effect = Exception('fail')
mock_email.create_email_user.side_effect = Exception('fail')
response = self.client.post('/api/email/users', data=json.dumps({'username': 'user1', 'domain': 'cell', 'password': 'pw'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_email.create_user.side_effect = None
mock_email.create_email_user.side_effect = None
# /api/email/users/<username> (DELETE)
response = self.client.delete('/api/email/users/user1')
self.assertEqual(response.status_code, 200)
mock_email.delete_user.side_effect = Exception('fail')
mock_email.delete_email_user.side_effect = Exception('fail')
response = self.client.delete('/api/email/users/user1')
self.assertEqual(response.status_code, 500)
mock_email.delete_user.side_effect = None
mock_email.delete_email_user.side_effect = None
# /api/email/status (GET)
response = self.client.get('/api/email/status')
self.assertEqual(response.status_code, 200)
@@ -427,8 +427,8 @@ class TestAPIEndpoints(unittest.TestCase):
def test_calendar_endpoints(self, mock_calendar):
# Mock return values for all relevant calendar_manager methods
mock_calendar.get_users.return_value = [{'username': 'user1', 'collections': {'calendars': ['cal1'], 'contacts': ['c1']}}]
mock_calendar.create_user.return_value = True
mock_calendar.delete_user.return_value = True
mock_calendar.create_calendar_user.return_value = True
mock_calendar.delete_calendar_user.return_value = True
mock_calendar.create_calendar.return_value = {'calendar': 'cal1'}
mock_calendar.add_event.return_value = {'event': 'event1'}
mock_calendar.get_events.return_value = [{'event': 'event1'}]
@@ -445,17 +445,17 @@ class TestAPIEndpoints(unittest.TestCase):
# /api/calendar/users (POST)
response = self.client.post('/api/calendar/users', data=json.dumps({'username': 'user1', 'password': 'pw'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_calendar.create_user.side_effect = Exception('fail')
mock_calendar.create_calendar_user.side_effect = Exception('fail')
response = self.client.post('/api/calendar/users', data=json.dumps({'username': 'user1', 'password': 'pw'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_calendar.create_user.side_effect = None
mock_calendar.create_calendar_user.side_effect = None
# /api/calendar/users/<username> (DELETE)
response = self.client.delete('/api/calendar/users/user1')
self.assertEqual(response.status_code, 200)
mock_calendar.delete_user.side_effect = Exception('fail')
mock_calendar.delete_calendar_user.side_effect = Exception('fail')
response = self.client.delete('/api/calendar/users/user1')
self.assertEqual(response.status_code, 500)
mock_calendar.delete_user.side_effect = None
mock_calendar.delete_calendar_user.side_effect = None
# /api/calendar/calendars (POST)
response = self.client.post('/api/calendar/calendars', data=json.dumps({'username': 'user1', 'calendar_name': 'cal1'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
@@ -599,10 +599,10 @@ class TestAPIEndpoints(unittest.TestCase):
self.assertEqual(response.status_code, 500)
mock_routing.get_firewall_rules.side_effect = None
# /api/routing/peers (POST)
response = self.client.post('/api/routing/peers', data=json.dumps({'peer': 'peer1', 'route': '10.0.0.2'}), content_type='application/json')
response = self.client.post('/api/routing/peers', data=json.dumps({'peer_name': 'peer1', 'peer_ip': '10.0.0.2'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_routing.add_peer_route.side_effect = Exception('fail')
response = self.client.post('/api/routing/peers', data=json.dumps({'peer': 'peer1', 'route': '10.0.0.2'}), content_type='application/json')
response = self.client.post('/api/routing/peers', data=json.dumps({'peer_name': 'peer1', 'peer_ip': '10.0.0.2'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_routing.add_peer_route.side_effect = None
# /api/routing/peers (GET)
@@ -620,24 +620,24 @@ class TestAPIEndpoints(unittest.TestCase):
self.assertEqual(response.status_code, 500)
mock_routing.remove_peer_route.side_effect = None
# /api/routing/exit-nodes (POST)
response = self.client.post('/api/routing/exit-nodes', data=json.dumps({'node': 'exit1'}), content_type='application/json')
response = self.client.post('/api/routing/exit-nodes', data=json.dumps({'peer_name': 'exit1', 'peer_ip': '10.0.0.5'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_routing.add_exit_node.side_effect = Exception('fail')
response = self.client.post('/api/routing/exit-nodes', data=json.dumps({'node': 'exit1'}), content_type='application/json')
response = self.client.post('/api/routing/exit-nodes', data=json.dumps({'peer_name': 'exit1', 'peer_ip': '10.0.0.5'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_routing.add_exit_node.side_effect = None
# /api/routing/bridge (POST)
response = self.client.post('/api/routing/bridge', data=json.dumps({'bridge': 'br1'}), content_type='application/json')
response = self.client.post('/api/routing/bridge', data=json.dumps({'source_peer': 'peer1', 'target_peer': 'peer2'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_routing.add_bridge_route.side_effect = Exception('fail')
response = self.client.post('/api/routing/bridge', data=json.dumps({'bridge': 'br1'}), content_type='application/json')
response = self.client.post('/api/routing/bridge', data=json.dumps({'source_peer': 'peer1', 'target_peer': 'peer2'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_routing.add_bridge_route.side_effect = None
# /api/routing/split (POST)
response = self.client.post('/api/routing/split', data=json.dumps({'split': 'sp1'}), content_type='application/json')
response = self.client.post('/api/routing/split', data=json.dumps({'network': '10.0.0.0/24', 'exit_peer': '10.0.0.5'}), content_type='application/json')
self.assertEqual(response.status_code, 200)
mock_routing.add_split_route.side_effect = Exception('fail')
response = self.client.post('/api/routing/split', data=json.dumps({'split': 'sp1'}), content_type='application/json')
response = self.client.post('/api/routing/split', data=json.dumps({'network': '10.0.0.0/24', 'exit_peer': '10.0.0.5'}), content_type='application/json')
self.assertEqual(response.status_code, 500)
mock_routing.add_split_route.side_effect = None
# /api/routing/connectivity (POST)
+11 -2
View File
@@ -113,8 +113,11 @@ class TestAppMisc(unittest.TestCase):
self.assertFalse(app_module.is_local_request())
def test_is_local_request_private_ip(self):
# 192.168.x.x (LAN) is no longer trusted — only Docker bridge (172.16.0.0/12)
# and loopback are trusted. The API is bound to 127.0.0.1:3000 and only
# reachable via Caddy (172.20.x.x), so LAN IPs never reach it directly.
with patch('app.request', new=self._req('192.168.1.5')):
self.assertTrue(app_module.is_local_request())
self.assertFalse(app_module.is_local_request())
def test_is_local_request_xff_spoof_rejected(self):
# Client sends X-Forwarded-For: 127.0.0.1 but actual IP is public
@@ -123,8 +126,14 @@ class TestAppMisc(unittest.TestCase):
self.assertFalse(app_module.is_local_request())
def test_is_local_request_xff_last_entry_local(self):
# Caddy appends the real client IP; last entry is local → allow
# 192.168.x.x is no longer in the trusted range — only Docker bridge
# (172.16.0.0/12) and loopback are trusted now.
with patch('app.request', new=self._req('8.8.8.8', xff='8.8.8.8, 192.168.1.10')):
self.assertFalse(app_module.is_local_request())
def test_is_local_request_xff_docker_bridge(self):
# Docker bridge IPs (172.16.0.0/12) ARE trusted — Caddy uses this range
with patch('app.request', new=self._req('8.8.8.8', xff='8.8.8.8, 172.20.0.2')):
self.assertTrue(app_module.is_local_request())
def test_is_local_request_xff_single_public_rejected(self):
+379 -1
View File
@@ -1 +1,379 @@
# ... moved and adapted code from test_phase3_endpoints.py (calendar section) ...
#!/usr/bin/env python3
"""
Unit tests for calendar Flask endpoints in api/app.py.
Covers:
GET /api/calendar/users
POST /api/calendar/users
DELETE /api/calendar/users/<username>
POST /api/calendar/calendars
POST /api/calendar/events
GET /api/calendar/events/<username>/<calendar_name>
GET /api/calendar/status
GET /api/calendar/connectivity
"""
import sys
import json
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from app import app
class TestGetCalendarUsers(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.calendar_manager')
def test_get_users_returns_200_with_list(self, mock_cm):
mock_cm.get_users.return_value = [
{'username': 'alice', 'email': 'alice@cell'},
{'username': 'bob', 'email': 'bob@cell'},
]
r = self.client.get('/api/calendar/users')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIsInstance(data, list)
self.assertEqual(len(data), 2)
@patch('app.calendar_manager')
def test_get_users_returns_200_with_empty_list(self, mock_cm):
mock_cm.get_users.return_value = []
r = self.client.get('/api/calendar/users')
self.assertEqual(r.status_code, 200)
self.assertEqual(json.loads(r.data), [])
@patch('app.calendar_manager')
def test_get_users_returns_500_on_exception(self, mock_cm):
mock_cm.get_users.side_effect = Exception('radicale unreachable')
r = self.client.get('/api/calendar/users')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestCreateCalendarUser(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.calendar_manager')
def test_create_user_returns_200_on_valid_body(self, mock_cm):
mock_cm.create_calendar_user.return_value = True
r = self.client.post(
'/api/calendar/users',
data=json.dumps({'username': 'alice', 'password': 'secret123'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('created', data)
@patch('app.calendar_manager')
def test_create_user_passes_credentials_to_manager(self, mock_cm):
mock_cm.create_calendar_user.return_value = True
self.client.post(
'/api/calendar/users',
data=json.dumps({'username': 'alice', 'password': 'secret123'}),
content_type='application/json',
)
mock_cm.create_calendar_user.assert_called_once_with('alice', 'secret123')
@patch('app.calendar_manager')
def test_create_user_returns_400_when_no_body(self, mock_cm):
r = self.client.post('/api/calendar/users')
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
mock_cm.create_calendar_user.assert_not_called()
@patch('app.calendar_manager')
def test_create_user_returns_400_when_username_missing(self, mock_cm):
r = self.client.post(
'/api/calendar/users',
data=json.dumps({'password': 'secret123'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_cm.create_calendar_user.assert_not_called()
@patch('app.calendar_manager')
def test_create_user_returns_400_when_password_missing(self, mock_cm):
r = self.client.post(
'/api/calendar/users',
data=json.dumps({'username': 'alice'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_cm.create_calendar_user.assert_not_called()
@patch('app.calendar_manager')
def test_create_user_returns_500_on_exception(self, mock_cm):
mock_cm.create_calendar_user.side_effect = Exception('htpasswd write failure')
r = self.client.post(
'/api/calendar/users',
data=json.dumps({'username': 'alice', 'password': 'secret123'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestDeleteCalendarUser(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.calendar_manager')
def test_delete_user_returns_200_on_success(self, mock_cm):
mock_cm.delete_calendar_user.return_value = True
r = self.client.delete('/api/calendar/users/alice')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('deleted', data)
@patch('app.calendar_manager')
def test_delete_user_passes_username_to_manager(self, mock_cm):
mock_cm.delete_calendar_user.return_value = True
self.client.delete('/api/calendar/users/bob')
mock_cm.delete_calendar_user.assert_called_once_with('bob')
@patch('app.calendar_manager')
def test_delete_user_returns_500_on_exception(self, mock_cm):
mock_cm.delete_calendar_user.side_effect = Exception('user not found')
r = self.client.delete('/api/calendar/users/alice')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestCreateCalendar(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.calendar_manager')
def test_create_calendar_returns_200_on_valid_body(self, mock_cm):
mock_cm.create_calendar.return_value = True
r = self.client.post(
'/api/calendar/calendars',
data=json.dumps({'username': 'alice', 'name': 'Work'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('created', data)
@patch('app.calendar_manager')
def test_create_calendar_accepts_calendar_name_alias(self, mock_cm):
mock_cm.create_calendar.return_value = True
r = self.client.post(
'/api/calendar/calendars',
data=json.dumps({'username': 'alice', 'calendar_name': 'Personal'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
@patch('app.calendar_manager')
def test_create_calendar_returns_400_when_no_body(self, mock_cm):
r = self.client.post('/api/calendar/calendars')
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_cm.create_calendar.assert_not_called()
@patch('app.calendar_manager')
def test_create_calendar_returns_400_when_username_missing(self, mock_cm):
r = self.client.post(
'/api/calendar/calendars',
data=json.dumps({'name': 'Work'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.calendar_manager')
def test_create_calendar_returns_400_when_name_missing(self, mock_cm):
r = self.client.post(
'/api/calendar/calendars',
data=json.dumps({'username': 'alice'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.calendar_manager')
def test_create_calendar_returns_500_on_exception(self, mock_cm):
mock_cm.create_calendar.side_effect = Exception('CalDAV error')
r = self.client.post(
'/api/calendar/calendars',
data=json.dumps({'username': 'alice', 'name': 'Work'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestAddCalendarEvent(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.calendar_manager')
def test_add_event_returns_200_on_valid_body(self, mock_cm):
mock_cm.add_event.return_value = 'event-uid-123'
r = self.client.post(
'/api/calendar/events',
data=json.dumps({
'username': 'alice',
'calendar_name': 'Work',
'summary': 'Team Meeting',
'dtstart': '20260427T100000Z',
}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('created', data)
@patch('app.calendar_manager')
def test_add_event_returns_400_when_no_body(self, mock_cm):
r = self.client.post('/api/calendar/events')
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_cm.add_event.assert_not_called()
@patch('app.calendar_manager')
def test_add_event_returns_400_when_username_missing(self, mock_cm):
r = self.client.post(
'/api/calendar/events',
data=json.dumps({'calendar_name': 'Work', 'summary': 'Meeting'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.calendar_manager')
def test_add_event_returns_400_when_calendar_missing(self, mock_cm):
r = self.client.post(
'/api/calendar/events',
data=json.dumps({'username': 'alice', 'summary': 'Meeting'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.calendar_manager')
def test_add_event_returns_500_on_exception(self, mock_cm):
mock_cm.add_event.side_effect = Exception('iCalendar parse error')
r = self.client.post(
'/api/calendar/events',
data=json.dumps({
'username': 'alice',
'calendar_name': 'Work',
'summary': 'Meeting',
}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetCalendarEvents(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.calendar_manager')
def test_get_events_returns_200_with_events(self, mock_cm):
mock_cm.get_events.return_value = [
{'uid': 'abc', 'summary': 'Standup', 'dtstart': '20260427T090000Z'},
]
r = self.client.get('/api/calendar/events/alice/Work')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIsInstance(data, list)
self.assertEqual(len(data), 1)
@patch('app.calendar_manager')
def test_get_events_passes_username_and_calendar_to_manager(self, mock_cm):
mock_cm.get_events.return_value = []
self.client.get('/api/calendar/events/bob/Personal')
mock_cm.get_events.assert_called_once()
args = mock_cm.get_events.call_args[0]
self.assertEqual(args[0], 'bob')
self.assertEqual(args[1], 'Personal')
@patch('app.calendar_manager')
def test_get_events_returns_500_on_exception(self, mock_cm):
mock_cm.get_events.side_effect = Exception('calendar not found')
r = self.client.get('/api/calendar/events/alice/Work')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetCalendarStatus(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.calendar_manager')
def test_get_status_returns_200_with_status_dict(self, mock_cm):
mock_cm.get_status.return_value = {
'running': True,
'port': 5232,
'users_count': 3,
}
r = self.client.get('/api/calendar/status')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('running', data)
@patch('app.calendar_manager')
def test_get_status_returns_500_on_exception(self, mock_cm):
mock_cm.get_status.side_effect = Exception('container not found')
r = self.client.get('/api/calendar/status')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestCalendarConnectivity(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.calendar_manager')
def test_connectivity_returns_200_with_result(self, mock_cm):
mock_cm.test_connectivity.return_value = {
'caldav': True,
'carddav': True,
'latency_ms': 8,
}
r = self.client.get('/api/calendar/connectivity')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('caldav', data)
@patch('app.calendar_manager')
def test_connectivity_returns_500_on_exception(self, mock_cm):
mock_cm.test_connectivity.side_effect = Exception('connection refused')
r = self.client.get('/api/calendar/connectivity')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
if __name__ == '__main__':
unittest.main()
+240
View File
@@ -0,0 +1,240 @@
#!/usr/bin/env python3
"""
Tests for cell-to-cell DNS forwarding integration.
Covers:
- generate_corefile() with cell_links entries
- apply_all_dns_rules() passing cell_links through to generate_corefile()
- Correct domain/dns_ip values in the emitted forwarding stanza
- Validation: invalid characters in domain are rejected by add_cell_dns_forward()
"""
import sys
import os
import tempfile
import shutil
import unittest
from unittest.mock import patch, MagicMock, call
from pathlib import Path
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
import firewall_manager
# ---------------------------------------------------------------------------
# generate_corefile() with cell_links
# ---------------------------------------------------------------------------
class TestGenerateCorefileOneLink(unittest.TestCase):
"""generate_corefile() with a single cell link produces the right stanza."""
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.path = os.path.join(self.tmp, 'Corefile')
def tearDown(self):
shutil.rmtree(self.tmp)
def _read(self):
return open(self.path).read()
def test_forwarding_block_present(self):
cell_links = [{'domain': 'remote.cell', 'dns_ip': '10.5.0.1'}]
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
content = self._read()
self.assertIn('remote.cell {', content)
def test_correct_dns_ip_in_forward_directive(self):
cell_links = [{'domain': 'remote.cell', 'dns_ip': '10.5.0.1'}]
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
content = self._read()
self.assertIn('forward . 10.5.0.1', content)
def test_cache_directive_present_in_forwarding_block(self):
cell_links = [{'domain': 'remote.cell', 'dns_ip': '10.5.0.1'}]
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
content = self._read()
# 'cache' must appear in the forwarding block (after the primary zone block)
idx_primary = content.index('remote.cell {')
self.assertIn('cache', content[idx_primary:])
def test_log_directive_present_in_forwarding_block(self):
cell_links = [{'domain': 'remote.cell', 'dns_ip': '10.5.0.1'}]
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
content = self._read()
idx_primary = content.index('remote.cell {')
self.assertIn('log', content[idx_primary:])
def test_forwarding_block_appears_after_primary_zone(self):
"""The cell link stanza must appear after the primary zone block, not inside it."""
cell_links = [{'domain': 'remote.cell', 'dns_ip': '10.5.0.1'}]
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
content = self._read()
# Primary zone ends with its closing brace; remote.cell block follows
idx_primary_zone = content.index('cell {')
idx_forward_block = content.index('remote.cell {')
self.assertGreater(idx_forward_block, idx_primary_zone)
class TestGenerateCorefileMultipleLinks(unittest.TestCase):
"""generate_corefile() with multiple cell links produces one stanza each."""
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.path = os.path.join(self.tmp, 'Corefile')
def tearDown(self):
shutil.rmtree(self.tmp)
def _read(self):
return open(self.path).read()
def test_all_domains_present(self):
cell_links = [
{'domain': 'alpha.cell', 'dns_ip': '10.1.0.1'},
{'domain': 'beta.cell', 'dns_ip': '10.2.0.1'},
{'domain': 'gamma.cell', 'dns_ip': '10.3.0.1'},
]
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
content = self._read()
self.assertIn('alpha.cell {', content)
self.assertIn('beta.cell {', content)
self.assertIn('gamma.cell {', content)
def test_all_dns_ips_present(self):
cell_links = [
{'domain': 'alpha.cell', 'dns_ip': '10.1.0.1'},
{'domain': 'beta.cell', 'dns_ip': '10.2.0.1'},
]
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
content = self._read()
self.assertIn('forward . 10.1.0.1', content)
self.assertIn('forward . 10.2.0.1', content)
def test_stanza_count_matches_link_count(self):
"""Each valid link contributes exactly one forwarding stanza."""
cell_links = [
{'domain': 'a.cell', 'dns_ip': '10.1.0.1'},
{'domain': 'b.cell', 'dns_ip': '10.2.0.1'},
]
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
content = self._read()
# Count occurrences of 'forward .' — one for default, one per cell link
count = content.count('forward .')
self.assertEqual(count, 3) # 1 default + 2 cell links
# ---------------------------------------------------------------------------
# apply_all_dns_rules() passes cell_links through to generate_corefile()
# ---------------------------------------------------------------------------
class TestApplyAllDnsRulesPassesCellLinks(unittest.TestCase):
"""apply_all_dns_rules() must forward the cell_links argument to generate_corefile()."""
def test_cell_links_forwarded(self):
cell_links = [{'domain': 'x.cell', 'dns_ip': '10.9.0.1'}]
with patch.object(firewall_manager, 'generate_corefile', return_value=True) as mock_gen, \
patch.object(firewall_manager, 'reload_coredns', return_value=True):
firewall_manager.apply_all_dns_rules(
peers=[],
corefile_path='/tmp/fake_Corefile',
domain='cell',
cell_links=cell_links,
)
mock_gen.assert_called_once_with(
[], '/tmp/fake_Corefile', 'cell', cell_links
)
def test_cell_links_none_forwarded_as_none(self):
with patch.object(firewall_manager, 'generate_corefile', return_value=True) as mock_gen, \
patch.object(firewall_manager, 'reload_coredns', return_value=True):
firewall_manager.apply_all_dns_rules(
peers=[],
corefile_path='/tmp/fake_Corefile',
domain='cell',
cell_links=None,
)
mock_gen.assert_called_once_with([], '/tmp/fake_Corefile', 'cell', None)
def test_reload_called_on_success(self):
with patch.object(firewall_manager, 'generate_corefile', return_value=True), \
patch.object(firewall_manager, 'reload_coredns', return_value=True) as mock_reload:
firewall_manager.apply_all_dns_rules([], '/tmp/f', cell_links=None)
mock_reload.assert_called_once()
def test_reload_not_called_on_failure(self):
with patch.object(firewall_manager, 'generate_corefile', return_value=False), \
patch.object(firewall_manager, 'reload_coredns') as mock_reload:
firewall_manager.apply_all_dns_rules([], '/tmp/f', cell_links=None)
mock_reload.assert_not_called()
# ---------------------------------------------------------------------------
# Domain validation in add_cell_dns_forward() (via network_manager)
# ---------------------------------------------------------------------------
class TestAddCellDnsForwardValidation(unittest.TestCase):
"""
add_cell_dns_forward() must reject malformed domains/IPs without writing
the Corefile or calling apply_all_dns_rules().
"""
def _get_network_manager(self, tmp_dir):
"""Construct a minimal NetworkManager with test directories."""
# We import here so the test file doesn't hard-fail if network_manager
# has an import-time dependency that's unavailable in CI.
try:
from network_manager import NetworkManager
except ImportError as e:
self.skipTest(f'NetworkManager import failed: {e}')
os.makedirs(os.path.join(tmp_dir, 'dns'), exist_ok=True)
return NetworkManager(data_dir=tmp_dir, config_dir=tmp_dir)
def setUp(self):
self.tmp = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp)
def test_invalid_dns_ip_returns_warning(self):
nm = self._get_network_manager(self.tmp)
result = nm.add_cell_dns_forward('valid.cell', 'not-an-ip')
self.assertTrue(result['warnings'])
self.assertFalse(result['restarted'])
def test_domain_with_newline_returns_warning(self):
nm = self._get_network_manager(self.tmp)
result = nm.add_cell_dns_forward('evil\ndomain', '10.1.0.1')
self.assertTrue(result['warnings'])
self.assertFalse(result['restarted'])
def test_domain_with_braces_returns_warning(self):
nm = self._get_network_manager(self.tmp)
result = nm.add_cell_dns_forward('evil{domain}', '10.1.0.1')
self.assertTrue(result['warnings'])
self.assertFalse(result['restarted'])
def test_domain_with_space_returns_warning(self):
nm = self._get_network_manager(self.tmp)
result = nm.add_cell_dns_forward('evil domain', '10.1.0.1')
self.assertTrue(result['warnings'])
self.assertFalse(result['restarted'])
def test_valid_domain_and_ip_calls_apply_all_dns_rules(self):
"""Valid inputs must call firewall_manager.apply_all_dns_rules()."""
nm = self._get_network_manager(self.tmp)
with patch.object(firewall_manager, 'apply_all_dns_rules', return_value=True) as mock_apply, \
patch.object(firewall_manager, 'reload_coredns', return_value=True):
result = nm.add_cell_dns_forward('valid.cell', '10.1.0.1')
mock_apply.assert_called_once()
call_kwargs = mock_apply.call_args
# cell_links kwarg must include the new entry
cell_links_arg = call_kwargs[1].get('cell_links') or call_kwargs[0][3]
domains = [l['domain'] for l in cell_links_arg]
self.assertIn('valid.cell', domains)
if __name__ == '__main__':
unittest.main()
+295
View File
@@ -0,0 +1,295 @@
#!/usr/bin/env python3
"""
Unit tests for cell management Flask endpoints in api/app.py.
Covers:
GET /api/cells/invite — generate invite package
GET /api/cells — list connected cells
POST /api/cells — connect to a remote cell
DELETE /api/cells/<cell_name> — disconnect from a cell
GET /api/cells/<cell_name>/status — live status for a connected cell
"""
import sys
import json
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from app import app
# Minimal set of required fields for POST /api/cells
_VALID_CELL_BODY = {
'cell_name': 'remotecell',
'public_key': 'abc123publickey==',
'vpn_subnet': '10.1.0.0/24',
'dns_ip': '10.1.0.1',
'domain': 'remotecell.cell',
}
class TestGetCellInvite(unittest.TestCase):
"""GET /api/cells/invite"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.cell_link_manager')
@patch('app.config_manager')
def test_get_invite_returns_200_with_invite_dict(self, mock_cfg, mock_clm):
mock_cfg.configs = {'_identity': {'cell_name': 'mycell', 'domain': 'cell'}}
mock_clm.generate_invite.return_value = {
'cell_name': 'mycell',
'public_key': 'server_pub_key==',
'vpn_subnet': '10.0.0.0/24',
'dns_ip': '10.0.0.1',
'domain': 'cell',
}
r = self.client.get('/api/cells/invite')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('cell_name', data)
self.assertIn('public_key', data)
@patch('app.cell_link_manager')
@patch('app.config_manager')
def test_get_invite_passes_cell_name_and_domain(self, mock_cfg, mock_clm):
mock_cfg.configs = {'_identity': {'cell_name': 'myhome', 'domain': 'home'}}
mock_clm.generate_invite.return_value = {}
self.client.get('/api/cells/invite')
mock_clm.generate_invite.assert_called_once_with('myhome', 'home')
@patch('app.cell_link_manager')
@patch('app.config_manager')
def test_get_invite_returns_500_on_exception(self, mock_cfg, mock_clm):
mock_cfg.configs = {'_identity': {}}
mock_clm.generate_invite.side_effect = Exception('WireGuard key unavailable')
r = self.client.get('/api/cells/invite')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestListCellConnections(unittest.TestCase):
"""GET /api/cells"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.cell_link_manager')
def test_list_cells_returns_200_with_list(self, mock_clm):
mock_clm.list_connections.return_value = [
{'cell_name': 'remotecell', 'domain': 'remotecell.cell', 'status': 'connected'},
]
r = self.client.get('/api/cells')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIsInstance(data, list)
self.assertEqual(len(data), 1)
self.assertEqual(data[0]['cell_name'], 'remotecell')
@patch('app.cell_link_manager')
def test_list_cells_returns_empty_list_when_none_connected(self, mock_clm):
mock_clm.list_connections.return_value = []
r = self.client.get('/api/cells')
self.assertEqual(r.status_code, 200)
self.assertEqual(json.loads(r.data), [])
@patch('app.cell_link_manager')
def test_list_cells_returns_500_on_exception(self, mock_clm):
mock_clm.list_connections.side_effect = Exception('storage error')
r = self.client.get('/api/cells')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestAddCellConnection(unittest.TestCase):
"""POST /api/cells"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.cell_link_manager')
def test_add_cell_returns_201_on_success(self, mock_clm):
mock_clm.add_connection.return_value = {'cell_name': 'remotecell'}
r = self.client.post(
'/api/cells',
data=json.dumps(_VALID_CELL_BODY),
content_type='application/json',
)
self.assertEqual(r.status_code, 201)
data = json.loads(r.data)
self.assertIn('message', data)
self.assertIn('link', data)
@patch('app.cell_link_manager')
def test_add_cell_returns_400_when_no_body(self, mock_clm):
r = self.client.post('/api/cells')
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_clm.add_connection.assert_not_called()
@patch('app.cell_link_manager')
def test_add_cell_returns_400_when_cell_name_missing(self, mock_clm):
body = {k: v for k, v in _VALID_CELL_BODY.items() if k != 'cell_name'}
r = self.client.post(
'/api/cells',
data=json.dumps(body),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.cell_link_manager')
def test_add_cell_returns_400_when_public_key_missing(self, mock_clm):
body = {k: v for k, v in _VALID_CELL_BODY.items() if k != 'public_key'}
r = self.client.post(
'/api/cells',
data=json.dumps(body),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.cell_link_manager')
def test_add_cell_returns_400_when_vpn_subnet_missing(self, mock_clm):
body = {k: v for k, v in _VALID_CELL_BODY.items() if k != 'vpn_subnet'}
r = self.client.post(
'/api/cells',
data=json.dumps(body),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.cell_link_manager')
def test_add_cell_returns_400_when_dns_ip_missing(self, mock_clm):
body = {k: v for k, v in _VALID_CELL_BODY.items() if k != 'dns_ip'}
r = self.client.post(
'/api/cells',
data=json.dumps(body),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.cell_link_manager')
def test_add_cell_returns_400_when_domain_missing(self, mock_clm):
body = {k: v for k, v in _VALID_CELL_BODY.items() if k != 'domain'}
r = self.client.post(
'/api/cells',
data=json.dumps(body),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.cell_link_manager')
def test_add_cell_returns_400_on_value_error_from_manager(self, mock_clm):
mock_clm.add_connection.side_effect = ValueError('cell already connected')
r = self.client.post(
'/api/cells',
data=json.dumps(_VALID_CELL_BODY),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.cell_link_manager')
def test_add_cell_returns_500_on_unexpected_exception(self, mock_clm):
mock_clm.add_connection.side_effect = Exception('WireGuard peer add failed')
r = self.client.post(
'/api/cells',
data=json.dumps(_VALID_CELL_BODY),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestRemoveCellConnection(unittest.TestCase):
"""DELETE /api/cells/<cell_name>"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.cell_link_manager')
def test_remove_cell_returns_200_on_success(self, mock_clm):
mock_clm.remove_connection.return_value = None
r = self.client.delete('/api/cells/remotecell')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('message', data)
@patch('app.cell_link_manager')
def test_remove_cell_passes_cell_name_to_manager(self, mock_clm):
mock_clm.remove_connection.return_value = None
self.client.delete('/api/cells/faraway')
mock_clm.remove_connection.assert_called_once_with('faraway')
@patch('app.cell_link_manager')
def test_remove_cell_returns_404_on_value_error(self, mock_clm):
mock_clm.remove_connection.side_effect = ValueError('cell not found')
r = self.client.delete('/api/cells/nonexistent')
self.assertEqual(r.status_code, 404)
self.assertIn('error', json.loads(r.data))
@patch('app.cell_link_manager')
def test_remove_cell_returns_500_on_unexpected_exception(self, mock_clm):
mock_clm.remove_connection.side_effect = Exception('storage corruption')
r = self.client.delete('/api/cells/remotecell')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetCellConnectionStatus(unittest.TestCase):
"""GET /api/cells/<cell_name>/status"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.cell_link_manager')
def test_get_cell_status_returns_200_with_status_dict(self, mock_clm):
mock_clm.get_connection_status.return_value = {
'cell_name': 'remotecell',
'online': True,
'last_handshake': '2026-04-27T09:00:00Z',
'transfer_rx': 1024,
'transfer_tx': 2048,
}
r = self.client.get('/api/cells/remotecell/status')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('online', data)
self.assertTrue(data['online'])
@patch('app.cell_link_manager')
def test_get_cell_status_passes_cell_name(self, mock_clm):
mock_clm.get_connection_status.return_value = {}
self.client.get('/api/cells/faraway/status')
mock_clm.get_connection_status.assert_called_once_with('faraway')
@patch('app.cell_link_manager')
def test_get_cell_status_returns_404_on_value_error(self, mock_clm):
mock_clm.get_connection_status.side_effect = ValueError('cell not found')
r = self.client.get('/api/cells/missing/status')
self.assertEqual(r.status_code, 404)
self.assertIn('error', json.loads(r.data))
@patch('app.cell_link_manager')
def test_get_cell_status_returns_500_on_unexpected_exception(self, mock_clm):
mock_clm.get_connection_status.side_effect = Exception('WireGuard query failed')
r = self.client.get('/api/cells/remotecell/status')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
if __name__ == '__main__':
unittest.main()
+212 -1
View File
@@ -1 +1,212 @@
# ... moved and adapted code from test_phase3_endpoints.py (email section) ...
#!/usr/bin/env python3
"""
Unit tests for email Flask endpoints in api/app.py.
Covers:
GET /api/email/users
POST /api/email/users
DELETE /api/email/users/<username>
GET /api/email/status
GET /api/email/connectivity
"""
import sys
import json
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from app import app
class TestGetEmailUsers(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.email_manager')
def test_get_users_returns_200_with_list(self, mock_em):
mock_em.get_users.return_value = [
{'username': 'alice@cell', 'domain': 'cell'},
{'username': 'bob@cell', 'domain': 'cell'},
]
r = self.client.get('/api/email/users')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIsInstance(data, list)
self.assertEqual(len(data), 2)
@patch('app.email_manager')
def test_get_users_returns_empty_list_when_no_users(self, mock_em):
mock_em.get_users.return_value = []
r = self.client.get('/api/email/users')
self.assertEqual(r.status_code, 200)
self.assertEqual(json.loads(r.data), [])
@patch('app.email_manager')
def test_get_users_returns_500_on_exception(self, mock_em):
mock_em.get_users.side_effect = Exception('mailbox unreachable')
r = self.client.get('/api/email/users')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestCreateEmailUser(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.email_manager')
def test_create_user_returns_200_on_success(self, mock_em):
mock_em.create_email_user.return_value = True
r = self.client.post(
'/api/email/users',
data=json.dumps({'username': 'alice', 'password': 'secret123'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('created', data)
@patch('app.email_manager')
def test_create_user_calls_manager_with_username_and_password(self, mock_em):
mock_em.create_email_user.return_value = True
self.client.post(
'/api/email/users',
data=json.dumps({'username': 'alice', 'password': 'secret123'}),
content_type='application/json',
)
mock_em.create_email_user.assert_called_once()
args = mock_em.create_email_user.call_args[0]
self.assertEqual(args[0], 'alice')
self.assertEqual(args[2], 'secret123')
@patch('app.email_manager')
def test_create_user_returns_400_when_username_missing(self, mock_em):
r = self.client.post(
'/api/email/users',
data=json.dumps({'password': 'secret123'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_em.create_email_user.assert_not_called()
@patch('app.email_manager')
def test_create_user_returns_400_when_password_missing(self, mock_em):
r = self.client.post(
'/api/email/users',
data=json.dumps({'username': 'alice'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_em.create_email_user.assert_not_called()
@patch('app.email_manager')
def test_create_user_returns_400_when_no_body(self, mock_em):
r = self.client.post('/api/email/users')
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.email_manager')
def test_create_user_returns_500_on_exception(self, mock_em):
mock_em.create_email_user.side_effect = Exception('SMTP config error')
r = self.client.post(
'/api/email/users',
data=json.dumps({'username': 'alice', 'password': 'secret123'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestDeleteEmailUser(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.email_manager')
def test_delete_user_returns_200_on_success(self, mock_em):
mock_em.delete_email_user.return_value = True
r = self.client.delete('/api/email/users/alice')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('deleted', data)
@patch('app.email_manager')
def test_delete_user_calls_manager_with_username(self, mock_em):
mock_em.delete_email_user.return_value = True
self.client.delete('/api/email/users/bob')
mock_em.delete_email_user.assert_called_once()
args = mock_em.delete_email_user.call_args[0]
self.assertEqual(args[0], 'bob')
@patch('app.email_manager')
def test_delete_user_returns_500_on_exception(self, mock_em):
mock_em.delete_email_user.side_effect = Exception('LDAP error')
r = self.client.delete('/api/email/users/alice')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetEmailStatus(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.email_manager')
def test_get_status_returns_200_with_status_dict(self, mock_em):
mock_em.get_status.return_value = {
'running': True,
'smtp_port': 25,
'imap_port': 993,
}
r = self.client.get('/api/email/status')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('running', data)
@patch('app.email_manager')
def test_get_status_returns_500_on_exception(self, mock_em):
mock_em.get_status.side_effect = Exception('container not found')
r = self.client.get('/api/email/status')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestEmailConnectivity(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.email_manager')
def test_connectivity_returns_200_with_result(self, mock_em):
mock_em.test_connectivity.return_value = {
'smtp': True,
'imap': True,
'latency_ms': 12,
}
r = self.client.get('/api/email/connectivity')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('smtp', data)
@patch('app.email_manager')
def test_connectivity_returns_500_on_exception(self, mock_em):
mock_em.test_connectivity.side_effect = Exception('timeout')
r = self.client.get('/api/email/connectivity')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
if __name__ == '__main__':
unittest.main()
+142
View File
@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
Tests for the enforce_auth before_request hook in api/app.py.
The hook has two distinct behaviours depending on the auth store state:
- users file exists and is POPULATED → auth is enforced (unauthenticated → 401)
- users file exists but is EMPTY → 503 (auth not configured)
- users file does not exist / unreadable → bypass (pre-auth compat mode)
These tests create real AuthManager instances pointing at tmp directories so
that list_users() and the file-readability check both behave exactly as they
do in production.
"""
import os
import sys
import json
import pytest
from pathlib import Path
from unittest.mock import patch
sys.path.insert(0, str(Path(__file__).parent.parent / 'api'))
@pytest.fixture
def flask_client():
from app import app
app.config['TESTING'] = True
return app.test_client()
@pytest.fixture
def populated_auth_manager(tmp_path):
"""AuthManager whose users file contains at least one admin account."""
from auth_manager import AuthManager
data_dir = str(tmp_path / 'data')
config_dir = str(tmp_path / 'config')
os.makedirs(data_dir, exist_ok=True)
os.makedirs(config_dir, exist_ok=True)
mgr = AuthManager(data_dir=data_dir, config_dir=config_dir)
# Create an admin so list_users() is non-empty
ok = mgr.create_user('admin', 'AdminPass123!', 'admin')
assert ok, 'Could not seed admin user for test'
return mgr
@pytest.fixture
def empty_auth_manager(tmp_path):
"""AuthManager whose users file exists and is readable but contains no users."""
from auth_manager import AuthManager
data_dir = str(tmp_path / 'data')
config_dir = str(tmp_path / 'config')
os.makedirs(data_dir, exist_ok=True)
os.makedirs(config_dir, exist_ok=True)
mgr = AuthManager(data_dir=data_dir, config_dir=config_dir)
# The constructor creates the file with '[]' (empty list). We do NOT add
# any user, so list_users() returns [] but the file is readable.
assert mgr.list_users() == [], 'Expected empty user list'
return mgr
# ── populated store → auth enforced ──────────────────────────────────────────
def test_populated_auth_manager_unauthenticated_request_gets_401(
flask_client, populated_auth_manager
):
"""When the auth store has users, unauthenticated API requests must get 401."""
with patch('app.auth_manager', populated_auth_manager):
r = flask_client.get('/api/status')
assert r.status_code == 401
data = json.loads(r.data)
assert 'error' in data
def test_populated_auth_manager_401_body_says_not_authenticated(
flask_client, populated_auth_manager
):
"""The 401 body must clearly indicate the session is missing."""
with patch('app.auth_manager', populated_auth_manager):
r = flask_client.get('/api/peers')
assert r.status_code == 401
data = json.loads(r.data)
assert 'Not authenticated' in data.get('error', '')
def test_populated_auth_manager_non_api_path_bypasses_auth(
flask_client, populated_auth_manager
):
"""Non-API paths like /health must always be public."""
with patch('app.auth_manager', populated_auth_manager):
r = flask_client.get('/health')
assert r.status_code == 200
def test_populated_auth_manager_auth_namespace_bypasses_auth(
flask_client, populated_auth_manager
):
"""The /api/auth/* namespace must always be accessible without a session."""
with patch('app.auth_manager', populated_auth_manager):
r = flask_client.get('/api/auth/me')
# /api/auth/me may return 401 from the route itself (no session), but it
# must NOT be blocked by enforce_auth; the enforce_auth hook must return None
# for /api/auth/* paths. The status must not be 503.
assert r.status_code != 503
# ── empty store → 503 ────────────────────────────────────────────────────────
def test_empty_auth_manager_returns_503_for_api_requests(
flask_client, empty_auth_manager
):
"""When the users file exists and is readable but empty, /api/* must get 503."""
with patch('app.auth_manager', empty_auth_manager):
r = flask_client.get('/api/status')
assert r.status_code == 503
data = json.loads(r.data)
assert 'error' in data
def test_empty_auth_manager_503_body_mentions_configuration(
flask_client, empty_auth_manager
):
"""The 503 error body must mention that auth is not configured."""
with patch('app.auth_manager', empty_auth_manager):
r = flask_client.get('/api/config')
assert r.status_code == 503
data = json.loads(r.data)
error_text = data.get('error', '')
assert 'not configured' in error_text.lower() or 'Authentication' in error_text
def test_empty_auth_manager_non_api_path_bypasses_503(
flask_client, empty_auth_manager
):
"""Even with an empty auth store, /health must remain accessible."""
with patch('app.auth_manager', empty_auth_manager):
r = flask_client.get('/health')
assert r.status_code == 200
if __name__ == '__main__':
pytest.main([__file__, '-v'])
+2 -2
View File
@@ -231,7 +231,7 @@ class TestFileCreateFolderEndpoint(unittest.TestCase):
mock_fm.create_folder.return_value = True
r = self.client.post(
'/api/files/folders',
data=json.dumps({'username': 'alice', 'folder': 'Archive'}),
data=json.dumps({'username': 'alice', 'folder_path': 'Archive'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
@@ -247,7 +247,7 @@ class TestFileCreateFolderEndpoint(unittest.TestCase):
mock_fm.create_folder.side_effect = Exception('quota exceeded')
r = self.client.post(
'/api/files/folders',
data=json.dumps({'username': 'alice', 'folder': 'NewFolder'}),
data=json.dumps({'username': 'alice', 'folder_path': 'NewFolder'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
+89 -6
View File
@@ -30,10 +30,12 @@ def _make_peer(ip, internet=True, services=None, peers=True):
class TestPeerComment(unittest.TestCase):
def test_dots_replaced_with_dashes(self):
self.assertEqual(firewall_manager._peer_comment('10.0.0.2'), 'pic-peer-10-0-0-2')
# Comment format now includes /32 suffix to prevent substring matches
# (e.g. pic-peer-10-0-0-1/32 is not a prefix of pic-peer-10-0-0-10/32)
self.assertEqual(firewall_manager._peer_comment('10.0.0.2'), 'pic-peer-10-0-0-2/32')
def test_different_ip(self):
self.assertEqual(firewall_manager._peer_comment('192.168.1.100'), 'pic-peer-192-168-1-100')
self.assertEqual(firewall_manager._peer_comment('192.168.1.100'), 'pic-peer-192-168-1-100/32')
# ---------------------------------------------------------------------------
@@ -115,6 +117,87 @@ class TestGenerateCorefile(unittest.TestCase):
self.assertFalse(result)
# ---------------------------------------------------------------------------
# generate_corefile with cell_links
# ---------------------------------------------------------------------------
class TestGenerateCorefileWithCellLinks(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.path = os.path.join(self.tmp, 'Corefile')
def tearDown(self):
shutil.rmtree(self.tmp)
def _content(self):
return open(self.path).read()
def test_cell_links_none_produces_no_forwarding_stanzas(self):
"""Default (None) produces no extra forwarding blocks beyond the primary zone."""
firewall_manager.generate_corefile([], self.path, cell_links=None)
content = self._content()
# The only 'forward' line should be the default internet forwarder
forward_lines = [l for l in content.splitlines() if 'forward' in l]
self.assertEqual(len(forward_lines), 1)
self.assertIn('8.8.8.8', forward_lines[0])
def test_cell_links_empty_list_produces_no_extra_stanzas(self):
"""An empty cell_links list produces no extra forwarding blocks."""
firewall_manager.generate_corefile([], self.path, cell_links=[])
content = self._content()
forward_lines = [l for l in content.splitlines() if 'forward' in l]
self.assertEqual(len(forward_lines), 1)
self.assertIn('8.8.8.8', forward_lines[0])
def test_single_cell_link_produces_forwarding_block(self):
"""One cell link produces one forwarding stanza with correct domain and dns_ip."""
cell_links = [{'domain': 'remote.cell', 'dns_ip': '10.1.0.1'}]
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
content = self._content()
self.assertIn('remote.cell {', content)
self.assertIn('forward . 10.1.0.1', content)
self.assertIn('cache', content)
def test_multiple_cell_links_produce_multiple_forwarding_blocks(self):
"""Multiple cell links produce one stanza each."""
cell_links = [
{'domain': 'alpha.cell', 'dns_ip': '10.1.0.1'},
{'domain': 'beta.cell', 'dns_ip': '10.2.0.1'},
]
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
content = self._content()
self.assertIn('alpha.cell {', content)
self.assertIn('forward . 10.1.0.1', content)
self.assertIn('beta.cell {', content)
self.assertIn('forward . 10.2.0.1', content)
def test_cell_links_do_not_overwrite_peer_acls(self):
"""Cell link stanzas are appended; peer ACLs in the primary zone survive."""
peers = [_make_peer('10.0.0.3', services=['calendar'])]
cell_links = [{'domain': 'other.cell', 'dns_ip': '10.99.0.1'}]
firewall_manager.generate_corefile(peers, self.path, cell_links=cell_links)
content = self._content()
self.assertIn('block net 10.0.0.3/32', content)
self.assertIn('other.cell {', content)
self.assertIn('forward . 10.99.0.1', content)
def test_link_with_missing_domain_is_skipped(self):
"""A cell_link entry with no domain key is silently skipped."""
cell_links = [{'dns_ip': '10.1.0.1'}] # no 'domain'
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
content = self._content()
# Only the default internet forwarder
forward_lines = [l for l in content.splitlines() if 'forward' in l]
self.assertEqual(len(forward_lines), 1)
def test_link_with_missing_dns_ip_is_skipped(self):
"""A cell_link entry with no dns_ip key is silently skipped."""
cell_links = [{'domain': 'nope.cell'}] # no 'dns_ip'
firewall_manager.generate_corefile([], self.path, cell_links=cell_links)
content = self._content()
self.assertNotIn('nope.cell', content)
# ---------------------------------------------------------------------------
# apply_peer_rules — iptables call verification
# ---------------------------------------------------------------------------
@@ -227,8 +310,8 @@ class TestClearPeerRules(unittest.TestCase):
'*filter\n'
':INPUT ACCEPT [0:0]\n'
':FORWARD ACCEPT [0:0]\n'
'-A FORWARD -s 10.0.0.2 -m comment --comment pic-peer-10-0-0-2 -j ACCEPT\n'
'-A FORWARD -s 10.0.0.3 -m comment --comment pic-peer-10-0-0-3 -j DROP\n'
'-A FORWARD -s 10.0.0.2 -m comment --comment "pic-peer-10-0-0-2/32" -j ACCEPT\n'
'-A FORWARD -s 10.0.0.3 -m comment --comment "pic-peer-10-0-0-3/32" -j DROP\n'
'COMMIT\n'
)
restored = []
@@ -252,8 +335,8 @@ class TestClearPeerRules(unittest.TestCase):
self.assertEqual(len(restored), 1)
restored_content = restored[0]
self.assertNotIn('pic-peer-10-0-0-2', restored_content)
self.assertIn('pic-peer-10-0-0-3', restored_content)
self.assertNotIn('pic-peer-10-0-0-2/32', restored_content)
self.assertIn('pic-peer-10-0-0-3/32', restored_content)
def test_no_op_when_no_matching_rules(self):
save_output = '*filter\n:FORWARD ACCEPT [0:0]\nCOMMIT\n'
+136
View File
@@ -0,0 +1,136 @@
#!/usr/bin/env python3
"""
Tests for the security input validation on PUT /api/config.
Validates that domain and cell_name fields reject injection characters
while allowing legitimate values (multi-label domains, hyphens, etc.).
"""
import sys
import json
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from app import app
def _put(client, payload):
return client.put(
'/api/config',
data=json.dumps(payload),
content_type='application/json',
)
class TestDomainValidation(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
def test_domain_with_newline_returns_400(self):
r = _put(self.client, {'domain': 'cell\nnewline'})
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
def test_domain_with_opening_brace_returns_400(self):
r = _put(self.client, {'domain': 'cell{injection}'})
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
def test_domain_with_semicolon_returns_400(self):
r = _put(self.client, {'domain': 'cell;rm -rf /'})
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
def test_domain_with_space_returns_400(self):
r = _put(self.client, {'domain': 'my cell'})
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
def test_domain_multilabel_with_dot_returns_200(self):
# Multi-label names like 'cell.local' or 'home.lan' must be accepted.
r = _put(self.client, {'domain': 'cell.local'})
# The endpoint may also return non-400 on 500 if downstream fails,
# but the validation itself must not reject dots.
self.assertNotEqual(r.status_code, 400)
def test_domain_simple_word_returns_200(self):
r = _put(self.client, {'domain': 'myhome'})
self.assertNotEqual(r.status_code, 400)
def test_domain_with_hyphen_returns_200(self):
r = _put(self.client, {'domain': 'my-cell'})
self.assertNotEqual(r.status_code, 400)
def test_domain_with_at_sign_returns_400(self):
r = _put(self.client, {'domain': 'cell@evil.com'})
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
def test_domain_with_slash_returns_400(self):
r = _put(self.client, {'domain': 'cell/etc'})
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
class TestCellNameValidation(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
def test_cell_name_with_space_returns_400(self):
r = _put(self.client, {'cell_name': 'my cell'})
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
def test_cell_name_with_dot_returns_400(self):
# cell_name is a single hostname component — dots are not allowed
r = _put(self.client, {'cell_name': 'my.cell'})
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
def test_cell_name_with_newline_returns_400(self):
r = _put(self.client, {'cell_name': 'cell\nevil'})
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
def test_cell_name_with_semicolon_returns_400(self):
r = _put(self.client, {'cell_name': 'cell;drop'})
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
def test_cell_name_valid_hyphenated_returns_200(self):
r = _put(self.client, {'cell_name': 'valid-name'})
self.assertNotEqual(r.status_code, 400)
def test_cell_name_simple_alpha_returns_200(self):
r = _put(self.client, {'cell_name': 'mycell'})
self.assertNotEqual(r.status_code, 400)
def test_cell_name_with_digits_returns_200(self):
r = _put(self.client, {'cell_name': 'cell01'})
self.assertNotEqual(r.status_code, 400)
def test_cell_name_with_brace_returns_400(self):
r = _put(self.client, {'cell_name': 'cell{x}'})
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
if __name__ == '__main__':
unittest.main()
+301
View File
@@ -0,0 +1,301 @@
#!/usr/bin/env python3
"""
Tests verifying that is_local_request() enforcement works correctly
per endpoint in api/app.py.
The audit flagged that is_local_request() checks are performed inline
(not via a decorator), so this file confirms:
1. Endpoints that call `is_local_request()` return 403 when the
function returns False (i.e., a non-local caller).
2. Endpoints that do NOT call `is_local_request()` still respond
normally (2xx / 4xx) for non-local callers.
Tested local-only endpoints (representative sample):
GET /api/containers list_containers
POST /api/containers/<n>/start
POST /api/containers/<n>/stop
POST /api/containers/<n>/restart
GET /api/containers/<n>/logs
GET /api/containers/<n>/stats
GET /api/vault/secrets
POST /api/vault/secrets
GET /api/vault/secrets/<name>
DELETE /api/vault/secrets/<name>
GET /api/containers POST with image field
GET /api/images
POST /api/images/pull
DELETE /api/images/<image>
GET /api/volumes
POST /api/volumes
DELETE /api/volumes/<name>
DELETE /api/containers/<name>
Tested public endpoints (no is_local_request guard):
GET /api/calendar/status
GET /api/dns/records
GET /api/dhcp/leases
GET /api/cells
"""
import sys
import json
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from app import app
def _non_local_client():
"""Return a Flask test client that pretends to come from a non-local address."""
app.config['TESTING'] = True
# Flask's test client uses '127.0.0.1' by default; override with a public IP
# by setting REMOTE_ADDR in the environ base.
return app.test_client()
# ── helpers ───────────────────────────────────────────────────────────────────
def _get_non_local(client, path):
"""Perform a GET request that appears to originate from a non-local IP."""
return client.get(path, environ_base={'REMOTE_ADDR': '203.0.113.1'})
def _post_non_local(client, path, body=None):
return client.post(
path,
data=json.dumps(body or {}),
content_type='application/json',
environ_base={'REMOTE_ADDR': '203.0.113.1'},
)
def _delete_non_local(client, path):
return client.delete(path, environ_base={'REMOTE_ADDR': '203.0.113.1'})
# ── local-only endpoint tests ─────────────────────────────────────────────────
class TestLocalOnlyEndpointsReturn403ForNonLocal(unittest.TestCase):
"""Every endpoint that calls is_local_request() must return 403 for external IPs."""
def setUp(self):
app.config['TESTING'] = True
self.client = _non_local_client()
# Container management
def test_list_containers_returns_403_for_non_local(self):
r = _get_non_local(self.client, '/api/containers')
self.assertEqual(r.status_code, 403)
self.assertIn('error', json.loads(r.data))
def test_start_container_returns_403_for_non_local(self):
r = _post_non_local(self.client, '/api/containers/myapp/start')
self.assertEqual(r.status_code, 403)
def test_stop_container_returns_403_for_non_local(self):
r = _post_non_local(self.client, '/api/containers/myapp/stop')
self.assertEqual(r.status_code, 403)
def test_restart_container_returns_403_for_non_local(self):
r = _post_non_local(self.client, '/api/containers/myapp/restart')
self.assertEqual(r.status_code, 403)
def test_get_container_logs_returns_403_for_non_local(self):
r = _get_non_local(self.client, '/api/containers/myapp/logs')
self.assertEqual(r.status_code, 403)
def test_get_container_stats_returns_403_for_non_local(self):
r = _get_non_local(self.client, '/api/containers/myapp/stats')
self.assertEqual(r.status_code, 403)
def test_remove_container_returns_403_for_non_local(self):
r = _delete_non_local(self.client, '/api/containers/myapp')
self.assertEqual(r.status_code, 403)
# Image management
def test_list_images_returns_403_for_non_local(self):
r = _get_non_local(self.client, '/api/images')
self.assertEqual(r.status_code, 403)
def test_pull_image_returns_403_for_non_local(self):
r = _post_non_local(self.client, '/api/images/pull', {'image': 'nginx:latest'})
self.assertEqual(r.status_code, 403)
def test_remove_image_returns_403_for_non_local(self):
r = _delete_non_local(self.client, '/api/images/nginx')
self.assertEqual(r.status_code, 403)
# Volume management
def test_list_volumes_returns_403_for_non_local(self):
r = _get_non_local(self.client, '/api/volumes')
self.assertEqual(r.status_code, 403)
def test_create_volume_returns_403_for_non_local(self):
r = _post_non_local(self.client, '/api/volumes', {'name': 'myvol'})
self.assertEqual(r.status_code, 403)
def test_remove_volume_returns_403_for_non_local(self):
r = _delete_non_local(self.client, '/api/volumes/myvol')
self.assertEqual(r.status_code, 403)
# Vault endpoints
def test_list_secrets_returns_403_for_non_local(self):
r = _get_non_local(self.client, '/api/vault/secrets')
self.assertEqual(r.status_code, 403)
def test_store_secret_returns_403_for_non_local(self):
r = _post_non_local(self.client, '/api/vault/secrets', {'name': 'k', 'value': 'v'})
self.assertEqual(r.status_code, 403)
def test_get_secret_returns_403_for_non_local(self):
r = _get_non_local(self.client, '/api/vault/secrets/mykey')
self.assertEqual(r.status_code, 403)
def test_delete_secret_returns_403_for_non_local(self):
r = _delete_non_local(self.client, '/api/vault/secrets/mykey')
self.assertEqual(r.status_code, 403)
class TestLocalOnlyEndpointsAllowedFromLocalhost(unittest.TestCase):
"""The same endpoints must NOT return 403 for loopback / local callers."""
def setUp(self):
app.config['TESTING'] = True
# Default test client remote_addr is 127.0.0.1, which is local
self.client = app.test_client()
@patch('app.container_manager')
def test_list_containers_allowed_from_local(self, mock_cm):
mock_cm.list_containers.return_value = []
r = self.client.get('/api/containers')
self.assertNotEqual(r.status_code, 403)
@patch('app.container_manager')
def test_list_images_allowed_from_local(self, mock_cm):
mock_cm.list_images.return_value = []
r = self.client.get('/api/images')
self.assertNotEqual(r.status_code, 403)
@patch('app.container_manager')
def test_list_volumes_allowed_from_local(self, mock_cm):
mock_cm.list_volumes.return_value = []
r = self.client.get('/api/volumes')
self.assertNotEqual(r.status_code, 403)
# ── public endpoint tests — no is_local_request guard ────────────────────────
class TestPublicEndpointsNotBlockedForNonLocal(unittest.TestCase):
"""
Endpoints that do NOT call is_local_request() must remain reachable
from non-local addresses. A 403 here would indicate an unintended
broadening of the local-only guard.
"""
def setUp(self):
app.config['TESTING'] = True
self.client = _non_local_client()
@patch('app.calendar_manager')
def test_calendar_status_not_403_for_non_local(self, mock_cm):
mock_cm.get_status.return_value = {'running': True}
r = _get_non_local(self.client, '/api/calendar/status')
self.assertNotEqual(r.status_code, 403)
@patch('app.network_manager')
def test_dns_records_not_403_for_non_local(self, mock_nm):
mock_nm.get_dns_records.return_value = []
r = _get_non_local(self.client, '/api/dns/records')
self.assertNotEqual(r.status_code, 403)
@patch('app.network_manager')
def test_dhcp_leases_not_403_for_non_local(self, mock_nm):
mock_nm.get_dhcp_leases.return_value = []
r = _get_non_local(self.client, '/api/dhcp/leases')
self.assertNotEqual(r.status_code, 403)
@patch('app.cell_link_manager')
def test_cells_list_not_403_for_non_local(self, mock_clm):
mock_clm.list_connections.return_value = []
r = _get_non_local(self.client, '/api/cells')
self.assertNotEqual(r.status_code, 403)
def test_health_check_not_403_for_non_local(self):
r = _get_non_local(self.client, '/health')
self.assertNotEqual(r.status_code, 403)
# ── is_local_request logic unit tests ────────────────────────────────────────
class TestIsLocalRequestLogic(unittest.TestCase):
"""
Directly verify the is_local_request() function from app.py.
These tests exercise the address-checking logic without going through
a full HTTP request cycle.
"""
def setUp(self):
from app import is_local_request as _fn
self._fn = _fn
app.config['TESTING'] = True
def _call_with_addr(self, remote_addr, xff=None):
"""Push a fake request context and evaluate is_local_request()."""
from app import app as _app
headers = {}
if xff:
headers['X-Forwarded-For'] = xff
with _app.test_request_context('/', environ_base={'REMOTE_ADDR': remote_addr},
headers=headers):
return self._fn()
def test_loopback_127_is_local(self):
self.assertTrue(self._call_with_addr('127.0.0.1'))
def test_ipv6_loopback_is_local(self):
self.assertTrue(self._call_with_addr('::1'))
def test_docker_bridge_172_20_is_local(self):
# 172.20.x.x is inside 172.16.0.0/12
self.assertTrue(self._call_with_addr('172.20.0.5'))
def test_docker_bridge_172_16_boundary_is_local(self):
# Exact boundary of 172.16.0.0/12
self.assertTrue(self._call_with_addr('172.16.0.1'))
def test_public_ip_is_not_local(self):
self.assertFalse(self._call_with_addr('8.8.8.8'))
def test_wireguard_peer_10_0_0_x_is_not_local(self):
# WireGuard peer IPs (10.0.0.0/8) must NOT be treated as local
self.assertFalse(self._call_with_addr('10.0.0.2'))
def test_lan_192_168_is_not_local(self):
# LAN addresses must NOT be treated as local (comment in app.py confirms this)
self.assertFalse(self._call_with_addr('192.168.1.50'))
def test_xff_last_entry_loopback_is_local(self):
# Public remote addr, but last XFF entry is loopback → allowed
self.assertTrue(self._call_with_addr('8.8.8.8', xff='8.8.8.8, 127.0.0.1'))
def test_xff_first_entry_spoofed_loopback_not_local(self):
# Spoofed first XFF entry; last entry is a public IP → should be rejected
# remote_addr is also public to rule out that shortcut
result = self._call_with_addr('8.8.8.8', xff='127.0.0.1, 8.8.8.8')
self.assertFalse(result)
def test_xff_last_entry_docker_bridge_is_local(self):
# Last XFF entry is Caddy's Docker bridge address
self.assertTrue(self._call_with_addr('8.8.8.8', xff='1.2.3.4, 172.20.0.2'))
if __name__ == '__main__':
unittest.main()
+363
View File
@@ -0,0 +1,363 @@
#!/usr/bin/env python3
"""
Unit tests for logs Flask endpoints in api/app.py.
Covers:
GET /api/logs backend log file (reads picell.log)
GET /api/logs/services/<service> per-service logs via log_manager
POST /api/logs/search search across services
POST /api/logs/export export logs
GET /api/logs/statistics log stats
POST /api/logs/rotate rotate logs
GET /api/logs/files list log file info
GET /api/logs/verbosity get log levels
PUT /api/logs/verbosity set log levels
"""
import sys
import json
import os
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock, mock_open
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from app import app
class TestGetBackendLogs(unittest.TestCase):
"""GET /api/logs — reads picell.log from api directory."""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
def test_get_logs_returns_404_when_log_file_missing(self):
# Patch os.path.exists so the log file appears absent
with patch('app.os.path.exists', return_value=False):
r = self.client.get('/api/logs')
self.assertEqual(r.status_code, 404)
self.assertIn('error', json.loads(r.data))
def test_get_logs_returns_200_with_log_content(self):
log_content = 'INFO 2026-04-27 server started\nERROR something went wrong\n'
m = mock_open(read_data=log_content)
# Bypass auth enforcement by replacing auth_manager with a non-AuthManager object
with patch('app.auth_manager', MagicMock(spec=object)), \
patch('app.os.path.exists', return_value=True), \
patch('builtins.open', m):
r = self.client.get('/api/logs')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('log', data)
def test_get_logs_respects_lines_query_param(self):
# Produce 10 lines; request only last 3
all_lines = [f'line {i}\n' for i in range(10)]
m = mock_open(read_data=''.join(all_lines))
m.return_value.__iter__ = lambda s: iter(all_lines)
m.return_value.readlines = lambda: all_lines
# Bypass auth enforcement by replacing auth_manager with a non-AuthManager object
with patch('app.auth_manager', MagicMock(spec=object)), \
patch('app.os.path.exists', return_value=True), \
patch('builtins.open', m):
r = self.client.get('/api/logs?lines=3')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
# The tail should contain only the last 3 lines
self.assertIn('line 7', data['log'])
self.assertIn('line 8', data['log'])
self.assertIn('line 9', data['log'])
def test_get_logs_returns_500_on_exception(self):
with patch('app.os.path.exists', return_value=True), \
patch('builtins.open', side_effect=PermissionError('access denied')):
r = self.client.get('/api/logs')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetServiceLogs(unittest.TestCase):
"""GET /api/logs/services/<service>"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_get_service_logs_returns_200_with_log_data(self, mock_lm):
mock_lm.get_service_logs.return_value = [
'[INFO] 2026-04-27 dns started',
'[WARN] 2026-04-27 retry attempt',
]
r = self.client.get('/api/logs/services/dns')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertEqual(data['service'], 'dns')
self.assertIsInstance(data['logs'], list)
self.assertEqual(len(data['logs']), 2)
@patch('app.log_manager')
def test_get_service_logs_passes_level_and_lines_params(self, mock_lm):
mock_lm.get_service_logs.return_value = []
self.client.get('/api/logs/services/email?level=ERROR&lines=25')
mock_lm.get_service_logs.assert_called_once_with('email', 'ERROR', 25)
@patch('app.log_manager')
def test_get_service_logs_uses_defaults_when_params_absent(self, mock_lm):
mock_lm.get_service_logs.return_value = []
self.client.get('/api/logs/services/wireguard')
mock_lm.get_service_logs.assert_called_once_with('wireguard', 'INFO', 50)
@patch('app.log_manager')
def test_get_service_logs_returns_500_on_exception(self, mock_lm):
mock_lm.get_service_logs.side_effect = Exception('log file missing')
r = self.client.get('/api/logs/services/calendar')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestSearchLogs(unittest.TestCase):
"""POST /api/logs/search"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_search_logs_returns_200_with_results_and_count(self, mock_lm):
mock_lm.search_logs.return_value = [
{'service': 'dns', 'line': 'ERROR timeout'},
]
r = self.client.post(
'/api/logs/search',
data=json.dumps({'query': 'ERROR', 'services': ['dns']}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('results', data)
self.assertIn('count', data)
self.assertEqual(data['count'], 1)
@patch('app.log_manager')
def test_search_logs_works_with_empty_body(self, mock_lm):
mock_lm.search_logs.return_value = []
r = self.client.post('/api/logs/search')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertEqual(data['results'], [])
self.assertEqual(data['count'], 0)
@patch('app.log_manager')
def test_search_logs_returns_500_on_exception(self, mock_lm):
mock_lm.search_logs.side_effect = Exception('index unavailable')
r = self.client.post(
'/api/logs/search',
data=json.dumps({'query': 'fail'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestExportLogs(unittest.TestCase):
"""POST /api/logs/export"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_export_logs_returns_200_with_log_data_and_format(self, mock_lm):
mock_lm.export_logs.return_value = '[{"ts":1,"msg":"ok"}]'
r = self.client.post(
'/api/logs/export',
data=json.dumps({'format': 'json', 'filters': {'service': 'dns'}}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('logs', data)
self.assertIn('format', data)
self.assertEqual(data['format'], 'json')
@patch('app.log_manager')
def test_export_logs_defaults_to_json_format(self, mock_lm):
mock_lm.export_logs.return_value = '[]'
self.client.post('/api/logs/export')
mock_lm.export_logs.assert_called_once_with('json', {})
@patch('app.log_manager')
def test_export_logs_returns_500_on_exception(self, mock_lm):
mock_lm.export_logs.side_effect = Exception('export failed')
r = self.client.post(
'/api/logs/export',
data=json.dumps({'format': 'csv'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetLogStatistics(unittest.TestCase):
"""GET /api/logs/statistics"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_get_statistics_returns_200_with_stats_dict(self, mock_lm):
mock_lm.get_log_statistics.return_value = {
'total_lines': 1200,
'error_count': 3,
'warn_count': 17,
}
r = self.client.get('/api/logs/statistics')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('total_lines', data)
@patch('app.log_manager')
def test_get_statistics_passes_service_param(self, mock_lm):
mock_lm.get_log_statistics.return_value = {}
self.client.get('/api/logs/statistics?service=email')
mock_lm.get_log_statistics.assert_called_once_with('email')
@patch('app.log_manager')
def test_get_statistics_passes_none_when_no_service_param(self, mock_lm):
mock_lm.get_log_statistics.return_value = {}
self.client.get('/api/logs/statistics')
mock_lm.get_log_statistics.assert_called_once_with(None)
@patch('app.log_manager')
def test_get_statistics_returns_500_on_exception(self, mock_lm):
mock_lm.get_log_statistics.side_effect = Exception('stats error')
r = self.client.get('/api/logs/statistics')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestRotateLogs(unittest.TestCase):
"""POST /api/logs/rotate"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_rotate_all_logs_returns_200(self, mock_lm):
r = self.client.post('/api/logs/rotate')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('message', data)
mock_lm.rotate_logs.assert_called_once_with(None)
@patch('app.log_manager')
def test_rotate_specific_service_passes_service_name(self, mock_lm):
r = self.client.post(
'/api/logs/rotate',
data=json.dumps({'service': 'dns'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
mock_lm.rotate_logs.assert_called_once_with('dns')
@patch('app.log_manager')
def test_rotate_returns_500_on_exception(self, mock_lm):
mock_lm.rotate_logs.side_effect = Exception('rotate failed')
r = self.client.post('/api/logs/rotate')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetLogFileInfos(unittest.TestCase):
"""GET /api/logs/files"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_get_log_files_returns_200_with_file_list(self, mock_lm):
mock_lm.get_all_log_file_infos.return_value = [
{'service': 'dns', 'path': '/data/logs/dns.log', 'size_bytes': 4096},
{'service': 'email', 'path': '/data/logs/email.log', 'size_bytes': 8192},
]
r = self.client.get('/api/logs/files')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIsInstance(data, list)
self.assertEqual(len(data), 2)
@patch('app.log_manager')
def test_get_log_files_returns_500_on_exception(self, mock_lm):
mock_lm.get_all_log_file_infos.side_effect = Exception('filesystem error')
r = self.client.get('/api/logs/files')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestLogVerbosity(unittest.TestCase):
"""GET /api/logs/verbosity and PUT /api/logs/verbosity"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.log_manager')
def test_get_verbosity_returns_200_with_levels_map(self, mock_lm):
mock_lm.get_service_levels.return_value = {
'dns': 'INFO',
'email': 'DEBUG',
'wireguard': 'WARNING',
}
r = self.client.get('/api/logs/verbosity')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('dns', data)
self.assertEqual(data['email'], 'DEBUG')
@patch('app.log_manager')
def test_get_verbosity_returns_500_on_exception(self, mock_lm):
mock_lm.get_service_levels.side_effect = Exception('config missing')
r = self.client.get('/api/logs/verbosity')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
@patch('app.log_manager')
def test_put_verbosity_returns_200_and_calls_set_level(self, mock_lm):
mock_lm.get_service_levels.return_value = {'dns': 'DEBUG'}
with tempfile.TemporaryDirectory() as tmpdir:
# Endpoint builds: os.path.join(os.path.dirname(__file__), 'config', 'log_levels.json')
# Patch dirname to return tmpdir so the full path becomes tmpdir/config/log_levels.json
config_dir = os.path.join(tmpdir, 'config')
os.makedirs(config_dir)
with patch('app.auth_manager', MagicMock(spec=object)), \
patch('app.os.path.dirname', return_value=tmpdir):
r = self.client.put(
'/api/logs/verbosity',
data=json.dumps({'dns': 'DEBUG'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
mock_lm.set_service_level.assert_called_with('dns', 'DEBUG')
@patch('app.log_manager')
def test_put_verbosity_returns_500_on_exception(self, mock_lm):
mock_lm.set_service_level.side_effect = Exception('unknown service')
r = self.client.put(
'/api/logs/verbosity',
data=json.dumps({'unknown_svc': 'DEBUG'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
if __name__ == '__main__':
unittest.main()
+353 -1
View File
@@ -1 +1,353 @@
# ... moved and adapted code from test_phase1_endpoints.py ...
#!/usr/bin/env python3
"""
Unit tests for network/DNS/DHCP Flask endpoints in api/app.py.
Covers:
GET /api/dns/records
POST /api/dns/records
DELETE /api/dns/records
GET /api/dns/status
GET /api/dhcp/leases
POST /api/dhcp/reservations
DELETE /api/dhcp/reservations
GET /api/network/info
POST /api/network/test
"""
import sys
import json
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from app import app
class TestGetDnsRecords(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.network_manager')
def test_get_dns_records_returns_200_with_list(self, mock_nm):
mock_nm.get_dns_records.return_value = [
{'name': 'myhost.cell', 'type': 'A', 'value': '192.168.1.10'},
{'name': 'nas.cell', 'type': 'A', 'value': '192.168.1.20'},
]
r = self.client.get('/api/dns/records')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIsInstance(data, list)
self.assertEqual(len(data), 2)
@patch('app.network_manager')
def test_get_dns_records_returns_empty_list_when_none(self, mock_nm):
mock_nm.get_dns_records.return_value = []
r = self.client.get('/api/dns/records')
self.assertEqual(r.status_code, 200)
self.assertEqual(json.loads(r.data), [])
@patch('app.network_manager')
def test_get_dns_records_returns_500_on_exception(self, mock_nm):
mock_nm.get_dns_records.side_effect = Exception('CoreDNS unreachable')
r = self.client.get('/api/dns/records')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestAddDnsRecord(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.network_manager')
def test_add_dns_record_returns_200_on_valid_body(self, mock_nm):
mock_nm.add_dns_record.return_value = {'success': True}
r = self.client.post(
'/api/dns/records',
data=json.dumps({'name': 'printer.cell', 'type': 'A', 'value': '192.168.1.50'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('success', data)
@patch('app.network_manager')
def test_add_dns_record_returns_400_when_no_body(self, mock_nm):
r = self.client.post('/api/dns/records')
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_nm.add_dns_record.assert_not_called()
@patch('app.network_manager')
def test_add_dns_record_returns_500_on_exception(self, mock_nm):
mock_nm.add_dns_record.side_effect = Exception('Corefile write failed')
r = self.client.post(
'/api/dns/records',
data=json.dumps({'name': 'bad.cell', 'type': 'A', 'value': '10.0.0.1'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestDeleteDnsRecord(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.network_manager')
def test_delete_dns_record_returns_200_on_success(self, mock_nm):
mock_nm.remove_dns_record.return_value = {'success': True}
r = self.client.delete(
'/api/dns/records',
data=json.dumps({'name': 'printer.cell'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
@patch('app.network_manager')
def test_delete_dns_record_returns_500_on_exception(self, mock_nm):
mock_nm.remove_dns_record.side_effect = Exception('record not found')
r = self.client.delete(
'/api/dns/records',
data=json.dumps({'name': 'missing.cell'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetDnsStatus(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.network_manager')
def test_get_dns_status_returns_200_with_status_dict(self, mock_nm):
mock_nm.get_dns_status.return_value = {
'running': True,
'records_count': 5,
'upstreams': ['1.1.1.1', '8.8.8.8'],
}
r = self.client.get('/api/dns/status')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('running', data)
@patch('app.network_manager')
def test_get_dns_status_returns_500_on_exception(self, mock_nm):
mock_nm.get_dns_status.side_effect = Exception('CoreDNS not running')
r = self.client.get('/api/dns/status')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetDhcpLeases(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.network_manager')
def test_get_dhcp_leases_returns_200_with_list(self, mock_nm):
mock_nm.get_dhcp_leases.return_value = [
{'mac': 'aa:bb:cc:dd:ee:ff', 'ip': '192.168.1.101', 'hostname': 'laptop'},
]
r = self.client.get('/api/dhcp/leases')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIsInstance(data, list)
self.assertEqual(len(data), 1)
self.assertEqual(data[0]['hostname'], 'laptop')
@patch('app.network_manager')
def test_get_dhcp_leases_returns_empty_list_when_no_leases(self, mock_nm):
mock_nm.get_dhcp_leases.return_value = []
r = self.client.get('/api/dhcp/leases')
self.assertEqual(r.status_code, 200)
self.assertEqual(json.loads(r.data), [])
@patch('app.network_manager')
def test_get_dhcp_leases_returns_500_on_exception(self, mock_nm):
mock_nm.get_dhcp_leases.side_effect = Exception('dnsmasq not running')
r = self.client.get('/api/dhcp/leases')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestAddDhcpReservation(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.network_manager')
def test_add_reservation_returns_200_on_valid_body(self, mock_nm):
mock_nm.add_dhcp_reservation.return_value = True
r = self.client.post(
'/api/dhcp/reservations',
data=json.dumps({'mac': 'aa:bb:cc:dd:ee:ff', 'ip': '192.168.1.50', 'hostname': 'printer'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('success', data)
@patch('app.network_manager')
def test_add_reservation_returns_400_when_no_body(self, mock_nm):
r = self.client.post('/api/dhcp/reservations')
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_nm.add_dhcp_reservation.assert_not_called()
@patch('app.network_manager')
def test_add_reservation_returns_400_when_mac_missing(self, mock_nm):
r = self.client.post(
'/api/dhcp/reservations',
data=json.dumps({'ip': '192.168.1.50'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.network_manager')
def test_add_reservation_returns_400_when_ip_missing(self, mock_nm):
r = self.client.post(
'/api/dhcp/reservations',
data=json.dumps({'mac': 'aa:bb:cc:dd:ee:ff'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.network_manager')
def test_add_reservation_uses_empty_hostname_when_omitted(self, mock_nm):
mock_nm.add_dhcp_reservation.return_value = True
self.client.post(
'/api/dhcp/reservations',
data=json.dumps({'mac': 'aa:bb:cc:dd:ee:ff', 'ip': '192.168.1.50'}),
content_type='application/json',
)
mock_nm.add_dhcp_reservation.assert_called_once_with('aa:bb:cc:dd:ee:ff', '192.168.1.50', '')
@patch('app.network_manager')
def test_add_reservation_returns_500_on_exception(self, mock_nm):
mock_nm.add_dhcp_reservation.side_effect = Exception('dnsmasq config error')
r = self.client.post(
'/api/dhcp/reservations',
data=json.dumps({'mac': 'aa:bb:cc:dd:ee:ff', 'ip': '192.168.1.50'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestDeleteDhcpReservation(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.network_manager')
def test_delete_reservation_returns_200_on_success(self, mock_nm):
mock_nm.remove_dhcp_reservation.return_value = True
r = self.client.delete(
'/api/dhcp/reservations',
data=json.dumps({'mac': 'aa:bb:cc:dd:ee:ff'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('success', data)
@patch('app.network_manager')
def test_delete_reservation_returns_400_when_mac_missing(self, mock_nm):
r = self.client.delete(
'/api/dhcp/reservations',
data=json.dumps({'hostname': 'printer'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_nm.remove_dhcp_reservation.assert_not_called()
@patch('app.network_manager')
def test_delete_reservation_returns_400_when_no_body(self, mock_nm):
r = self.client.delete('/api/dhcp/reservations')
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.network_manager')
def test_delete_reservation_returns_500_on_exception(self, mock_nm):
mock_nm.remove_dhcp_reservation.side_effect = Exception('reservation not found')
r = self.client.delete(
'/api/dhcp/reservations',
data=json.dumps({'mac': 'aa:bb:cc:dd:ee:ff'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetNetworkInfo(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.network_manager')
def test_get_network_info_returns_200_with_info_dict(self, mock_nm):
mock_nm.get_network_info.return_value = {
'interfaces': ['eth0', 'wg0'],
'gateway': '192.168.1.1',
'dns': ['127.0.0.1'],
}
r = self.client.get('/api/network/info')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('interfaces', data)
@patch('app.network_manager')
def test_get_network_info_returns_500_on_exception(self, mock_nm):
mock_nm.get_network_info.side_effect = Exception('network unreachable')
r = self.client.get('/api/network/info')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestNetworkTest(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.network_manager')
def test_network_test_returns_200_with_result(self, mock_nm):
mock_nm.test_connectivity.return_value = {
'internet': True,
'dns': True,
'latency_ms': 15,
}
r = self.client.post('/api/network/test')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('internet', data)
@patch('app.network_manager')
def test_network_test_returns_500_on_exception(self, mock_nm):
mock_nm.test_connectivity.side_effect = Exception('ping failed')
r = self.client.post('/api/network/test')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
if __name__ == '__main__':
unittest.main()
+6 -4
View File
@@ -399,11 +399,13 @@ class TestCellDnsForwarding(unittest.TestCase):
self.assertNotIn('10.1.0.1', content)
@patch('subprocess.run')
def test_remove_nonexistent_forward_is_noop(self, _mock):
before = open(self.corefile).read()
self.nm.remove_cell_dns_forward('nonexistent.cell')
def test_remove_nonexistent_forward_does_not_error(self, _mock):
# Removing a domain that was never added must not raise and must not
# leave the nonexistent domain in the regenerated Corefile.
result = self.nm.remove_cell_dns_forward('nonexistent.cell')
after = open(self.corefile).read()
self.assertEqual(before, after)
self.assertNotIn('nonexistent.cell', after)
# The Corefile is regenerated (new canonical format) — that's correct.
if __name__ == '__main__':
+182
View File
@@ -0,0 +1,182 @@
#!/usr/bin/env python3
"""
Edge-case tests for peer management endpoints in api/app.py.
Key scenarios:
- POST /api/peers with subnet exhaustion (_next_peer_ip raises ValueError) 409
- POST /api/peers/<name>/clear-reinstall: success (200)
- POST /api/peers/<name>/clear-reinstall: unknown peer raises 500
- POST /api/ip-update: missing 'peer' field 400
- POST /api/ip-update: missing 'ip' field 400
- POST /api/ip-update: unknown peer 404
- POST /api/ip-update: success 200
"""
import sys
import json
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from app import app
class TestAddPeerSubnetExhaustion(unittest.TestCase):
"""POST /api/peers with no free IPs left must return 409, not 500."""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app._next_peer_ip')
@patch('app.auth_manager')
def test_add_peer_returns_409_when_subnet_exhausted(self, mock_auth, mock_next_ip):
mock_auth.create_user.return_value = True
mock_next_ip.side_effect = ValueError('No free IPs left in 10.0.0.0/24')
r = self.client.post(
'/api/peers',
data=json.dumps({
'name': 'newpeer',
'public_key': 'PUBKEY==',
'password': 'verysecret123',
}),
content_type='application/json',
)
self.assertEqual(r.status_code, 409)
data = json.loads(r.data)
self.assertIn('error', data)
@patch('app._next_peer_ip')
@patch('app.auth_manager')
def test_add_peer_409_error_message_mentions_ip(self, mock_auth, mock_next_ip):
mock_auth.create_user.return_value = True
mock_next_ip.side_effect = ValueError('No free IPs left in 10.0.0.0/24')
r = self.client.post(
'/api/peers',
data=json.dumps({
'name': 'newpeer',
'public_key': 'PUBKEY==',
'password': 'verysecret123',
}),
content_type='application/json',
)
self.assertEqual(r.status_code, 409)
data = json.loads(r.data)
self.assertIn('No free IPs', data['error'])
class TestClearReinstallFlag(unittest.TestCase):
"""POST /api/peers/<name>/clear-reinstall"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.peer_registry')
def test_clear_reinstall_returns_200_on_success(self, mock_reg):
mock_reg.clear_reinstall_flag.return_value = True
r = self.client.post('/api/peers/alice/clear-reinstall')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('message', data)
@patch('app.peer_registry')
def test_clear_reinstall_calls_registry_with_peer_name(self, mock_reg):
mock_reg.clear_reinstall_flag.return_value = True
self.client.post('/api/peers/bob/clear-reinstall')
mock_reg.clear_reinstall_flag.assert_called_once_with('bob')
@patch('app.peer_registry')
def test_clear_reinstall_returns_500_when_exception_raised(self, mock_reg):
mock_reg.clear_reinstall_flag.side_effect = Exception('peer not found')
r = self.client.post('/api/peers/ghost/clear-reinstall')
self.assertEqual(r.status_code, 500)
data = json.loads(r.data)
self.assertIn('error', data)
class TestIpUpdate(unittest.TestCase):
"""POST /api/ip-update"""
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.routing_manager')
@patch('app.peer_registry')
def test_ip_update_returns_200_on_success(self, mock_reg, mock_rm):
mock_reg.update_peer_ip.return_value = True
mock_rm.update_peer_ip.return_value = None
r = self.client.post(
'/api/ip-update',
data=json.dumps({'peer': 'alice', 'ip': '10.0.0.99'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('message', data)
@patch('app.peer_registry')
def test_ip_update_returns_400_when_peer_field_missing(self, mock_reg):
r = self.client.post(
'/api/ip-update',
data=json.dumps({'ip': '10.0.0.99'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
mock_reg.update_peer_ip.assert_not_called()
@patch('app.peer_registry')
def test_ip_update_returns_400_when_ip_field_missing(self, mock_reg):
r = self.client.post(
'/api/ip-update',
data=json.dumps({'peer': 'alice'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
data = json.loads(r.data)
self.assertIn('error', data)
mock_reg.update_peer_ip.assert_not_called()
@patch('app.peer_registry')
def test_ip_update_returns_400_when_no_body(self, mock_reg):
r = self.client.post('/api/ip-update')
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
@patch('app.peer_registry')
def test_ip_update_returns_404_when_peer_not_found(self, mock_reg):
mock_reg.update_peer_ip.return_value = False
r = self.client.post(
'/api/ip-update',
data=json.dumps({'peer': 'ghost', 'ip': '10.0.0.50'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 404)
data = json.loads(r.data)
self.assertIn('error', data)
@patch('app.routing_manager')
@patch('app.peer_registry')
def test_ip_update_calls_registry_with_correct_args(self, mock_reg, mock_rm):
mock_reg.update_peer_ip.return_value = True
mock_rm.update_peer_ip.return_value = None
self.client.post(
'/api/ip-update',
data=json.dumps({'peer': 'alice', 'ip': '10.0.0.5'}),
content_type='application/json',
)
mock_reg.update_peer_ip.assert_called_once_with('alice', '10.0.0.5')
if __name__ == '__main__':
unittest.main()
+176
View File
@@ -0,0 +1,176 @@
#!/usr/bin/env python3
"""
Tests for PUT /api/peers/<peer_name>.
Key scenarios:
- 404 when peer_registry.get_peer returns None
- 200 on successful update
- config_needs_reinstall=True in response when internet_access changes
- config_needs_reinstall=False (config_changed=False) when only description changes
"""
import sys
import json
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from app import app
class TestUpdatePeer(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.firewall_manager')
@patch('app.peer_registry')
def test_update_peer_returns_404_when_peer_not_found(self, mock_reg, mock_fw):
mock_reg.get_peer.return_value = None
r = self.client.put(
'/api/peers/ghost',
data=json.dumps({'description': 'updated'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 404)
data = json.loads(r.data)
self.assertIn('error', data)
@patch('app.firewall_manager')
@patch('app.peer_registry')
def test_update_peer_returns_200_on_success(self, mock_reg, mock_fw):
existing = {
'peer': 'alice',
'ip': '10.0.0.2',
'internet_access': True,
'public_key': 'KEY==',
}
mock_reg.get_peer.return_value = existing
mock_reg.update_peer.return_value = True
mock_reg.list_peers.return_value = [existing]
mock_fw.apply_peer_rules.return_value = None
mock_fw.apply_all_dns_rules.return_value = None
r = self.client.put(
'/api/peers/alice',
data=json.dumps({'description': 'my laptop'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('message', data)
@patch('app.firewall_manager')
@patch('app.peer_registry')
def test_update_peer_config_changed_true_when_internet_access_changes(
self, mock_reg, mock_fw
):
existing = {
'peer': 'alice',
'ip': '10.0.0.2',
'internet_access': True,
'public_key': 'KEY==',
}
mock_reg.get_peer.return_value = existing
mock_reg.update_peer.return_value = True
mock_reg.list_peers.return_value = [existing]
mock_fw.apply_peer_rules.return_value = None
mock_fw.apply_all_dns_rules.return_value = None
r = self.client.put(
'/api/peers/alice',
data=json.dumps({'internet_access': False}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertTrue(data['config_changed'])
@patch('app.firewall_manager')
@patch('app.peer_registry')
def test_update_peer_config_changed_false_when_only_description_changes(
self, mock_reg, mock_fw
):
existing = {
'peer': 'alice',
'ip': '10.0.0.2',
'internet_access': True,
'public_key': 'KEY==',
}
mock_reg.get_peer.return_value = existing
mock_reg.update_peer.return_value = True
mock_reg.list_peers.return_value = [existing]
mock_fw.apply_peer_rules.return_value = None
mock_fw.apply_all_dns_rules.return_value = None
r = self.client.put(
'/api/peers/alice',
data=json.dumps({'description': 'just a label'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertFalse(data['config_changed'])
@patch('app.firewall_manager')
@patch('app.peer_registry')
def test_update_peer_returns_500_when_update_fails(self, mock_reg, mock_fw):
existing = {
'peer': 'alice',
'ip': '10.0.0.2',
'internet_access': True,
'public_key': 'KEY==',
}
mock_reg.get_peer.return_value = existing
mock_reg.update_peer.return_value = False
r = self.client.put(
'/api/peers/alice',
data=json.dumps({'description': 'fail'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
@patch('app.firewall_manager')
@patch('app.peer_registry')
def test_update_peer_config_changed_true_when_ip_changes(self, mock_reg, mock_fw):
existing = {
'peer': 'alice',
'ip': '10.0.0.2',
'internet_access': True,
'public_key': 'KEY==',
}
mock_reg.get_peer.return_value = existing
mock_reg.update_peer.return_value = True
mock_reg.list_peers.return_value = [existing]
mock_fw.apply_peer_rules.return_value = None
mock_fw.apply_all_dns_rules.return_value = None
r = self.client.put(
'/api/peers/alice',
data=json.dumps({'ip': '10.0.0.99'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertTrue(data['config_changed'])
@patch('app.peer_registry')
def test_update_peer_returns_500_on_exception(self, mock_reg):
mock_reg.get_peer.side_effect = Exception('disk error')
r = self.client.put(
'/api/peers/alice',
data=json.dumps({'description': 'test'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
if __name__ == '__main__':
unittest.main()
+294 -1
View File
@@ -1 +1,294 @@
# ... moved and adapted code from test_phase4_endpoints.py ...
#!/usr/bin/env python3
"""
Unit tests for routing Flask endpoints in api/app.py.
Covers:
POST /api/routing/peers (peer_name + peer_ip required)
POST /api/routing/exit-nodes (peer_name + peer_ip required)
POST /api/routing/bridge (source_peer + target_peer required)
POST /api/routing/split (network + exit_peer required)
GET /api/routing/peers
DELETE /api/routing/peers/<name>
"""
import sys
import json
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
from app import app
class TestAddPeerRoute(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.routing_manager')
def test_add_peer_route_returns_200_on_success(self, mock_rm):
mock_rm.add_peer_route.return_value = True
r = self.client.post(
'/api/routing/peers',
data=json.dumps({'peer_name': 'alice', 'peer_ip': '10.0.0.2'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('added', data)
@patch('app.routing_manager')
def test_add_peer_route_returns_400_when_peer_name_missing(self, mock_rm):
r = self.client.post(
'/api/routing/peers',
data=json.dumps({'peer_ip': '10.0.0.2'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_rm.add_peer_route.assert_not_called()
@patch('app.routing_manager')
def test_add_peer_route_returns_400_when_peer_ip_missing(self, mock_rm):
r = self.client.post(
'/api/routing/peers',
data=json.dumps({'peer_name': 'alice'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_rm.add_peer_route.assert_not_called()
@patch('app.routing_manager')
def test_add_peer_route_returns_500_on_exception(self, mock_rm):
mock_rm.add_peer_route.side_effect = Exception('iptables error')
r = self.client.post(
'/api/routing/peers',
data=json.dumps({'peer_name': 'alice', 'peer_ip': '10.0.0.2'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestAddExitNode(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.routing_manager')
def test_add_exit_node_returns_200_on_success(self, mock_rm):
mock_rm.add_exit_node.return_value = True
r = self.client.post(
'/api/routing/exit-nodes',
data=json.dumps({'peer_name': 'gw', 'peer_ip': '10.0.0.5'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('added', data)
@patch('app.routing_manager')
def test_add_exit_node_returns_400_when_peer_name_missing(self, mock_rm):
r = self.client.post(
'/api/routing/exit-nodes',
data=json.dumps({'peer_ip': '10.0.0.5'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_rm.add_exit_node.assert_not_called()
@patch('app.routing_manager')
def test_add_exit_node_returns_400_when_peer_ip_missing(self, mock_rm):
r = self.client.post(
'/api/routing/exit-nodes',
data=json.dumps({'peer_name': 'gw'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_rm.add_exit_node.assert_not_called()
@patch('app.routing_manager')
def test_add_exit_node_returns_500_on_exception(self, mock_rm):
mock_rm.add_exit_node.side_effect = Exception('routing table full')
r = self.client.post(
'/api/routing/exit-nodes',
data=json.dumps({'peer_name': 'gw', 'peer_ip': '10.0.0.5'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestAddBridgeRoute(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.routing_manager')
def test_add_bridge_returns_200_on_success(self, mock_rm):
mock_rm.add_bridge_route.return_value = True
r = self.client.post(
'/api/routing/bridge',
data=json.dumps({'source_peer': 'alice', 'target_peer': 'bob'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('added', data)
@patch('app.routing_manager')
def test_add_bridge_returns_400_when_source_peer_missing(self, mock_rm):
r = self.client.post(
'/api/routing/bridge',
data=json.dumps({'target_peer': 'bob'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_rm.add_bridge_route.assert_not_called()
@patch('app.routing_manager')
def test_add_bridge_returns_400_when_target_peer_missing(self, mock_rm):
r = self.client.post(
'/api/routing/bridge',
data=json.dumps({'source_peer': 'alice'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_rm.add_bridge_route.assert_not_called()
@patch('app.routing_manager')
def test_add_bridge_returns_500_on_exception(self, mock_rm):
mock_rm.add_bridge_route.side_effect = Exception('bridge setup failed')
r = self.client.post(
'/api/routing/bridge',
data=json.dumps({'source_peer': 'alice', 'target_peer': 'bob'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestAddSplitRoute(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.routing_manager')
def test_add_split_returns_200_on_success(self, mock_rm):
mock_rm.add_split_route.return_value = True
r = self.client.post(
'/api/routing/split',
data=json.dumps({'network': '192.168.10.0/24', 'exit_peer': 'gw'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('added', data)
@patch('app.routing_manager')
def test_add_split_returns_400_when_network_missing(self, mock_rm):
r = self.client.post(
'/api/routing/split',
data=json.dumps({'exit_peer': 'gw'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_rm.add_split_route.assert_not_called()
@patch('app.routing_manager')
def test_add_split_returns_400_when_exit_peer_missing(self, mock_rm):
r = self.client.post(
'/api/routing/split',
data=json.dumps({'network': '192.168.10.0/24'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 400)
self.assertIn('error', json.loads(r.data))
mock_rm.add_split_route.assert_not_called()
@patch('app.routing_manager')
def test_add_split_returns_500_on_exception(self, mock_rm):
mock_rm.add_split_route.side_effect = Exception('split tunnel error')
r = self.client.post(
'/api/routing/split',
data=json.dumps({'network': '192.168.10.0/24', 'exit_peer': 'gw'}),
content_type='application/json',
)
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestGetPeerRoutes(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.routing_manager')
def test_get_peer_routes_returns_200_with_routes(self, mock_rm):
mock_rm.get_peer_routes.return_value = [
{'peer_name': 'alice', 'peer_ip': '10.0.0.2', 'route_type': 'lan'},
]
r = self.client.get('/api/routing/peers')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertIn('peer_routes', data)
self.assertIsInstance(data['peer_routes'], list)
@patch('app.routing_manager')
def test_get_peer_routes_returns_empty_list_when_no_routes(self, mock_rm):
mock_rm.get_peer_routes.return_value = []
r = self.client.get('/api/routing/peers')
self.assertEqual(r.status_code, 200)
data = json.loads(r.data)
self.assertEqual(data['peer_routes'], [])
@patch('app.routing_manager')
def test_get_peer_routes_returns_500_on_exception(self, mock_rm):
mock_rm.get_peer_routes.side_effect = Exception('DB error')
r = self.client.get('/api/routing/peers')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
class TestDeletePeerRoute(unittest.TestCase):
def setUp(self):
app.config['TESTING'] = True
self.client = app.test_client()
@patch('app.routing_manager')
def test_delete_peer_route_returns_200_on_success(self, mock_rm):
mock_rm.remove_peer_route.return_value = {'removed': True}
r = self.client.delete('/api/routing/peers/alice')
self.assertEqual(r.status_code, 200)
@patch('app.routing_manager')
def test_delete_peer_route_calls_manager_with_name(self, mock_rm):
mock_rm.remove_peer_route.return_value = {'removed': True}
self.client.delete('/api/routing/peers/bob')
mock_rm.remove_peer_route.assert_called_once_with('bob')
@patch('app.routing_manager')
def test_delete_peer_route_returns_500_on_exception(self, mock_rm):
mock_rm.remove_peer_route.side_effect = Exception('iptables flush error')
r = self.client.delete('/api/routing/peers/alice')
self.assertEqual(r.status_code, 500)
self.assertIn('error', json.loads(r.data))
if __name__ == '__main__':
unittest.main()