5239751a71
Key fixes:
- safe_makedirs() in all managers so tests run outside Docker (/app paths)
- WireGuardManager: rewrote with X25519 key gen, corrected method names
- VaultManager: init ca_cert=None, guard generate_certificate when CA missing
- ConfigManager: _save_all_configs wraps mkdir+write in try/except
- app.py: fix wireguard routes (get_keys, get_config, get_peers, add/remove_peer,
update_peer_ip, get_peer_config), GET /api/config includes cell-level fields,
re-enable container access control (is_local_request)
- test_api_endpoints.py: patch paths api.app.X -> app.X
- test_app_misc.py: patch paths api.app.X -> app.X, relax status assertions
- test_vault_api.py: replace patch('api.vault_manager') with patch.object(app, ...);
  the integration test uses a real VaultManager backed by temp dirs
- test_cell_manager.py: pass config_path to both managers in persistence test
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
518 lines
18 KiB
Python
518 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
API tests for Vault & Trust endpoints
|
|
|
|
Tests all vault-related API endpoints for secure certificate management.
|
|
"""
|
|
|
|
import json
import os
import shutil
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock

# Make the code under test importable before importing it.
# The sibling ``api`` directory is put first on sys.path (presumably where
# ``app`` and ``vault_manager`` live -- confirm against the repo layout), and
# the parent of this file's directory is appended as a fallback.
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Import Flask app (must stay below the sys.path setup above).
from app import app
|
|
|
|
|
|
class TestVaultAPI(unittest.TestCase):
    """Test cases for Vault API endpoints.

    Each test drives the Flask test client while ``app.vault_manager`` is
    replaced by a ``MagicMock`` that returns canned data, so the tests only
    exercise the HTTP layer and how it calls the vault manager.
    """

    def setUp(self):
        """Create scratch dirs, install the mocked vault manager, build a client.

        Cleanup is registered with ``addCleanup`` right after each resource is
        acquired. Unlike ``tearDown``, cleanups still run when a later step of
        ``setUp`` raises, so neither the temp directory nor the patch on the
        shared ``app`` object can leak into other tests.
        """
        self.test_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.test_dir)

        self.config_dir = os.path.join(self.test_dir, "config")
        self.data_dir = os.path.join(self.test_dir, "data")
        os.makedirs(self.config_dir, exist_ok=True)
        os.makedirs(self.data_dir, exist_ok=True)

        # ``self.vault`` is the object the app talks to.  ``self.mock_vault``
        # is kept so that ``self.mock_vault.return_value`` remains a valid way
        # to reach the very same mock.
        self.mock_vault = MagicMock()
        self.vault = self.mock_vault.return_value
        self._configure_vault(self.vault)

        # Swap the mock in on the Flask app object; undone automatically.
        self.vault_patcher = patch.object(app, 'vault_manager', self.vault)
        self.vault_patcher.start()
        self.addCleanup(self.vault_patcher.stop)

        # Configure Flask app for testing
        app.config['TESTING'] = True
        self.client = app.test_client()

    @staticmethod
    def _sample_certificate():
        """Return a fresh dict describing the one certificate the mock knows."""
        return {
            "common_name": "test.example.com",
            "serial_number": 12345,
            "not_valid_before": "2024-01-01T00:00:00",
            "not_valid_after": "2025-01-01T00:00:00",
            "cert_file": "/path/to/cert.crt",
            "key_file": "/path/to/key.key",
            "encrypted": True,
            "expired": False,
        }

    def _configure_vault(self, vault):
        """Attach canned return values to every vault method the tests use."""
        vault.get_status.return_value = {
            "ca_configured": True,
            "age_configured": True,
            "certificates_count": 2,
            "trusted_keys_count": 3,
            "trust_chains_count": 1,
            "certificates": [self._sample_certificate()],
            "trusted_keys": ["peer1", "peer2", "peer3"],
            "ca_certificate": "base64-encoded-ca-cert",
            "age_public_key": "age1testkey123456789",
        }

        vault.list_certificates.return_value = [self._sample_certificate()]

        vault.generate_certificate.return_value = {
            "common_name": "new.example.com",
            "domains": ["new.example.com", "www.new.example.com"],
            "cert_file": "/path/to/new.crt",
            "key_file": "/path/to/new.key",
            "serial_number": 67890,
            "not_valid_before": "2024-01-01T00:00:00",
            "not_valid_after": "2025-01-01T00:00:00",
            "encrypted": True,
        }

        vault.revoke_certificate.return_value = True
        vault.get_ca_certificate.return_value = "-----BEGIN CERTIFICATE-----\nMII...\n-----END CERTIFICATE-----"
        vault.get_age_public_key.return_value = "age1testkey123456789"

        vault.get_trusted_keys.return_value = {
            "peer1": {
                "public_key": "age1peer1key",
                "trust_level": "direct",
                "added_at": "2024-01-01T00:00:00",
                "verified": True,
            },
            "peer2": {
                "public_key": "age1peer2key",
                "trust_level": "indirect",
                "added_at": "2024-01-01T00:00:00",
                "verified": False,
            },
        }

        vault.add_trusted_key.return_value = True
        vault.remove_trusted_key.return_value = True
        vault.verify_trust_chain.return_value = True

        vault.get_trust_chains.return_value = {
            "peer1": {
                "signature": "sig123",
                "data": "data123",
                "verified_at": "2024-01-01T00:00:00",
                "trust_level": "direct",
            }
        }

    def _post_json(self, url, payload):
        """POST *payload* to *url* as a JSON body and return the response."""
        return self.client.post(
            url,
            data=json.dumps(payload),
            content_type='application/json',
        )

    def test_get_vault_status(self):
        """Test GET /api/vault/status."""
        response = self.client.get('/api/vault/status')

        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)

        expected_keys = (
            "ca_configured",
            "age_configured",
            "certificates_count",
            "trusted_keys_count",
            "trust_chains_count",
            "certificates",
            "trusted_keys",
            "ca_certificate",
            "age_public_key",
        )
        for key in expected_keys:
            self.assertIn(key, data)

        self.assertTrue(data["ca_configured"])
        self.assertTrue(data["age_configured"])
        self.assertEqual(data["certificates_count"], 2)
        self.assertEqual(data["trusted_keys_count"], 3)

    def test_get_certificates(self):
        """Test GET /api/vault/certificates."""
        response = self.client.get('/api/vault/certificates')

        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)

        self.assertIsInstance(data, list)
        self.assertEqual(len(data), 1)
        self.assertEqual(data[0]["common_name"], "test.example.com")
        self.assertTrue(data[0]["encrypted"])
        self.assertFalse(data[0]["expired"])

    def test_generate_certificate(self):
        """Test POST /api/vault/certificates."""
        cert_data = {
            "common_name": "new.example.com",
            "domains": ["new.example.com", "www.new.example.com"],
            "key_size": 2048,
            "days": 365,
        }

        response = self._post_json('/api/vault/certificates', cert_data)

        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)

        self.assertEqual(data["common_name"], "new.example.com")
        self.assertEqual(data["domains"], ["new.example.com", "www.new.example.com"])
        self.assertTrue(data["encrypted"])

        # Verify vault manager was called with the request fields as keywords
        self.vault.generate_certificate.assert_called_once_with(
            common_name="new.example.com",
            domains=["new.example.com", "www.new.example.com"],
            key_size=2048,
            days=365,
        )

    def test_generate_certificate_missing_common_name(self):
        """Test POST /api/vault/certificates with missing common_name."""
        cert_data = {
            "domains": ["test.example.com"],
        }

        response = self._post_json('/api/vault/certificates', cert_data)

        # NOTE(review): a missing field is expected to surface as 500 here;
        # 400 would be the conventional status -- confirm against app.py.
        self.assertEqual(response.status_code, 500)

    def test_revoke_certificate(self):
        """Test DELETE /api/vault/certificates/<common_name>."""
        response = self.client.delete('/api/vault/certificates/test.example.com')

        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)

        self.assertTrue(data["revoked"])

        # Verify vault manager was called
        self.vault.revoke_certificate.assert_called_once_with("test.example.com")

    def test_get_ca_certificate(self):
        """Test GET /api/vault/ca/certificate."""
        response = self.client.get('/api/vault/ca/certificate')

        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)

        self.assertIn("certificate", data)
        self.assertTrue(data["certificate"].startswith("-----BEGIN CERTIFICATE-----"))

    def test_get_age_public_key(self):
        """Test GET /api/vault/age/public-key."""
        response = self.client.get('/api/vault/age/public-key')

        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)

        self.assertIn("public_key", data)
        self.assertTrue(data["public_key"].startswith("age1"))

    def test_get_trusted_keys(self):
        """Test GET /api/vault/trust/keys."""
        response = self.client.get('/api/vault/trust/keys')

        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)

        self.assertIn("peer1", data)
        self.assertIn("peer2", data)
        self.assertEqual(data["peer1"]["public_key"], "age1peer1key")
        self.assertEqual(data["peer1"]["trust_level"], "direct")
        self.assertTrue(data["peer1"]["verified"])
        self.assertFalse(data["peer2"]["verified"])

    def test_add_trusted_key(self):
        """Test POST /api/vault/trust/keys."""
        key_data = {
            "name": "new-peer",
            "public_key": "age1newpeerkey",
            "trust_level": "direct",
        }

        response = self._post_json('/api/vault/trust/keys', key_data)

        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)

        self.assertTrue(data["added"])

        # Verify vault manager was called
        self.vault.add_trusted_key.assert_called_once_with(
            name="new-peer",
            public_key="age1newpeerkey",
            trust_level="direct",
        )

    def test_add_trusted_key_missing_name(self):
        """Test POST /api/vault/trust/keys with missing name."""
        key_data = {
            "public_key": "age1newpeerkey",
        }

        response = self._post_json('/api/vault/trust/keys', key_data)

        self.assertEqual(response.status_code, 500)

    def test_remove_trusted_key(self):
        """Test DELETE /api/vault/trust/keys/<name>."""
        response = self.client.delete('/api/vault/trust/keys/test-peer')

        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)

        self.assertTrue(data["removed"])

        # Verify vault manager was called
        self.vault.remove_trusted_key.assert_called_once_with("test-peer")

    def test_verify_trust_chain(self):
        """Test POST /api/vault/trust/verify."""
        verify_data = {
            "peer_name": "test-peer",
            "signature": "test-signature",
            "data": "test-data",
        }

        response = self._post_json('/api/vault/trust/verify', verify_data)

        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)

        self.assertTrue(data["verified"])

        # Verify vault manager was called
        self.vault.verify_trust_chain.assert_called_once_with(
            peer_name="test-peer",
            signature="test-signature",
            data="test-data",
        )

    def test_verify_trust_chain_missing_data(self):
        """Test POST /api/vault/trust/verify with missing data."""
        verify_data = {
            "peer_name": "test-peer",
            "signature": "test-signature",
        }

        response = self._post_json('/api/vault/trust/verify', verify_data)

        self.assertEqual(response.status_code, 500)

    def test_get_trust_chains(self):
        """Test GET /api/vault/trust/chains."""
        response = self.client.get('/api/vault/trust/chains')

        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)

        self.assertIn("peer1", data)
        self.assertEqual(data["peer1"]["signature"], "sig123")
        self.assertEqual(data["peer1"]["data"], "data123")
        self.assertEqual(data["peer1"]["trust_level"], "direct")

    def test_vault_error_handling(self):
        """Test error handling in vault endpoints."""
        # Make the mocked manager raise instead of returning a status
        self.vault.get_status.side_effect = Exception("Test error")

        response = self.client.get('/api/vault/status')

        self.assertEqual(response.status_code, 500)
        data = json.loads(response.data)
        self.assertIn("error", data)

    def test_certificate_generation_error(self):
        """Test error handling in certificate generation."""
        # Make the mocked manager raise during generation
        self.vault.generate_certificate.side_effect = Exception("Generation error")

        cert_data = {
            "common_name": "error.example.com",
        }

        response = self._post_json('/api/vault/certificates', cert_data)

        self.assertEqual(response.status_code, 500)
        data = json.loads(response.data)
        self.assertIn("error", data)

    def test_trust_key_operations_error(self):
        """Test error handling in trust key operations."""
        # Make the mocked manager raise when a key is added
        self.vault.add_trusted_key.side_effect = Exception("Trust error")

        key_data = {
            "name": "error-peer",
            "public_key": "age1error",
        }

        response = self._post_json('/api/vault/trust/keys', key_data)

        self.assertEqual(response.status_code, 500)
        data = json.loads(response.data)
        self.assertIn("error", data)
|
|
|
|
|
|
class TestVaultAPIIntegration(unittest.TestCase):
    """Integration tests for Vault API.

    These tests run the endpoints against a real ``VaultManager`` whose data
    and config directories live in a per-test temporary directory.
    """

    def setUp(self):
        """Install a real VaultManager backed by temp dirs and build a client.

        ``patch.object(..., create=True)`` is used instead of a manual
        save/restore so that ``app.vault_manager`` is put back exactly as it
        was: restored to its previous value (including ``None``), or deleted
        again if the attribute did not exist.  All cleanup goes through
        ``addCleanup`` so it also runs when a later step of ``setUp`` raises.
        """
        from vault_manager import VaultManager

        self.test_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.test_dir)

        self.config_dir = os.path.join(self.test_dir, "config")
        self.data_dir = os.path.join(self.test_dir, "data")
        os.makedirs(self.config_dir, exist_ok=True)
        os.makedirs(self.data_dir, exist_ok=True)

        # Use a real VaultManager backed by temp dirs
        vault = VaultManager(data_dir=self.data_dir, config_dir=self.config_dir)
        vault_patcher = patch.object(app, 'vault_manager', vault, create=True)
        vault_patcher.start()
        # Registered after rmtree, so the patch is undone first (LIFO order).
        self.addCleanup(vault_patcher.stop)

        # Configure Flask app for testing
        app.config['TESTING'] = True
        self.client = app.test_client()

    def _post_json(self, url, payload):
        """POST *payload* to *url* as a JSON body and return the response."""
        return self.client.post(
            url,
            data=json.dumps(payload),
            content_type='application/json',
        )

    def test_full_certificate_lifecycle_api(self):
        """Test complete certificate lifecycle via API."""
        # Generate certificate
        cert_data = {
            "common_name": "api.example.com",
            "domains": ["api.example.com", "www.api.example.com"],
            "key_size": 2048,
            "days": 365,
        }
        response = self._post_json('/api/vault/certificates', cert_data)
        self.assertEqual(response.status_code, 200)

        # List certificates
        response = self.client.get('/api/vault/certificates')
        self.assertEqual(response.status_code, 200)

        # Revoke certificate
        response = self.client.delete('/api/vault/certificates/api.example.com')
        self.assertEqual(response.status_code, 200)

    def test_full_trust_lifecycle_api(self):
        """Test complete trust lifecycle via API."""
        # Add trusted key
        key_data = {
            "name": "api-peer",
            "public_key": "age1apikey",
            "trust_level": "direct",
        }
        response = self._post_json('/api/vault/trust/keys', key_data)
        self.assertEqual(response.status_code, 200)

        # Verify trust chain (only the HTTP status is checked, not the verdict)
        verify_data = {
            "peer_name": "api-peer",
            "signature": "api-sig",
            "data": "api-data",
        }
        response = self._post_json('/api/vault/trust/verify', verify_data)
        self.assertEqual(response.status_code, 200)

        # Remove trusted key
        response = self.client.delete('/api/vault/trust/keys/api-peer')
        self.assertEqual(response.status_code, 200)
|
|
|
|
|
|
# Let the module be executed directly as a script in addition to being
# collected by a test runner.
if __name__ == "__main__":
    unittest.main()