#!/usr/bin/env python3
"""
API tests for Vault & Trust endpoints

Tests all vault-related API endpoints for secure certificate management.
"""

import json
import os
import shutil
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch

# Add api directory to path
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))

# Also make the parent of this file's directory importable.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Import Flask app (must come after the sys.path changes above)
from app import app  # noqa: E402


def _post_json(client, url, payload):
    """POST *payload* to *url* as a JSON body and return the response."""
    return client.post(
        url,
        data=json.dumps(payload),
        content_type='application/json',
    )


def _build_mock_vault_manager():
    """Return a MagicMock whose vault methods yield fixed canned results."""
    manager = MagicMock()

    manager.get_status.return_value = {
        "ca_configured": True,
        "age_configured": True,
        "certificates_count": 2,
        "trusted_keys_count": 3,
        "trust_chains_count": 1,
        "certificates": [
            {
                "common_name": "test.example.com",
                "serial_number": 12345,
                "not_valid_before": "2024-01-01T00:00:00",
                "not_valid_after": "2025-01-01T00:00:00",
                "cert_file": "/path/to/cert.crt",
                "key_file": "/path/to/key.key",
                "encrypted": True,
                "expired": False,
            }
        ],
        "trusted_keys": ["peer1", "peer2", "peer3"],
        "ca_certificate": "base64-encoded-ca-cert",
        "age_public_key": "age1testkey123456789",
    }

    manager.list_certificates.return_value = [
        {
            "common_name": "test.example.com",
            "serial_number": 12345,
            "not_valid_before": "2024-01-01T00:00:00",
            "not_valid_after": "2025-01-01T00:00:00",
            "cert_file": "/path/to/cert.crt",
            "key_file": "/path/to/key.key",
            "encrypted": True,
            "expired": False,
        }
    ]

    manager.generate_certificate.return_value = {
        "common_name": "new.example.com",
        "domains": ["new.example.com", "www.new.example.com"],
        "cert_file": "/path/to/new.crt",
        "key_file": "/path/to/new.key",
        "serial_number": 67890,
        "not_valid_before": "2024-01-01T00:00:00",
        "not_valid_after": "2025-01-01T00:00:00",
        "encrypted": True,
    }

    manager.revoke_certificate.return_value = True
    manager.get_ca_certificate.return_value = (
        "-----BEGIN CERTIFICATE-----\nMII...\n-----END CERTIFICATE-----"
    )
    manager.get_age_public_key.return_value = "age1testkey123456789"

    manager.get_trusted_keys.return_value = {
        "peer1": {
            "public_key": "age1peer1key",
            "trust_level": "direct",
            "added_at": "2024-01-01T00:00:00",
            "verified": True,
        },
        "peer2": {
            "public_key": "age1peer2key",
            "trust_level": "indirect",
            "added_at": "2024-01-01T00:00:00",
            "verified": False,
        },
    }

    manager.add_trusted_key.return_value = True
    manager.remove_trusted_key.return_value = True
    manager.verify_trust_chain.return_value = True

    manager.get_trust_chains.return_value = {
        "peer1": {
            "signature": "sig123",
            "data": "data123",
            "verified_at": "2024-01-01T00:00:00",
            "trust_level": "direct",
        }
    }

    return manager


class TestVaultAPI(unittest.TestCase):
    """Test cases for Vault API endpoints, run against a mocked vault manager."""

    def setUp(self):
        """Create temp dirs, install the vault manager mock, build a client.

        Cleanup is registered with ``addCleanup`` rather than done in
        ``tearDown``: unittest does not call ``tearDown`` when ``setUp``
        raises, which would otherwise leave the patch active and the
        temporary directory on disk.
        """
        self.test_dir = tempfile.mkdtemp()
        # Cleanups run last-in-first-out, so the directory is removed after
        # the patch is stopped.
        self.addCleanup(shutil.rmtree, self.test_dir)

        self.config_dir = os.path.join(self.test_dir, "config")
        self.data_dir = os.path.join(self.test_dir, "data")
        os.makedirs(self.config_dir, exist_ok=True)
        os.makedirs(self.data_dir, exist_ok=True)

        # Mock VaultManager
        self.vault_patcher = patch('api.vault_manager')
        self.mock_vault = self.vault_patcher.start()
        self.addCleanup(self.vault_patcher.stop)

        # Calling the patched object returns the configured instance.
        self.mock_vault.return_value = _build_mock_vault_manager()

        # Inject the mock VaultManager into the Flask app.
        # NOTE(review): this attribute is never restored, so the mock stays on
        # the shared ``app`` object after each test, including while
        # TestVaultAPIIntegration runs. Confirm whether that is intended.
        app.vault_manager = self.mock_vault.return_value

        # Configure Flask app for testing
        app.config['TESTING'] = True
        self.client = app.test_client()

    def test_get_vault_status(self):
        """Test GET /api/vault/status."""
        response = self.client.get('/api/vault/status')
        self.assertEqual(response.status_code, 200)

        data = json.loads(response.data)
        expected_keys = (
            "ca_configured",
            "age_configured",
            "certificates_count",
            "trusted_keys_count",
            "trust_chains_count",
            "certificates",
            "trusted_keys",
            "ca_certificate",
            "age_public_key",
        )
        for key in expected_keys:
            self.assertIn(key, data)

        self.assertTrue(data["ca_configured"])
        self.assertTrue(data["age_configured"])
        self.assertEqual(data["certificates_count"], 2)
        self.assertEqual(data["trusted_keys_count"], 3)

    def test_get_certificates(self):
        """Test GET /api/vault/certificates."""
        response = self.client.get('/api/vault/certificates')
        self.assertEqual(response.status_code, 200)

        data = json.loads(response.data)
        self.assertIsInstance(data, list)
        self.assertEqual(len(data), 1)
        self.assertEqual(data[0]["common_name"], "test.example.com")
        self.assertTrue(data[0]["encrypted"])
        self.assertFalse(data[0]["expired"])

    def test_generate_certificate(self):
        """Test POST /api/vault/certificates."""
        cert_data = {
            "common_name": "new.example.com",
            "domains": ["new.example.com", "www.new.example.com"],
            "key_size": 2048,
            "days": 365,
        }

        response = _post_json(self.client, '/api/vault/certificates', cert_data)
        self.assertEqual(response.status_code, 200)

        data = json.loads(response.data)
        self.assertEqual(data["common_name"], "new.example.com")
        self.assertEqual(
            data["domains"], ["new.example.com", "www.new.example.com"]
        )
        self.assertTrue(data["encrypted"])

        # Verify vault manager was called
        self.mock_vault.return_value.generate_certificate.assert_called_once_with(
            common_name="new.example.com",
            domains=["new.example.com", "www.new.example.com"],
            key_size=2048,
            days=365,
        )

    def test_generate_certificate_missing_common_name(self):
        """Test POST /api/vault/certificates with missing common_name."""
        cert_data = {"domains": ["test.example.com"]}

        response = _post_json(self.client, '/api/vault/certificates', cert_data)
        # NOTE(review): a missing required field is asserted as 500 here;
        # 400 would be the conventional status - confirm the API contract.
        self.assertEqual(response.status_code, 500)

    def test_revoke_certificate(self):
        """Test DELETE /api/vault/certificates/{common_name}."""
        response = self.client.delete('/api/vault/certificates/test.example.com')
        self.assertEqual(response.status_code, 200)

        data = json.loads(response.data)
        self.assertTrue(data["revoked"])

        # Verify vault manager was called
        self.mock_vault.return_value.revoke_certificate.assert_called_once_with(
            "test.example.com"
        )

    def test_get_ca_certificate(self):
        """Test GET /api/vault/ca/certificate."""
        response = self.client.get('/api/vault/ca/certificate')
        self.assertEqual(response.status_code, 200)

        data = json.loads(response.data)
        self.assertIn("certificate", data)
        self.assertTrue(
            data["certificate"].startswith("-----BEGIN CERTIFICATE-----")
        )

    def test_get_age_public_key(self):
        """Test GET /api/vault/age/public-key."""
        response = self.client.get('/api/vault/age/public-key')
        self.assertEqual(response.status_code, 200)

        data = json.loads(response.data)
        self.assertIn("public_key", data)
        self.assertTrue(data["public_key"].startswith("age1"))

    def test_get_trusted_keys(self):
        """Test GET /api/vault/trust/keys."""
        response = self.client.get('/api/vault/trust/keys')
        self.assertEqual(response.status_code, 200)

        data = json.loads(response.data)
        self.assertIn("peer1", data)
        self.assertIn("peer2", data)
        self.assertEqual(data["peer1"]["public_key"], "age1peer1key")
        self.assertEqual(data["peer1"]["trust_level"], "direct")
        self.assertTrue(data["peer1"]["verified"])
        self.assertFalse(data["peer2"]["verified"])

    def test_add_trusted_key(self):
        """Test POST /api/vault/trust/keys."""
        key_data = {
            "name": "new-peer",
            "public_key": "age1newpeerkey",
            "trust_level": "direct",
        }

        response = _post_json(self.client, '/api/vault/trust/keys', key_data)
        self.assertEqual(response.status_code, 200)

        data = json.loads(response.data)
        self.assertTrue(data["added"])

        # Verify vault manager was called
        self.mock_vault.return_value.add_trusted_key.assert_called_once_with(
            name="new-peer",
            public_key="age1newpeerkey",
            trust_level="direct",
        )

    def test_add_trusted_key_missing_name(self):
        """Test POST /api/vault/trust/keys with missing name."""
        key_data = {"public_key": "age1newpeerkey"}

        response = _post_json(self.client, '/api/vault/trust/keys', key_data)
        self.assertEqual(response.status_code, 500)

    def test_remove_trusted_key(self):
        """Test DELETE /api/vault/trust/keys/{name}."""
        response = self.client.delete('/api/vault/trust/keys/test-peer')
        self.assertEqual(response.status_code, 200)

        data = json.loads(response.data)
        self.assertTrue(data["removed"])

        # Verify vault manager was called
        self.mock_vault.return_value.remove_trusted_key.assert_called_once_with(
            "test-peer"
        )

    def test_verify_trust_chain(self):
        """Test POST /api/vault/trust/verify."""
        verify_data = {
            "peer_name": "test-peer",
            "signature": "test-signature",
            "data": "test-data",
        }

        response = _post_json(self.client, '/api/vault/trust/verify', verify_data)
        self.assertEqual(response.status_code, 200)

        data = json.loads(response.data)
        self.assertTrue(data["verified"])

        # Verify vault manager was called
        self.mock_vault.return_value.verify_trust_chain.assert_called_once_with(
            peer_name="test-peer",
            signature="test-signature",
            data="test-data",
        )

    def test_verify_trust_chain_missing_data(self):
        """Test POST /api/vault/trust/verify with missing data."""
        verify_data = {
            "peer_name": "test-peer",
            "signature": "test-signature",
        }

        response = _post_json(self.client, '/api/vault/trust/verify', verify_data)
        self.assertEqual(response.status_code, 500)

    def test_get_trust_chains(self):
        """Test GET /api/vault/trust/chains."""
        response = self.client.get('/api/vault/trust/chains')
        self.assertEqual(response.status_code, 200)

        data = json.loads(response.data)
        self.assertIn("peer1", data)
        self.assertEqual(data["peer1"]["signature"], "sig123")
        self.assertEqual(data["peer1"]["data"], "data123")
        self.assertEqual(data["peer1"]["trust_level"], "direct")

    def test_vault_error_handling(self):
        """Test error handling in vault endpoints."""
        # Mock an exception
        self.mock_vault.return_value.get_status.side_effect = Exception(
            "Test error"
        )

        response = self.client.get('/api/vault/status')
        self.assertEqual(response.status_code, 500)

        data = json.loads(response.data)
        self.assertIn("error", data)

    def test_certificate_generation_error(self):
        """Test error handling in certificate generation."""
        # Mock an exception
        self.mock_vault.return_value.generate_certificate.side_effect = Exception(
            "Generation error"
        )

        cert_data = {"common_name": "error.example.com"}

        response = _post_json(self.client, '/api/vault/certificates', cert_data)
        self.assertEqual(response.status_code, 500)

        data = json.loads(response.data)
        self.assertIn("error", data)

    def test_trust_key_operations_error(self):
        """Test error handling in trust key operations."""
        # Mock an exception
        self.mock_vault.return_value.add_trusted_key.side_effect = Exception(
            "Trust error"
        )

        key_data = {
            "name": "error-peer",
            "public_key": "age1error",
        }

        response = _post_json(self.client, '/api/vault/trust/keys', key_data)
        self.assertEqual(response.status_code, 500)

        data = json.loads(response.data)
        self.assertIn("error", data)


class TestVaultAPIIntegration(unittest.TestCase):
    """Integration tests for Vault API.

    This class installs no vault manager mock of its own; requests go to
    whatever the shared ``app`` object is configured with when the tests run.
    """

    def setUp(self):
        """Create temp dirs and a Flask test client."""
        self.test_dir = tempfile.mkdtemp()
        # Registered as a cleanup so the directory is removed even if the
        # rest of setUp fails.
        self.addCleanup(shutil.rmtree, self.test_dir)

        self.config_dir = os.path.join(self.test_dir, "config")
        self.data_dir = os.path.join(self.test_dir, "data")
        os.makedirs(self.config_dir, exist_ok=True)
        os.makedirs(self.data_dir, exist_ok=True)

        # Configure Flask app for testing
        app.config['TESTING'] = True
        self.client = app.test_client()

    def test_full_certificate_lifecycle_api(self):
        """Test complete certificate lifecycle via API."""
        # Generate certificate
        cert_data = {
            "common_name": "api.example.com",
            "domains": ["api.example.com", "www.api.example.com"],
            "key_size": 2048,
            "days": 365,
        }

        response = _post_json(self.client, '/api/vault/certificates', cert_data)
        self.assertEqual(response.status_code, 200)

        # List certificates
        response = self.client.get('/api/vault/certificates')
        self.assertEqual(response.status_code, 200)

        # Revoke certificate
        response = self.client.delete('/api/vault/certificates/api.example.com')
        self.assertEqual(response.status_code, 200)

    def test_full_trust_lifecycle_api(self):
        """Test complete trust lifecycle via API."""
        # Add trusted key
        key_data = {
            "name": "api-peer",
            "public_key": "age1apikey",
            "trust_level": "direct",
        }

        response = _post_json(self.client, '/api/vault/trust/keys', key_data)
        self.assertEqual(response.status_code, 200)

        # Verify trust chain
        verify_data = {
            "peer_name": "api-peer",
            "signature": "api-sig",
            "data": "api-data",
        }

        response = _post_json(self.client, '/api/vault/trust/verify', verify_data)
        self.assertEqual(response.status_code, 200)

        # Remove trusted key
        response = self.client.delete('/api/vault/trust/keys/api-peer')
        self.assertEqual(response.status_code, 200)


if __name__ == '__main__':
    unittest.main()