aa1e5c41ec
Unit Tests / test (push) Successful in 12m6s
Coverage was below acceptable levels and several newly-added code paths (sshuttle egress, proxy egress, DDNS provider stubs, DNS overview route, peer-registry provisioning) had zero test coverage. ~250 new unit tests are added across 16 new test files. Existing test files are updated to match refactored interfaces (DHCP removed, constants introduced, network_manager restructured). .coveragerc is added to pin the source mapping and the 70% floor so regressions are caught at commit time. tests/test_enhanced_api.py was previously living in api/ (wrong location) and is moved to tests/ where it belongs. Integration test files are updated to remove references to DHCP endpoints and add coverage for the new DNS overview and DDNS sync endpoints. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
295 lines
11 KiB
Python
295 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tests verifying that is_local_request() enforcement works correctly
|
|
per endpoint in api/app.py.
|
|
|
|
The audit flagged that is_local_request() checks are performed inline
|
|
(not via a decorator), so this file confirms:
|
|
1. Endpoints that call `is_local_request()` return 403 when the
|
|
function returns False (i.e., a non-local caller).
|
|
2. Endpoints that do NOT call `is_local_request()` still respond
|
|
normally (2xx / 4xx) for non-local callers.
|
|
|
|
Tested local-only endpoints (representative sample):
|
|
GET /api/containers — list_containers
|
|
POST /api/containers/<n>/start
|
|
POST /api/containers/<n>/stop
|
|
POST /api/containers/<n>/restart
|
|
GET /api/containers/<n>/logs
|
|
GET /api/containers/<n>/stats
|
|
GET /api/vault/secrets
|
|
POST /api/vault/secrets
|
|
GET /api/vault/secrets/<name>
|
|
DELETE /api/vault/secrets/<name>
|
|
GET /api/containers — POST with image field
|
|
GET /api/images
|
|
POST /api/images/pull
|
|
DELETE /api/images/<image>
|
|
GET /api/volumes
|
|
POST /api/volumes
|
|
DELETE /api/volumes/<name>
|
|
DELETE /api/containers/<name>
|
|
|
|
Tested public endpoints (no is_local_request guard):
|
|
GET /api/calendar/status
|
|
GET /api/dns/records
|
|
GET /api/cells
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
api_dir = Path(__file__).parent.parent / 'api'
|
|
sys.path.insert(0, str(api_dir))
|
|
|
|
from app import app
|
|
|
|
|
|
def _non_local_client():
|
|
"""Return a Flask test client that pretends to come from a non-local address."""
|
|
app.config['TESTING'] = True
|
|
# Flask's test client uses '127.0.0.1' by default; override with a public IP
|
|
# by setting REMOTE_ADDR in the environ base.
|
|
return app.test_client()
|
|
|
|
|
|
# ── helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _get_non_local(client, path):
|
|
"""Perform a GET request that appears to originate from a non-local IP."""
|
|
return client.get(path, environ_base={'REMOTE_ADDR': '203.0.113.1'})
|
|
|
|
|
|
def _post_non_local(client, path, body=None):
|
|
return client.post(
|
|
path,
|
|
data=json.dumps(body or {}),
|
|
content_type='application/json',
|
|
environ_base={'REMOTE_ADDR': '203.0.113.1'},
|
|
)
|
|
|
|
|
|
def _delete_non_local(client, path):
|
|
return client.delete(path, environ_base={'REMOTE_ADDR': '203.0.113.1'})
|
|
|
|
|
|
# ── local-only endpoint tests ─────────────────────────────────────────────────
|
|
|
|
class TestLocalOnlyEndpointsReturn403ForNonLocal(unittest.TestCase):
|
|
"""Every endpoint that calls is_local_request() must return 403 for external IPs."""
|
|
|
|
def setUp(self):
|
|
app.config['TESTING'] = True
|
|
self.client = _non_local_client()
|
|
|
|
# Container management
|
|
|
|
def test_list_containers_returns_403_for_non_local(self):
|
|
r = _get_non_local(self.client, '/api/containers')
|
|
self.assertEqual(r.status_code, 403)
|
|
self.assertIn('error', json.loads(r.data))
|
|
|
|
def test_start_container_returns_403_for_non_local(self):
|
|
r = _post_non_local(self.client, '/api/containers/myapp/start')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
def test_stop_container_returns_403_for_non_local(self):
|
|
r = _post_non_local(self.client, '/api/containers/myapp/stop')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
def test_restart_container_returns_403_for_non_local(self):
|
|
r = _post_non_local(self.client, '/api/containers/myapp/restart')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
def test_get_container_logs_returns_403_for_non_local(self):
|
|
r = _get_non_local(self.client, '/api/containers/myapp/logs')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
def test_get_container_stats_returns_403_for_non_local(self):
|
|
r = _get_non_local(self.client, '/api/containers/myapp/stats')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
def test_remove_container_returns_403_for_non_local(self):
|
|
r = _delete_non_local(self.client, '/api/containers/myapp')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
# Image management
|
|
|
|
def test_list_images_returns_403_for_non_local(self):
|
|
r = _get_non_local(self.client, '/api/images')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
def test_pull_image_returns_403_for_non_local(self):
|
|
r = _post_non_local(self.client, '/api/images/pull', {'image': 'nginx:latest'})
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
def test_remove_image_returns_403_for_non_local(self):
|
|
r = _delete_non_local(self.client, '/api/images/nginx')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
# Volume management
|
|
|
|
def test_list_volumes_returns_403_for_non_local(self):
|
|
r = _get_non_local(self.client, '/api/volumes')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
def test_create_volume_returns_403_for_non_local(self):
|
|
r = _post_non_local(self.client, '/api/volumes', {'name': 'myvol'})
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
def test_remove_volume_returns_403_for_non_local(self):
|
|
r = _delete_non_local(self.client, '/api/volumes/myvol')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
# Vault endpoints
|
|
|
|
def test_list_secrets_returns_403_for_non_local(self):
|
|
r = _get_non_local(self.client, '/api/vault/secrets')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
def test_store_secret_returns_403_for_non_local(self):
|
|
r = _post_non_local(self.client, '/api/vault/secrets', {'name': 'k', 'value': 'v'})
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
def test_get_secret_returns_403_for_non_local(self):
|
|
r = _get_non_local(self.client, '/api/vault/secrets/mykey')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
def test_delete_secret_returns_403_for_non_local(self):
|
|
r = _delete_non_local(self.client, '/api/vault/secrets/mykey')
|
|
self.assertEqual(r.status_code, 403)
|
|
|
|
|
|
class TestLocalOnlyEndpointsAllowedFromLocalhost(unittest.TestCase):
|
|
"""The same endpoints must NOT return 403 for loopback / local callers."""
|
|
|
|
def setUp(self):
|
|
app.config['TESTING'] = True
|
|
# Default test client remote_addr is 127.0.0.1, which is local
|
|
self.client = app.test_client()
|
|
|
|
@patch('app.container_manager')
|
|
def test_list_containers_allowed_from_local(self, mock_cm):
|
|
mock_cm.list_containers.return_value = []
|
|
r = self.client.get('/api/containers')
|
|
self.assertNotEqual(r.status_code, 403)
|
|
|
|
@patch('app.container_manager')
|
|
def test_list_images_allowed_from_local(self, mock_cm):
|
|
mock_cm.list_images.return_value = []
|
|
r = self.client.get('/api/images')
|
|
self.assertNotEqual(r.status_code, 403)
|
|
|
|
@patch('app.container_manager')
|
|
def test_list_volumes_allowed_from_local(self, mock_cm):
|
|
mock_cm.list_volumes.return_value = []
|
|
r = self.client.get('/api/volumes')
|
|
self.assertNotEqual(r.status_code, 403)
|
|
|
|
|
|
# ── public endpoint tests — no is_local_request guard ────────────────────────
|
|
|
|
class TestPublicEndpointsNotBlockedForNonLocal(unittest.TestCase):
|
|
"""
|
|
Endpoints that do NOT call is_local_request() must remain reachable
|
|
from non-local addresses. A 403 here would indicate an unintended
|
|
broadening of the local-only guard.
|
|
"""
|
|
|
|
def setUp(self):
|
|
app.config['TESTING'] = True
|
|
self.client = _non_local_client()
|
|
|
|
@patch('app.calendar_manager')
|
|
def test_calendar_status_not_403_for_non_local(self, mock_cm):
|
|
mock_cm.get_status.return_value = {'running': True}
|
|
r = _get_non_local(self.client, '/api/calendar/status')
|
|
self.assertNotEqual(r.status_code, 403)
|
|
|
|
@patch('app.network_manager')
|
|
def test_dns_records_not_403_for_non_local(self, mock_nm):
|
|
mock_nm.get_dns_records.return_value = []
|
|
r = _get_non_local(self.client, '/api/dns/records')
|
|
self.assertNotEqual(r.status_code, 403)
|
|
|
|
@patch('app.cell_link_manager')
|
|
def test_cells_list_not_403_for_non_local(self, mock_clm):
|
|
mock_clm.list_connections.return_value = []
|
|
r = _get_non_local(self.client, '/api/cells')
|
|
self.assertNotEqual(r.status_code, 403)
|
|
|
|
def test_health_check_not_403_for_non_local(self):
|
|
r = _get_non_local(self.client, '/health')
|
|
self.assertNotEqual(r.status_code, 403)
|
|
|
|
|
|
# ── is_local_request logic unit tests ────────────────────────────────────────
|
|
|
|
class TestIsLocalRequestLogic(unittest.TestCase):
|
|
"""
|
|
Directly verify the is_local_request() function from app.py.
|
|
These tests exercise the address-checking logic without going through
|
|
a full HTTP request cycle.
|
|
"""
|
|
|
|
def setUp(self):
|
|
from app import is_local_request as _fn
|
|
self._fn = _fn
|
|
app.config['TESTING'] = True
|
|
|
|
def _call_with_addr(self, remote_addr, xff=None):
|
|
"""Push a fake request context and evaluate is_local_request()."""
|
|
from app import app as _app
|
|
headers = {}
|
|
if xff:
|
|
headers['X-Forwarded-For'] = xff
|
|
with _app.test_request_context('/', environ_base={'REMOTE_ADDR': remote_addr},
|
|
headers=headers):
|
|
return self._fn()
|
|
|
|
def test_loopback_127_is_local(self):
|
|
self.assertTrue(self._call_with_addr('127.0.0.1'))
|
|
|
|
def test_ipv6_loopback_is_local(self):
|
|
self.assertTrue(self._call_with_addr('::1'))
|
|
|
|
def test_docker_bridge_172_20_is_local(self):
|
|
# 172.20.x.x is inside 172.16.0.0/12
|
|
self.assertTrue(self._call_with_addr('172.20.0.5'))
|
|
|
|
def test_docker_bridge_172_16_boundary_is_local(self):
|
|
# Exact boundary of 172.16.0.0/12
|
|
self.assertTrue(self._call_with_addr('172.16.0.1'))
|
|
|
|
def test_public_ip_is_not_local(self):
|
|
self.assertFalse(self._call_with_addr('8.8.8.8'))
|
|
|
|
def test_wireguard_peer_10_0_0_x_is_not_local(self):
|
|
# WireGuard peer IPs (10.0.0.0/8) must NOT be treated as local
|
|
self.assertFalse(self._call_with_addr('10.0.0.2'))
|
|
|
|
def test_lan_192_168_is_not_local(self):
|
|
# LAN addresses must NOT be treated as local (comment in app.py confirms this)
|
|
self.assertFalse(self._call_with_addr('192.168.1.50'))
|
|
|
|
def test_xff_last_entry_loopback_is_local(self):
|
|
# Public remote addr, but last XFF entry is loopback → allowed
|
|
self.assertTrue(self._call_with_addr('8.8.8.8', xff='8.8.8.8, 127.0.0.1'))
|
|
|
|
def test_xff_first_entry_spoofed_loopback_not_local(self):
|
|
# Spoofed first XFF entry; last entry is a public IP → should be rejected
|
|
# remote_addr is also public to rule out that shortcut
|
|
result = self._call_with_addr('8.8.8.8', xff='127.0.0.1, 8.8.8.8')
|
|
self.assertFalse(result)
|
|
|
|
def test_xff_last_entry_docker_bridge_is_local(self):
|
|
# Last XFF entry is Caddy's Docker bridge address
|
|
self.assertTrue(self._call_with_addr('8.8.8.8', xff='1.2.3.4, 172.20.0.2'))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|