Files
pic/tests/test_is_local_request_per_endpoint.py
roof a43f9fbf0d fix: full security audit remediation — P0/P1/P2/P3 fixes + 1020 passing tests
P0 — Broken functionality:
- Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing)
- Fix email_manager.delete_email_user() missing domain arg
- Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now
  accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file)
- Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR)
- Fix _next_peer_ip exhaustion returning 500 instead of 409
- Fix ConfigManager Caddyfile path (/app/config-caddy/)
- Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx
- Remove hardcoded credentials from Dashboard.jsx

P1 — Security:
- CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern)
- enforce_auth: 503 only when users file readable but empty; never bypass on IOError
- WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes
- DNS add_cell_dns_forward: validate IP and domain; reject injection chars
- DNS zone write: realpath containment + record content validation
- iptables comment /32 suffix prevents substring match deleting wrong peer rules
- is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge)
- POST /api/containers: volume allow-list prevents arbitrary host mounts
- file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user
- email/calendar: stop persisting plaintext passwords in user records
- routing_manager: validate IPs, networks, and interface names
- peer_registry: write peers.json at mode 0o600
- vault_manager: Fernet key file at mode 0o600
- CORS: lock down to explicit origin list
- domain/cell_name validation: reject newline, brace, semicolon injection chars

P2 — Architecture:
- Peer add: rollback registry entry if firewall rules fail post-add
- restart_service(): base class now calls _restart_container(); email and calendar
  managers call cell-mail / cell-radicale respectively
- email/calendar managers sync user list (no passwords) to cell_config.json
- Pending-restart flag cleared only after helper subprocess exits with code 0
- docker-compose.yml: add config-caddy volume to API container

P3 — Tests (854 → 1020):
- Fill test_email_endpoints.py, test_calendar_endpoints.py,
  test_network_endpoints.py, test_routing_endpoints.py
- New: test_peer_management_update.py, test_peer_management_edge_cases.py,
  test_input_validation.py, test_enforce_auth_configured.py,
  test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py,
  test_is_local_request_per_endpoint.py, test_caddy_routing.py
- E2E conftest: skip WireGuard suite when wg-quick absent
- Update existing tests to match fixed signatures and comment formats

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:30:21 -04:00

302 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Tests verifying that is_local_request() enforcement works correctly
per endpoint in api/app.py.
The audit flagged that is_local_request() checks are performed inline
(not via a decorator), so this file confirms:
1. Endpoints that call `is_local_request()` return 403 when the
function returns False (i.e., a non-local caller).
2. Endpoints that do NOT call `is_local_request()` still respond
normally (2xx / 4xx) for non-local callers.
Tested local-only endpoints (representative sample):
GET /api/containers — list_containers
POST /api/containers/<n>/start
POST /api/containers/<n>/stop
POST /api/containers/<n>/restart
GET /api/containers/<n>/logs
GET /api/containers/<n>/stats
GET /api/vault/secrets
POST /api/vault/secrets
GET /api/vault/secrets/<name>
DELETE /api/vault/secrets/<name>
POST /api/containers — with image field
GET /api/images
POST /api/images/pull
DELETE /api/images/<image>
GET /api/volumes
POST /api/volumes
DELETE /api/volumes/<name>
DELETE /api/containers/<name>
Tested public endpoints (no is_local_request guard):
GET /api/calendar/status
GET /api/dns/records
GET /api/dhcp/leases
GET /api/cells
GET /health
"""
import sys
import json
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
# The Flask application module lives in the sibling ``api`` directory (one
# level above this tests directory). Put that directory at the front of
# sys.path so the bare ``from app import app`` below can resolve.
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
# Must come after the sys.path tweak above; it depends on it.
from app import app
def _non_local_client():
    """Return a Flask test client whose requests default to a non-local address.

    Flask's test client reports ``REMOTE_ADDR`` as ``127.0.0.1`` unless told
    otherwise, which would make every request look local. The client's
    ``environ_base`` is therefore updated so that requests issued without an
    explicit ``environ_base`` also appear to come from a public address.
    A per-request ``environ_base`` argument still takes precedence.

    Returns:
        A test client for ``app`` with ``TESTING`` enabled.
    """
    app.config['TESTING'] = True
    client = app.test_client()
    # 203.0.113.0/24 is TEST-NET-3 (RFC 5737): a documentation-only public
    # range that is neither loopback nor inside 172.16.0.0/12.
    client.environ_base['REMOTE_ADDR'] = '203.0.113.1'
    return client
# ── helpers ───────────────────────────────────────────────────────────────────
def _get_non_local(client, path):
    """Issue a GET to *path* with REMOTE_ADDR forced to a public (non-local) IP.

    Returns whatever ``client.get`` returns.
    """
    source = {'REMOTE_ADDR': '203.0.113.1'}
    response = client.get(path, environ_base=source)
    return response
def _post_non_local(client, path, body=None):
    """Issue a JSON POST to *path* with REMOTE_ADDR forced to a public IP.

    *body* is serialised with ``json.dumps``; a missing or falsy body is sent
    as an empty JSON object. Returns whatever ``client.post`` returns.
    """
    payload = json.dumps(body if body else {})
    source = {'REMOTE_ADDR': '203.0.113.1'}
    return client.post(
        path,
        data=payload,
        content_type='application/json',
        environ_base=source,
    )
def _delete_non_local(client, path):
    """Issue a DELETE to *path* with REMOTE_ADDR forced to a public (non-local) IP.

    Returns whatever ``client.delete`` returns.
    """
    source = {'REMOTE_ADDR': '203.0.113.1'}
    response = client.delete(path, environ_base=source)
    return response
# ── local-only endpoint tests ─────────────────────────────────────────────────
class TestLocalOnlyEndpointsReturn403ForNonLocal(unittest.TestCase):
    """Guarded endpoints must answer 403 when the caller's address is external.

    Every request goes through the module's non-local helpers, which send
    REMOTE_ADDR 203.0.113.1.

    NOTE(review): only the status code is checked for most endpoints. The
    accompanying change set also mentions CSRF validation on POST/DELETE to
    /api/*; confirm that the 403 seen for those verbs comes from the locality
    guard and not from another check.
    """

    def setUp(self):
        app.config['TESTING'] = True
        self.client = _non_local_client()

    def _assert_forbidden(self, response):
        # Shared check: the request was rejected with HTTP 403.
        self.assertEqual(response.status_code, 403)

    # Container management

    def test_list_containers_returns_403_for_non_local(self):
        response = _get_non_local(self.client, '/api/containers')
        self._assert_forbidden(response)
        # The rejection body must be JSON carrying an 'error' key.
        payload = json.loads(response.data)
        self.assertIn('error', payload)

    def test_start_container_returns_403_for_non_local(self):
        self._assert_forbidden(
            _post_non_local(self.client, '/api/containers/myapp/start'))

    def test_stop_container_returns_403_for_non_local(self):
        self._assert_forbidden(
            _post_non_local(self.client, '/api/containers/myapp/stop'))

    def test_restart_container_returns_403_for_non_local(self):
        self._assert_forbidden(
            _post_non_local(self.client, '/api/containers/myapp/restart'))

    def test_get_container_logs_returns_403_for_non_local(self):
        self._assert_forbidden(
            _get_non_local(self.client, '/api/containers/myapp/logs'))

    def test_get_container_stats_returns_403_for_non_local(self):
        self._assert_forbidden(
            _get_non_local(self.client, '/api/containers/myapp/stats'))

    def test_remove_container_returns_403_for_non_local(self):
        self._assert_forbidden(
            _delete_non_local(self.client, '/api/containers/myapp'))

    # Image management

    def test_list_images_returns_403_for_non_local(self):
        self._assert_forbidden(_get_non_local(self.client, '/api/images'))

    def test_pull_image_returns_403_for_non_local(self):
        request_body = {'image': 'nginx:latest'}
        self._assert_forbidden(
            _post_non_local(self.client, '/api/images/pull', request_body))

    def test_remove_image_returns_403_for_non_local(self):
        self._assert_forbidden(
            _delete_non_local(self.client, '/api/images/nginx'))

    # Volume management

    def test_list_volumes_returns_403_for_non_local(self):
        self._assert_forbidden(_get_non_local(self.client, '/api/volumes'))

    def test_create_volume_returns_403_for_non_local(self):
        request_body = {'name': 'myvol'}
        self._assert_forbidden(
            _post_non_local(self.client, '/api/volumes', request_body))

    def test_remove_volume_returns_403_for_non_local(self):
        self._assert_forbidden(
            _delete_non_local(self.client, '/api/volumes/myvol'))

    # Vault endpoints

    def test_list_secrets_returns_403_for_non_local(self):
        self._assert_forbidden(
            _get_non_local(self.client, '/api/vault/secrets'))

    def test_store_secret_returns_403_for_non_local(self):
        request_body = {'name': 'k', 'value': 'v'}
        self._assert_forbidden(
            _post_non_local(self.client, '/api/vault/secrets', request_body))

    def test_get_secret_returns_403_for_non_local(self):
        self._assert_forbidden(
            _get_non_local(self.client, '/api/vault/secrets/mykey'))

    def test_delete_secret_returns_403_for_non_local(self):
        self._assert_forbidden(
            _delete_non_local(self.client, '/api/vault/secrets/mykey'))
class TestLocalOnlyEndpointsAllowedFromLocalhost(unittest.TestCase):
    """Guarded endpoints must not answer 403 when called from loopback.

    ``app.container_manager`` is patched in each test, and only the absence
    of a 403 is asserted, not a particular success code.
    """

    def setUp(self):
        app.config['TESTING'] = True
        # No REMOTE_ADDR override: the stock test client reports 127.0.0.1,
        # which counts as a local caller.
        self.client = app.test_client()

    @patch('app.container_manager')
    def test_list_containers_allowed_from_local(self, mock_cm):
        mock_cm.list_containers.return_value = []
        status = self.client.get('/api/containers').status_code
        self.assertNotEqual(status, 403)

    @patch('app.container_manager')
    def test_list_images_allowed_from_local(self, mock_cm):
        mock_cm.list_images.return_value = []
        status = self.client.get('/api/images').status_code
        self.assertNotEqual(status, 403)

    @patch('app.container_manager')
    def test_list_volumes_allowed_from_local(self, mock_cm):
        mock_cm.list_volumes.return_value = []
        status = self.client.get('/api/volumes').status_code
        self.assertNotEqual(status, 403)
# ── public endpoint tests — no is_local_request guard ────────────────────────
class TestPublicEndpointsNotBlockedForNonLocal(unittest.TestCase):
    """
    Unguarded endpoints must stay reachable from external addresses.

    These routes are expected not to call is_local_request(); a 403 from any
    of them would mean the local-only guard has been applied more widely
    than intended. Only "not 403" is asserted; any other status passes.
    """

    def setUp(self):
        app.config['TESTING'] = True
        self.client = _non_local_client()

    @patch('app.calendar_manager')
    def test_calendar_status_not_403_for_non_local(self, mock_cm):
        mock_cm.get_status.return_value = {'running': True}
        response = _get_non_local(self.client, '/api/calendar/status')
        self.assertNotEqual(response.status_code, 403)

    @patch('app.network_manager')
    def test_dns_records_not_403_for_non_local(self, mock_nm):
        mock_nm.get_dns_records.return_value = []
        response = _get_non_local(self.client, '/api/dns/records')
        self.assertNotEqual(response.status_code, 403)

    @patch('app.network_manager')
    def test_dhcp_leases_not_403_for_non_local(self, mock_nm):
        mock_nm.get_dhcp_leases.return_value = []
        response = _get_non_local(self.client, '/api/dhcp/leases')
        self.assertNotEqual(response.status_code, 403)

    @patch('app.cell_link_manager')
    def test_cells_list_not_403_for_non_local(self, mock_clm):
        mock_clm.list_connections.return_value = []
        response = _get_non_local(self.client, '/api/cells')
        self.assertNotEqual(response.status_code, 403)

    def test_health_check_not_403_for_non_local(self):
        # No manager is patched for this route.
        response = _get_non_local(self.client, '/health')
        self.assertNotEqual(response.status_code, 403)
# ── is_local_request logic unit tests ────────────────────────────────────────
class TestIsLocalRequestLogic(unittest.TestCase):
    """
    Unit tests that call app.is_local_request() directly.

    Each case builds a request context with a chosen REMOTE_ADDR (and an
    optional X-Forwarded-For header) and evaluates the function inside it,
    so no route handler is involved.
    """

    def setUp(self):
        from app import is_local_request as _fn
        self._fn = _fn
        app.config['TESTING'] = True

    def _call_with_addr(self, remote_addr, xff=None):
        """Evaluate is_local_request() inside a synthetic request context.

        ``remote_addr`` becomes REMOTE_ADDR; a truthy ``xff`` is sent as the
        X-Forwarded-For header, otherwise no extra header is set.
        """
        from app import app as _app
        forwarded = {'X-Forwarded-For': xff} if xff else {}
        context = _app.test_request_context(
            '/',
            environ_base={'REMOTE_ADDR': remote_addr},
            headers=forwarded,
        )
        with context:
            return self._fn()

    def test_loopback_127_is_local(self):
        verdict = self._call_with_addr('127.0.0.1')
        self.assertTrue(verdict)

    def test_ipv6_loopback_is_local(self):
        verdict = self._call_with_addr('::1')
        self.assertTrue(verdict)

    def test_docker_bridge_172_20_is_local(self):
        # 172.20.0.5 falls within 172.16.0.0/12.
        verdict = self._call_with_addr('172.20.0.5')
        self.assertTrue(verdict)

    def test_docker_bridge_172_16_boundary_is_local(self):
        # Address at the very start of 172.16.0.0/12.
        verdict = self._call_with_addr('172.16.0.1')
        self.assertTrue(verdict)

    def test_public_ip_is_not_local(self):
        verdict = self._call_with_addr('8.8.8.8')
        self.assertFalse(verdict)

    def test_wireguard_peer_10_0_0_x_is_not_local(self):
        # WireGuard peer addresses in 10.0.0.0/8 must not count as local.
        verdict = self._call_with_addr('10.0.0.2')
        self.assertFalse(verdict)

    def test_lan_192_168_is_not_local(self):
        # Ordinary LAN addresses must not count as local either.
        verdict = self._call_with_addr('192.168.1.50')
        self.assertFalse(verdict)

    def test_xff_last_entry_loopback_is_local(self):
        # REMOTE_ADDR is public, but the final XFF hop is loopback -> local.
        verdict = self._call_with_addr('8.8.8.8', xff='8.8.8.8, 127.0.0.1')
        self.assertTrue(verdict)

    def test_xff_first_entry_spoofed_loopback_not_local(self):
        # A loopback value in the first XFF slot is spoofable. The final
        # entry is public, and REMOTE_ADDR is public too so it cannot be the
        # reason for a pass; the request must be rejected.
        self.assertFalse(
            self._call_with_addr('8.8.8.8', xff='127.0.0.1, 8.8.8.8'))

    def test_xff_last_entry_docker_bridge_is_local(self):
        # Final XFF hop is a Docker-bridge address (the reverse proxy).
        verdict = self._call_with_addr('8.8.8.8', xff='1.2.3.4, 172.20.0.2')
        self.assertTrue(verdict)
# Run the test cases in this module via unittest's command-line runner when
# the file is executed directly as a script.
if __name__ == '__main__':
unittest.main()