#!/usr/bin/env python3 """ Tests verifying that is_local_request() enforcement works correctly per endpoint in api/app.py. The audit flagged that is_local_request() checks are performed inline (not via a decorator), so this file confirms: 1. Endpoints that call `is_local_request()` return 403 when the function returns False (i.e., a non-local caller). 2. Endpoints that do NOT call `is_local_request()` still respond normally (2xx / 4xx) for non-local callers. Tested local-only endpoints (representative sample): GET /api/containers — list_containers POST /api/containers//start POST /api/containers//stop POST /api/containers//restart GET /api/containers//logs GET /api/containers//stats GET /api/vault/secrets POST /api/vault/secrets GET /api/vault/secrets/ DELETE /api/vault/secrets/ GET /api/containers — POST with image field GET /api/images POST /api/images/pull DELETE /api/images/ GET /api/volumes POST /api/volumes DELETE /api/volumes/ DELETE /api/containers/ Tested public endpoints (no is_local_request guard): GET /api/calendar/status GET /api/dns/records GET /api/dhcp/leases GET /api/cells """ import sys import json import unittest from pathlib import Path from unittest.mock import patch, MagicMock api_dir = Path(__file__).parent.parent / 'api' sys.path.insert(0, str(api_dir)) from app import app def _non_local_client(): """Return a Flask test client that pretends to come from a non-local address.""" app.config['TESTING'] = True # Flask's test client uses '127.0.0.1' by default; override with a public IP # by setting REMOTE_ADDR in the environ base. return app.test_client() # ── helpers ─────────────────────────────────────────────────────────────────── def _get_non_local(client, path): """Perform a GET request that appears to originate from a non-local IP.""" return client.get(path, environ_base={'REMOTE_ADDR': '203.0.113.1'}) def _post_non_local(client, path, body=None): return client.post( path, data=json.dumps(body or {}), content_type='application/json', environ_base={'REMOTE_ADDR': '203.0.113.1'}, ) def _delete_non_local(client, path): return client.delete(path, environ_base={'REMOTE_ADDR': '203.0.113.1'}) # ── local-only endpoint tests ───────────────────────────────────────────────── class TestLocalOnlyEndpointsReturn403ForNonLocal(unittest.TestCase): """Every endpoint that calls is_local_request() must return 403 for external IPs.""" def setUp(self): app.config['TESTING'] = True self.client = _non_local_client() # Container management def test_list_containers_returns_403_for_non_local(self): r = _get_non_local(self.client, '/api/containers') self.assertEqual(r.status_code, 403) self.assertIn('error', json.loads(r.data)) def test_start_container_returns_403_for_non_local(self): r = _post_non_local(self.client, '/api/containers/myapp/start') self.assertEqual(r.status_code, 403) def test_stop_container_returns_403_for_non_local(self): r = _post_non_local(self.client, '/api/containers/myapp/stop') self.assertEqual(r.status_code, 403) def test_restart_container_returns_403_for_non_local(self): r = _post_non_local(self.client, '/api/containers/myapp/restart') self.assertEqual(r.status_code, 403) def test_get_container_logs_returns_403_for_non_local(self): r = _get_non_local(self.client, '/api/containers/myapp/logs') self.assertEqual(r.status_code, 403) def test_get_container_stats_returns_403_for_non_local(self): r = _get_non_local(self.client, '/api/containers/myapp/stats') self.assertEqual(r.status_code, 403) def test_remove_container_returns_403_for_non_local(self): r = _delete_non_local(self.client, '/api/containers/myapp') self.assertEqual(r.status_code, 403) # Image management def test_list_images_returns_403_for_non_local(self): r = _get_non_local(self.client, '/api/images') self.assertEqual(r.status_code, 403) def test_pull_image_returns_403_for_non_local(self): r = _post_non_local(self.client, '/api/images/pull', {'image': 'nginx:latest'}) self.assertEqual(r.status_code, 403) def test_remove_image_returns_403_for_non_local(self): r = _delete_non_local(self.client, '/api/images/nginx') self.assertEqual(r.status_code, 403) # Volume management def test_list_volumes_returns_403_for_non_local(self): r = _get_non_local(self.client, '/api/volumes') self.assertEqual(r.status_code, 403) def test_create_volume_returns_403_for_non_local(self): r = _post_non_local(self.client, '/api/volumes', {'name': 'myvol'}) self.assertEqual(r.status_code, 403) def test_remove_volume_returns_403_for_non_local(self): r = _delete_non_local(self.client, '/api/volumes/myvol') self.assertEqual(r.status_code, 403) # Vault endpoints def test_list_secrets_returns_403_for_non_local(self): r = _get_non_local(self.client, '/api/vault/secrets') self.assertEqual(r.status_code, 403) def test_store_secret_returns_403_for_non_local(self): r = _post_non_local(self.client, '/api/vault/secrets', {'name': 'k', 'value': 'v'}) self.assertEqual(r.status_code, 403) def test_get_secret_returns_403_for_non_local(self): r = _get_non_local(self.client, '/api/vault/secrets/mykey') self.assertEqual(r.status_code, 403) def test_delete_secret_returns_403_for_non_local(self): r = _delete_non_local(self.client, '/api/vault/secrets/mykey') self.assertEqual(r.status_code, 403) class TestLocalOnlyEndpointsAllowedFromLocalhost(unittest.TestCase): """The same endpoints must NOT return 403 for loopback / local callers.""" def setUp(self): app.config['TESTING'] = True # Default test client remote_addr is 127.0.0.1, which is local self.client = app.test_client() @patch('app.container_manager') def test_list_containers_allowed_from_local(self, mock_cm): mock_cm.list_containers.return_value = [] r = self.client.get('/api/containers') self.assertNotEqual(r.status_code, 403) @patch('app.container_manager') def test_list_images_allowed_from_local(self, mock_cm): mock_cm.list_images.return_value = [] r = self.client.get('/api/images') self.assertNotEqual(r.status_code, 403) @patch('app.container_manager') def test_list_volumes_allowed_from_local(self, mock_cm): mock_cm.list_volumes.return_value = [] r = self.client.get('/api/volumes') self.assertNotEqual(r.status_code, 403) # ── public endpoint tests — no is_local_request guard ──────────────────────── class TestPublicEndpointsNotBlockedForNonLocal(unittest.TestCase): """ Endpoints that do NOT call is_local_request() must remain reachable from non-local addresses. A 403 here would indicate an unintended broadening of the local-only guard. """ def setUp(self): app.config['TESTING'] = True self.client = _non_local_client() @patch('app.calendar_manager') def test_calendar_status_not_403_for_non_local(self, mock_cm): mock_cm.get_status.return_value = {'running': True} r = _get_non_local(self.client, '/api/calendar/status') self.assertNotEqual(r.status_code, 403) @patch('app.network_manager') def test_dns_records_not_403_for_non_local(self, mock_nm): mock_nm.get_dns_records.return_value = [] r = _get_non_local(self.client, '/api/dns/records') self.assertNotEqual(r.status_code, 403) @patch('app.network_manager') def test_dhcp_leases_not_403_for_non_local(self, mock_nm): mock_nm.get_dhcp_leases.return_value = [] r = _get_non_local(self.client, '/api/dhcp/leases') self.assertNotEqual(r.status_code, 403) @patch('app.cell_link_manager') def test_cells_list_not_403_for_non_local(self, mock_clm): mock_clm.list_connections.return_value = [] r = _get_non_local(self.client, '/api/cells') self.assertNotEqual(r.status_code, 403) def test_health_check_not_403_for_non_local(self): r = _get_non_local(self.client, '/health') self.assertNotEqual(r.status_code, 403) # ── is_local_request logic unit tests ──────────────────────────────────────── class TestIsLocalRequestLogic(unittest.TestCase): """ Directly verify the is_local_request() function from app.py. These tests exercise the address-checking logic without going through a full HTTP request cycle. """ def setUp(self): from app import is_local_request as _fn self._fn = _fn app.config['TESTING'] = True def _call_with_addr(self, remote_addr, xff=None): """Push a fake request context and evaluate is_local_request().""" from app import app as _app headers = {} if xff: headers['X-Forwarded-For'] = xff with _app.test_request_context('/', environ_base={'REMOTE_ADDR': remote_addr}, headers=headers): return self._fn() def test_loopback_127_is_local(self): self.assertTrue(self._call_with_addr('127.0.0.1')) def test_ipv6_loopback_is_local(self): self.assertTrue(self._call_with_addr('::1')) def test_docker_bridge_172_20_is_local(self): # 172.20.x.x is inside 172.16.0.0/12 self.assertTrue(self._call_with_addr('172.20.0.5')) def test_docker_bridge_172_16_boundary_is_local(self): # Exact boundary of 172.16.0.0/12 self.assertTrue(self._call_with_addr('172.16.0.1')) def test_public_ip_is_not_local(self): self.assertFalse(self._call_with_addr('8.8.8.8')) def test_wireguard_peer_10_0_0_x_is_not_local(self): # WireGuard peer IPs (10.0.0.0/8) must NOT be treated as local self.assertFalse(self._call_with_addr('10.0.0.2')) def test_lan_192_168_is_not_local(self): # LAN addresses must NOT be treated as local (comment in app.py confirms this) self.assertFalse(self._call_with_addr('192.168.1.50')) def test_xff_last_entry_loopback_is_local(self): # Public remote addr, but last XFF entry is loopback → allowed self.assertTrue(self._call_with_addr('8.8.8.8', xff='8.8.8.8, 127.0.0.1')) def test_xff_first_entry_spoofed_loopback_not_local(self): # Spoofed first XFF entry; last entry is a public IP → should be rejected # remote_addr is also public to rule out that shortcut result = self._call_with_addr('8.8.8.8', xff='127.0.0.1, 8.8.8.8') self.assertFalse(result) def test_xff_last_entry_docker_bridge_is_local(self): # Last XFF entry is Caddy's Docker bridge address self.assertTrue(self._call_with_addr('8.8.8.8', xff='1.2.3.4, 172.20.0.2')) if __name__ == '__main__': unittest.main()