fc3cfc9741
- api/app.py: email/calendar/files provisioning now best-effort (non-fatal); fixed email_manager.create_email_user call to include domain argument - tests/integration: added module-level auth sessions to all integration test files; added admin auth to api fixture and _resolve_admin_pass() helper; added TEST_PEER_PASSWORD constant; added password to peer creation calls - tests/test_peer_provisioning.py: renamed rollback test to reflect new best-effort semantics (email failure no longer causes rollback) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
349 lines
13 KiB
Python
349 lines
13 KiB
Python
"""
|
|
Negative and error-path integration tests.
|
|
|
|
These tests verify that the API:
|
|
1. Rejects malformed or missing inputs with appropriate 4xx status codes
|
|
2. Returns JSON with an 'error' key on failure (never a raw exception traceback)
|
|
3. Returns 404 (or a 200 with a "not found" message) for unknown resource IDs
|
|
4. Does not crash (500) on bad Content-Type or oversized payloads
|
|
|
|
Endpoints covered:
|
|
- /api/peers (POST, PUT, DELETE)
|
|
- /api/config (PUT)
|
|
- /api/dns/records (DELETE)
|
|
- /api/dhcp/reservations (POST, DELETE)
|
|
- /api/containers/<name>/restart (plus /logs and /stats)
|
|
- /api/wireguard/keys/peer
|
|
|
|
Run with: pytest tests/integration/test_negative_scenarios.py -v
|
|
"""
|
|
import pytest
|
|
import requests
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
from conftest import API_BASE, _resolve_admin_pass
|
|
|
|
# Sentinel peer and container names that should never exist on the system under test
|
|
_GHOST_PEER = 'ghost-peer-that-does-not-exist-xyz'
|
|
_GHOST_CONTAINER = 'cell-container-does-not-exist-xyz'
|
|
|
|
|
|
|
|
# Module-wide HTTP session. It is None until the autouse fixture below has
# logged in; the get/post/put/delete helpers all send through it.
_S = None


@pytest.fixture(scope='module', autouse=True)
def _auth_session():
    """Log in as ``admin`` once per module and publish the session as ``_S``.

    The session sends ``Content-Type: application/json`` by default. The
    login password comes from ``_resolve_admin_pass()``. The session is
    closed when the module's tests have finished, or immediately if the
    login itself fails.

    Raises:
        AssertionError: if the login request does not return HTTP 200.
    """
    global _S
    _S = requests.Session()
    _S.headers['Content-Type'] = 'application/json'
    try:
        r = _S.post(f"{API_BASE}/api/auth/login",
                    json={'username': 'admin', 'password': _resolve_admin_pass()})
        # Single braces so the server's response body is interpolated into
        # the failure message (doubled braces would print "{r.text}" verbatim).
        assert r.status_code == 200, f"Login failed: {r.text}"
        yield
    finally:
        # Release pooled connections whether login succeeded or not.
        _S.close()
|
|
|
|
def get(path, **kw):
    """Send a GET for ``path`` (appended to ``API_BASE``) via the shared session ``_S``.

    Any extra keyword arguments are handed to ``_S.get`` unchanged.
    """
    url = f"{API_BASE}{path}"
    return _S.get(url, **kw)
|
|
|
|
|
|
def post(path, **kw):
    """Send a POST for ``path`` (appended to ``API_BASE``) via the shared session ``_S``.

    Any extra keyword arguments are handed to ``_S.post`` unchanged.
    """
    url = f"{API_BASE}{path}"
    return _S.post(url, **kw)
|
|
|
|
|
|
def put(path, **kw):
    """Send a PUT for ``path`` (appended to ``API_BASE``) via the shared session ``_S``.

    Any extra keyword arguments are handed to ``_S.put`` unchanged.
    """
    url = f"{API_BASE}{path}"
    return _S.put(url, **kw)
|
|
|
|
|
|
def delete(path, **kw):
    """Send a DELETE for ``path`` (appended to ``API_BASE``) via the shared session ``_S``.

    Any extra keyword arguments are handed to ``_S.delete`` unchanged.
    """
    url = f"{API_BASE}{path}"
    return _S.delete(url, **kw)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _assert_error_response(r, expected_status):
    """Assert the status code and that the body is a JSON object with 'error'.

    Args:
        r: Response-like object exposing ``status_code``, ``text`` and
            ``json()``.
        expected_status: Exact HTTP status code the response must carry.

    Raises:
        AssertionError: if the status differs, the body is not parseable
            JSON, the JSON is not an object, or the 'error' key is absent.
    """
    assert r.status_code == expected_status, (
        f"Expected {expected_status}, got {r.status_code}: {r.text}"
    )
    try:
        data = r.json()
    except ValueError as exc:
        # JSON decode errors are ValueError subclasses; report them as a
        # normal test failure that shows what the server actually sent.
        raise AssertionError(
            f"Expected a JSON body, got unparseable content: {r.text!r}"
        ) from exc
    # A bare JSON string such as "internal error" would otherwise satisfy the
    # membership test below by substring match, and null would raise TypeError.
    assert isinstance(data, dict), (
        f"Expected a JSON object in response body: {data!r}"
    )
    assert 'error' in data, f"Expected 'error' key in response body: {data}"
|
|
|
|
|
|
def _assert_json_error(r):
    """Assert that, whatever the status code, the body is a JSON object with 'error'.

    Args:
        r: Response-like object exposing ``text`` and ``json()``.

    Raises:
        AssertionError: if the body is not parseable JSON, the JSON is not
            an object, or the 'error' key is absent.
    """
    try:
        body = r.json()
    except ValueError as exc:
        # JSON decode errors are ValueError subclasses; report them as a
        # normal test failure that shows what the server actually sent.
        raise AssertionError(
            f"Expected a JSON body, got unparseable content: {r.text!r}"
        ) from exc
    # Guard against a bare JSON string passing the membership test by
    # substring match, and against null raising TypeError.
    assert isinstance(body, dict), (
        f"Expected a JSON object in error response body: {body!r}"
    )
    assert 'error' in body, f"Expected 'error' key in error response body: {body}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Peer endpoints — missing / invalid fields
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPeerNegative:
    """Error-path checks for the /api/peers endpoints.

    Every request goes through the module helpers, i.e. the logged-in
    session, so the asserted status reflects input validation rather than a
    possible authentication rejection.
    """

    def test_create_peer_missing_name_returns_400(self):
        """POST without 'name' is rejected with a JSON error."""
        r = post('/api/peers', json={'public_key': 'somefakekey=='})
        _assert_error_response(r, 400)

    def test_create_peer_missing_public_key_returns_400(self):
        """POST without 'public_key' is rejected with a JSON error."""
        r = post('/api/peers', json={'name': _GHOST_PEER})
        _assert_error_response(r, 400)

    def test_create_peer_empty_body_returns_400(self):
        """POST with a JSON content type but no body at all is rejected."""
        r = post(
            '/api/peers',
            data='',
            headers={'Content-Type': 'application/json'},
        )
        assert r.status_code == 400

    def test_create_peer_invalid_service_access_returns_400(self):
        """An unknown service name inside 'service_access' is rejected."""
        r = post('/api/peers', json={
            'name': _GHOST_PEER,
            'public_key': 'somefakekey==',
            'service_access': ['not_a_real_service'],
        })
        _assert_error_response(r, 400)

    def test_create_peer_service_access_not_a_list_returns_400(self):
        """'service_access' given as a string instead of a list is rejected."""
        r = post('/api/peers', json={
            'name': _GHOST_PEER,
            'public_key': 'somefakekey==',
            'service_access': 'calendar',  # string instead of list
        })
        _assert_error_response(r, 400)

    def test_update_nonexistent_peer_returns_404(self):
        """PUT on an unknown peer yields 404 with a JSON error body."""
        r = put(f'/api/peers/{_GHOST_PEER}', json={'service_access': ['calendar']})
        assert r.status_code == 404
        _assert_json_error(r)

    def test_delete_nonexistent_peer_returns_200_with_message(self):
        """DELETE on an unknown peer is treated as an idempotent success."""
        # app.py returns 200 + a "not found" message (not 404) for idempotent deletes
        r = delete(f'/api/peers/{_GHOST_PEER}')
        assert r.status_code == 200
        data = r.json()
        # Should have 'message', not 'error'
        assert 'message' in data
        assert 'not found' in data['message'].lower() or 'removed' in data['message'].lower()

    def test_create_peer_plain_text_body_returns_400(self):
        """Sending plain text instead of JSON should produce a 400."""
        # The per-request header overrides the session's JSON content type.
        r = post(
            '/api/peers',
            data='name=foo&public_key=bar',
            headers={'Content-Type': 'text/plain'},
        )
        assert r.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config endpoint — bad JSON, bad values
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestConfigNegative:
    """Validation checks for PUT /api/config (bad JSON, bad values, boundaries).

    Every request goes through the module helpers, i.e. the logged-in
    session, so the asserted status reflects input validation rather than a
    possible authentication rejection.
    """

    def test_put_config_null_body_returns_400(self):
        """A literal JSON ``null`` body is rejected."""
        r = put(
            '/api/config',
            data='null',
            headers={'Content-Type': 'application/json'},
        )
        assert r.status_code == 400

    def test_put_config_completely_invalid_json_returns_400(self):
        """A syntactically broken JSON body is rejected."""
        r = put(
            '/api/config',
            data='{bad json}}}',
            headers={'Content-Type': 'application/json'},
        )
        assert r.status_code == 400

    def test_put_config_ip_range_public_address_returns_400(self):
        """A non-private ip_range is rejected with a JSON error."""
        r = put('/api/config', json={'ip_range': '203.0.113.0/24'})
        assert r.status_code == 400
        _assert_json_error(r)

    def test_put_config_ip_range_172_boundary_just_below_rejected(self):
        """The network just below the 172.16.0.0/12 block is rejected."""
        # 172.15.0.0/24 is just below 172.16.0.0/12 — must be rejected
        r = put('/api/config', json={'ip_range': '172.15.0.0/24'})
        assert r.status_code == 400

    def test_put_config_ip_range_172_boundary_just_inside_accepted(self):
        """The first network inside 172.16.0.0/12 is accepted, then restored."""
        # 172.16.0.0/24 is within 172.16.0.0/12 — must be accepted
        current = get('/api/config').json()
        current_range = current['ip_range']
        try:
            r = put('/api/config', json={'ip_range': '172.16.0.0/24'})
            assert r.status_code == 200, (
                f"172.16.0.0/24 is valid RFC-1918 but was rejected: {r.text}"
            )
        finally:
            # Always put the previous range back, even when the assert fails.
            put('/api/config', json={'ip_range': current_range})

    def test_put_config_port_string_value_returns_400(self):
        """A non-numeric port value is rejected."""
        r = put('/api/config', json={'calendar': {'port': 'not-a-number'}})
        assert r.status_code == 400

    def test_put_config_port_boundary_65535_accepted(self):
        """The highest valid port number passes validation."""
        # 65535 is the maximum valid port — must not return 400.
        # Only the validation boundary is asserted here.
        # NOTE(review): the PUT below is sent for real, so a 200 response
        # presumably means the port was stored; it is not restored afterwards.
        # NOTE: this may conflict with another service's port; accept 409 too.
        r = put('/api/config', json={'calendar': {'port': 65535}})
        assert r.status_code in (200, 409), (
            f"Expected 200 or 409 for port=65535, got {r.status_code}: {r.text}"
        )

    def test_put_config_port_boundary_1_accepted(self):
        """The lowest valid port number passes validation."""
        # NOTE(review): as above, a 200 presumably leaves port 1 configured.
        r = put('/api/config', json={'calendar': {'port': 1}})
        assert r.status_code in (200, 409), (
            f"Expected 200 or 409 for port=1, got {r.status_code}: {r.text}"
        )

    def test_put_config_wireguard_address_bare_ip_returns_400(self):
        """A WireGuard address without a prefix length is rejected."""
        r = put('/api/config', json={'wireguard': {'address': '10.0.0.1'}})
        assert r.status_code == 400

    def test_put_config_oversized_cell_name_does_not_crash(self):
        """A very long cell_name should not cause an unhandled 500."""
        long_name = 'a' * 2048
        r = put('/api/config', json={'cell_name': long_name})
        # We don't mandate 400 here (the API may accept it), but it must not 500.
        # NOTE(review): if the API does accept it, the long name is not
        # restored afterwards — confirm this is acceptable for the test cell.
        assert r.status_code != 500, (
            f"Oversized cell_name caused a 500: {r.text}"
        )
        r.json()  # must be valid JSON
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DNS records — negative
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDnsRecordsNegative:
    """Malformed DELETE /api/dns/records requests must still yield JSON.

    Requests go through the module helpers, i.e. the logged-in session, so
    the accepted status codes are not pre-empted by a possible
    authentication rejection.
    """

    def test_delete_dns_record_empty_body_does_not_crash(self):
        """Sending an empty JSON body to DELETE /api/dns/records must not 500."""
        r = delete(
            '/api/dns/records',
            json={},
            headers={'Content-Type': 'application/json'},
        )
        # The endpoint calls network_manager.remove_dns_record(**{}) which will
        # raise a TypeError; the API should catch it and return a 500 OR a 400.
        assert r.status_code in (400, 500)
        r.json()  # must still be parseable JSON

    def test_delete_dns_record_no_content_type_does_not_crash(self):
        """Sending DELETE with no body at all must return a parseable response."""
        # A None value removes the session-level Content-Type for this one
        # request, so it really goes out with no body and no content type.
        r = delete('/api/dns/records', headers={'Content-Type': None})
        assert r.status_code in (200, 400, 404, 500)
        r.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DHCP reservations — negative
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDhcpReservationsNegative:
    """Missing-field and empty-body checks for /api/dhcp/reservations.

    Every request goes through the module helpers, i.e. the logged-in
    session, so the asserted 400 reflects input validation rather than a
    possible authentication rejection.
    """

    def test_add_reservation_no_body_returns_400(self):
        """POST with a JSON content type but no body is rejected."""
        r = post(
            '/api/dhcp/reservations',
            data='',
            headers={'Content-Type': 'application/json'},
        )
        assert r.status_code == 400

    def test_add_reservation_missing_ip_returns_400(self):
        """POST with only a MAC address is rejected with a JSON error."""
        r = post('/api/dhcp/reservations', json={'mac': 'aa:bb:cc:dd:ee:ff'})
        assert r.status_code == 400
        _assert_json_error(r)

    def test_add_reservation_missing_mac_returns_400(self):
        """POST with only an IP address is rejected with a JSON error."""
        r = post('/api/dhcp/reservations', json={'ip': '10.0.0.250'})
        assert r.status_code == 400
        _assert_json_error(r)

    def test_delete_reservation_no_mac_returns_400(self):
        """DELETE without a 'mac' field is rejected with a JSON error."""
        r = delete('/api/dhcp/reservations', json={'ip': '10.0.0.250'})
        assert r.status_code == 400
        _assert_json_error(r)

    def test_delete_reservation_empty_body_returns_400(self):
        """DELETE with a JSON content type but no body is rejected."""
        r = delete(
            '/api/dhcp/reservations',
            data='',
            headers={'Content-Type': 'application/json'},
        )
        assert r.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Container endpoints — negative
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestContainersNegative:
    """Container endpoints queried with a container name that does not exist."""

    def test_restart_nonexistent_container_returns_error(self):
        """Restarting an unknown container must not report a successful restart."""
        resp = post(f'/api/containers/{_GHOST_CONTAINER}/restart')
        # 403 = local-only endpoint; 404/500 = not found; 200 with restarted=False = ok
        acceptable = (200, 403, 404, 500)
        assert resp.status_code in acceptable
        payload = resp.json()  # must be valid JSON
        if resp.status_code == 200:
            assert payload.get('restarted') is not True

    def test_get_logs_nonexistent_container_returns_error(self):
        """Logs for an unknown container carry an error or no log content."""
        resp = get(f'/api/containers/{_GHOST_CONTAINER}/logs')
        acceptable = (200, 403, 404, 500)
        assert resp.status_code in acceptable
        payload = resp.json()  # body must parse for every accepted status
        if resp.status_code != 200:
            return
        assert 'error' in payload or not payload.get('logs')

    def test_get_stats_nonexistent_container_returns_json(self):
        """Stats for an unknown container always come back as parseable JSON."""
        resp = get(f'/api/containers/{_GHOST_CONTAINER}/stats')
        acceptable = (200, 403, 404, 500)
        assert resp.status_code in acceptable
        resp.json()  # must always be parseable
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# WireGuard key generation — negative
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestWireGuardKeyGenNegative:
    """POST /api/wireguard/keys/peer must reject requests lacking a usable name.

    Every request goes through the module helpers, i.e. the logged-in
    session, so the asserted 400 reflects input validation rather than a
    possible authentication rejection.
    """

    def test_generate_keys_empty_body_returns_400(self):
        """An empty JSON object is rejected with a JSON error."""
        r = post(
            '/api/wireguard/keys/peer',
            json={},
            headers={'Content-Type': 'application/json'},
        )
        assert r.status_code == 400
        _assert_json_error(r)

    def test_generate_keys_missing_name_returns_400(self):
        """A body without the 'name' key is rejected."""
        r = post('/api/wireguard/keys/peer', json={'other_field': 'value'})
        assert r.status_code == 400

    def test_generate_keys_null_name_returns_400(self):
        """A body whose 'name' is null is rejected."""
        r = post('/api/wireguard/keys/peer', json={'name': None})
        assert r.status_code == 400
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Generic: all non-existent URL paths return 404 (Flask default)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestNotFoundRoutes:
    """Unknown routes and unknown resources yield 404/405 rather than a 500."""

    def test_unknown_api_path_returns_404(self):
        """A path with no matching route returns 404."""
        r = get('/api/this-route-does-not-exist-at-all')
        assert r.status_code == 404

    def test_peer_detail_nonexistent_returns_404(self):
        """GET on a peer URL is answered with 404 or 405, never a success."""
        # GET is not defined for /api/peers/<peer_name> in app.py —
        # only PUT and DELETE exist, so Flask is expected to answer
        # 405 Method Not Allowed; 404 is accepted as well.
        r = get(f'/api/peers/{_GHOST_PEER}')
        assert r.status_code in (404, 405)

    def test_update_nonexistent_peer_gives_404_not_500(self):
        """PUT on an unknown peer returns 404 with a JSON error body."""
        r = put(f'/api/peers/{_GHOST_PEER}', json={'description': 'test'})
        assert r.status_code == 404
        # Must be valid JSON with an 'error' key — actually enforce both.
        _assert_json_error(r)
|