a43f9fbf0d
P0 — Broken functionality: - Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing) - Fix email_manager.delete_email_user() missing domain arg - Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file) - Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR) - Fix _next_peer_ip exhaustion returning 500 instead of 409 - Fix ConfigManager Caddyfile path (/app/config-caddy/) - Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx - Remove hardcoded credentials from Dashboard.jsx P1 — Security: - CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern) - enforce_auth: 503 only when users file readable but empty; never bypass on IOError - WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes - DNS add_cell_dns_forward: validate IP and domain; reject injection chars - DNS zone write: realpath containment + record content validation - iptables comment /32 suffix prevents substring match deleting wrong peer rules - is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge) - POST /api/containers: volume allow-list prevents arbitrary host mounts - file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user - email/calendar: stop persisting plaintext passwords in user records - routing_manager: validate IPs, networks, and interface names - peer_registry: write peers.json at mode 0o600 - vault_manager: Fernet key file at mode 0o600 - CORS: lock down to explicit origin list - domain/cell_name validation: reject newline, brace, semicolon injection chars P2 — Architecture: - Peer add: rollback registry entry if firewall rules fail post-add - restart_service(): base class now calls _restart_container(); email and calendar managers call cell-mail / cell-radicale respectively - email/calendar managers sync user list (no 
passwords) to cell_config.json - Pending-restart flag cleared only after helper subprocess exits with code 0 - docker-compose.yml: add config-caddy volume to API container P3 — Tests (854 → 1020): - Fill test_email_endpoints.py, test_calendar_endpoints.py, test_network_endpoints.py, test_routing_endpoints.py - New: test_peer_management_update.py, test_peer_management_edge_cases.py, test_input_validation.py, test_enforce_auth_configured.py, test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py, test_is_local_request_per_endpoint.py, test_caddy_routing.py - E2E conftest: skip WireGuard suite when wg-quick absent - Update existing tests to match fixed signatures and comment formats Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
728 lines
27 KiB
Python
728 lines
27 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
VaultManager - Secure Certificate Management and Trust Systems
|
|
|
|
Handles:
|
|
- Self-hosted Certificate Authority (CA)
|
|
- TLS certificate generation and management
|
|
- Age encryption for sensitive data
|
|
- Trust management and verification
|
|
- Certificate lifecycle management
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import tempfile
|
|
import shutil
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
import logging
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
|
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
|
import base64
|
|
from cryptography.fernet import Fernet
|
|
from base_service_manager import BaseServiceManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class VaultManager(BaseServiceManager):
    """Manages secure certificate authority, trust systems, and encrypted storage.

    On-disk layout (all below ``<data_dir>/vault``):

    * ``ca/ca.key``, ``ca/ca.crt`` - self-signed root CA key and certificate
    * ``certs/<common_name>.crt|.key`` - issued certificates and their
      Fernet-encrypted private keys
    * ``keys/fernet.key`` - symmetric key used for key/secret encryption
    * ``trust/trusted_keys.json``, ``trust/trust_chains.json`` - trust store
    * ``secrets.json`` - Fernet-encrypted JSON mapping of secret name -> value
    """

    def __init__(self, config_dir: str = "config", data_dir: str = "data"):
        """Prepare the vault directories and load (or create) all key material.

        Construction is best-effort: directory creation and CA loading
        tolerate ``OSError`` (logged, not raised) so the manager can still be
        instantiated on a read-only or partially provisioned data directory.

        Args:
            config_dir: Directory holding service configuration.
            data_dir: Directory under which the ``vault`` tree is created.
        """
        super().__init__('vault', data_dir, config_dir)
        self.config_dir = Path(config_dir)
        self.data_dir = Path(data_dir)
        self.vault_dir = self.data_dir / "vault"
        self.ca_dir = self.vault_dir / "ca"
        self.certs_dir = self.vault_dir / "certs"
        self.keys_dir = self.vault_dir / "keys"
        self.trust_dir = self.vault_dir / "trust"

        # Create directories
        for directory in [self.vault_dir, self.ca_dir, self.certs_dir, self.keys_dir, self.trust_dir]:
            try:
                directory.mkdir(parents=True, exist_ok=True)
            except OSError as e:  # PermissionError is a subclass of OSError
                logger.warning(f"Could not create vault directory {directory}: {e}")

        # CA files
        self.ca_key_file = self.ca_dir / "ca.key"
        self.ca_cert_file = self.ca_dir / "ca.crt"
        self.ca_config_file = self.ca_dir / "ca.conf"

        # Fernet encryption
        self.fernet_key_file = self.keys_dir / "fernet.key"
        self._load_or_create_fernet_key()

        # Trust store
        self.trusted_keys_file = self.trust_dir / "trusted_keys.json"
        self.trust_chains_file = self.trust_dir / "trust_chains.json"

        self.trusted_keys = {}
        self.trust_chains = {}
        self.ca_key = None
        self.ca_cert = None
        try:
            self._load_or_create_ca()
        except OSError as e:
            # Leave ca_key/ca_cert as None; generate_certificate() checks for
            # that and refuses to issue certificates.
            logger.warning(f"CA could not be loaded or created: {e}")
        self._load_trust_store()

    @staticmethod
    def _write_private_file(path: Path, data: bytes) -> None:
        """Write *data* to *path* so that the file is only owner-accessible.

        The file is created with mode 0o600 from the first byte, then
        chmod-ed again in case a pre-existing file or the umask left wider
        permissions in place.

        Raises:
            OSError: If the file cannot be created, written or chmod-ed.
        """
        fd = os.open(str(path), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        os.chmod(str(path), 0o600)

    @staticmethod
    def _validate_cert_name(common_name: str) -> None:
        """Reject common names that are unsafe to use as a file name.

        The common name is interpolated into ``certs_dir / f"{name}.crt"``,
        so path separators or ``..`` would let a caller read, overwrite or
        delete files outside the certificate directory.

        Raises:
            ValueError: If the name is empty, not a string, ``.``/``..``, or
                contains a path separator or NUL byte.
        """
        if (
            not isinstance(common_name, str)
            or not common_name
            or common_name in (".", "..")
            or any(ch in common_name for ch in ("/", "\\", "\x00"))
        ):
            raise ValueError(f"Invalid certificate common name: {common_name!r}")

    def _load_or_create_ca(self) -> None:
        """Load existing CA or create new one."""
        if self.ca_key_file.exists() and self.ca_cert_file.exists():
            logger.info("Loading existing CA")
            self._load_ca()
        else:
            logger.info("Creating new CA")
            self._create_ca()

    def _create_ca(self) -> None:
        """Create a new self-signed Certificate Authority and persist it.

        Generates a 4096-bit RSA key, writes it unencrypted (PKCS8/PEM) to
        ``ca_key_file`` with owner-only permissions, builds a certificate
        valid for 3650 days and writes it to ``ca_cert_file``.

        Raises:
            OSError: If either file cannot be written.
        """
        # Generate CA private key
        ca_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=4096
        )

        # Save CA private key. It is stored without a passphrase, so file
        # permissions are the only protection: never create it world-readable.
        self._write_private_file(
            self.ca_key_file,
            ca_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            ),
        )

        # Create CA certificate (self-signed: subject == issuer)
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "CA"),
            x509.NameAttribute(NameOID.LOCALITY_NAME, "Personal Internet Cell"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Personal Internet Cell CA"),
            x509.NameAttribute(NameOID.COMMON_NAME, "Personal Internet Cell Root CA"),
        ])

        ca_cert = x509.CertificateBuilder().subject_name(
            subject
        ).issuer_name(
            issuer
        ).public_key(
            ca_key.public_key()
        ).serial_number(
            x509.random_serial_number()
        ).not_valid_before(
            datetime.utcnow()
        ).not_valid_after(
            datetime.utcnow() + timedelta(days=3650)  # 10 years
        ).add_extension(
            x509.BasicConstraints(ca=True, path_length=None),
            critical=True,
        ).add_extension(
            x509.KeyUsage(
                digital_signature=True,
                key_encipherment=True,
                key_cert_sign=True,
                crl_sign=True,
                content_commitment=False,
                data_encipherment=False,
                key_agreement=False,
                encipher_only=False,
                decipher_only=False
            ),
            critical=True,
        ).sign(ca_key, hashes.SHA256())

        # Save CA certificate (public material, default permissions are fine)
        with open(self.ca_cert_file, "wb") as f:
            f.write(ca_cert.public_bytes(serialization.Encoding.PEM))

        # Only publish the CA on the instance once both files are on disk.
        self.ca_key = ca_key
        self.ca_cert = ca_cert
        logger.info("CA created successfully")

    def _load_ca(self) -> None:
        """Load existing CA key and certificate from their PEM files."""
        with open(self.ca_key_file, "rb") as f:
            self.ca_key = load_pem_private_key(f.read(), password=None)

        with open(self.ca_cert_file, "rb") as f:
            self.ca_cert = x509.load_pem_x509_certificate(f.read())

        logger.info("CA loaded successfully")

    def _load_or_create_fernet_key(self) -> None:
        """Load existing Fernet key or create a new one.

        Sets ``self.fernet_key`` and ``self.fernet``. If the key file cannot
        be read or written, an in-memory key is generated instead so the
        manager stays usable; that key is NOT persisted.
        """
        try:
            if self.fernet_key_file.exists():
                with open(self.fernet_key_file, "rb") as f:
                    self.fernet_key = f.read()
                # SECURITY: ensure key file is owner-only readable on every load
                # in case it was created with looser perms by an older version.
                try:
                    os.chmod(str(self.fernet_key_file), 0o600)
                except OSError:
                    pass
            else:
                self.fernet_key = Fernet.generate_key()
                # SECURITY: create the key file with 0o600 from the first byte
                # so the secret is never world-readable, even momentarily.
                self._write_private_file(self.fernet_key_file, self.fernet_key)
            self.fernet = Fernet(self.fernet_key)
        except OSError as e:
            # Anything encrypted with this fallback key cannot be decrypted
            # after a restart, and data encrypted with the on-disk key cannot
            # be read now - make that visible instead of failing silently.
            logger.warning(
                f"Fernet key file {self.fernet_key_file} unavailable ({e}); "
                "using an ephemeral in-memory key"
            )
            self.fernet_key = Fernet.generate_key()
            self.fernet = Fernet(self.fernet_key)

    def generate_certificate(self, common_name: str, domains: Optional[List[str]] = None,
                             key_size: int = 2048, days: int = 365) -> Dict:
        """Generate a new TLS server certificate signed by the vault CA.

        The certificate is written to ``certs_dir/<common_name>.crt`` and the
        private key to ``certs_dir/<common_name>.key``; the key file is
        created owner-only and then encrypted in place with Fernet.

        Args:
            common_name: Subject CN; also used as the file stem, so it must
                not contain path separators.
            domains: Optional DNS names for the SubjectAlternativeName
                extension.
            key_size: RSA key size in bits.
            days: Validity period in days, starting now.

        Returns:
            Dict with ``common_name``, ``domains``, ``cert_file``,
            ``key_file``, ``serial_number``, ``not_valid_before``,
            ``not_valid_after`` (ISO strings) and ``encrypted`` (whether the
            key file was actually encrypted).

        Raises:
            ValueError: If ``common_name`` is unsafe as a file name.
            RuntimeError: If the CA is not initialized.
            Exception: Any other failure is logged and re-raised.
        """
        try:
            self._validate_cert_name(common_name)
            if self.ca_key is None or self.ca_cert is None:
                raise RuntimeError("CA not initialized — cannot generate certificate")
            # Generate private key
            private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=key_size
            )

            # Create certificate
            subject = x509.Name([
                x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
                x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "CA"),
                x509.NameAttribute(NameOID.LOCALITY_NAME, "Personal Internet Cell"),
                x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Personal Internet Cell"),
                x509.NameAttribute(NameOID.COMMON_NAME, common_name),
            ])

            # Add SAN if domains provided
            sans = [x509.DNSName(domain) for domain in domains] if domains else []

            cert_builder = x509.CertificateBuilder().subject_name(
                subject
            ).issuer_name(
                self.ca_cert.subject
            ).public_key(
                private_key.public_key()
            ).serial_number(
                x509.random_serial_number()
            ).not_valid_before(
                datetime.utcnow()
            ).not_valid_after(
                datetime.utcnow() + timedelta(days=days)
            ).add_extension(
                x509.BasicConstraints(ca=False, path_length=None),
                critical=True,
            ).add_extension(
                x509.KeyUsage(
                    digital_signature=True,
                    key_encipherment=True,
                    key_cert_sign=False,
                    crl_sign=False,
                    content_commitment=False,
                    data_encipherment=False,
                    key_agreement=False,
                    encipher_only=False,
                    decipher_only=False
                ),
                critical=True,
            ).add_extension(
                x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]),
                critical=False,
            )

            if sans:
                cert_builder = cert_builder.add_extension(
                    x509.SubjectAlternativeName(sans),
                    critical=False,
                )

            certificate = cert_builder.sign(self.ca_key, hashes.SHA256())

            # Save certificate and key
            cert_file = self.certs_dir / f"{common_name}.crt"
            key_file = self.certs_dir / f"{common_name}.key"

            with open(cert_file, "wb") as f:
                f.write(certificate.public_bytes(serialization.Encoding.PEM))

            # The key briefly exists in plaintext before it is encrypted in
            # place, so it must be owner-only from the moment it is created.
            self._write_private_file(
                key_file,
                private_key.private_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PrivateFormat.PKCS8,
                    encryption_algorithm=serialization.NoEncryption()
                ),
            )

            # Encrypt private key with Fernet. Only an explicit False means
            # failure, so an override that returns None still counts as done.
            encrypted = self._encrypt_file_with_fernet(key_file) is not False

            return {
                "common_name": common_name,
                "domains": domains or [],
                "cert_file": str(cert_file),
                "key_file": str(key_file),
                "serial_number": certificate.serial_number,
                "not_valid_before": certificate.not_valid_before.isoformat(),
                "not_valid_after": certificate.not_valid_after.isoformat(),
                "encrypted": encrypted
            }

        except Exception as e:
            logger.error(f"Failed to generate certificate for {common_name}: {e}")
            raise

    def _encrypt_file_with_fernet(self, file_path: Path) -> bool:
        """Encrypt a file in place with Fernet.

        Failures are logged and the file is left as it was.

        Returns:
            True if the file was rewritten with ciphertext, False otherwise.
        """
        try:
            with open(file_path, "rb") as f:
                content = f.read()
            encrypted = self.fernet.encrypt(content)
            with open(file_path, "wb") as f:
                f.write(encrypted)
            logger.info(f"Encrypted {file_path} with Fernet")
            return True
        except Exception as e:
            logger.warning(f"Fernet encryption failed, keeping file unencrypted: {e}")
            return False

    def _decrypt_file_with_fernet(self, file_path: Path) -> bytes:
        """Return the Fernet-decrypted contents of a file.

        Raises:
            Exception: Any read or decryption error is logged and re-raised.
        """
        try:
            with open(file_path, "rb") as f:
                encrypted = f.read()
            return self.fernet.decrypt(encrypted)
        except Exception as e:
            logger.error(f"Failed to decrypt {file_path}: {e}")
            raise

    def list_certificates(self) -> List[Dict]:
        """List all certificates found as ``*.crt`` files in ``certs_dir``.

        Unreadable or unparsable files are logged and skipped.

        Returns:
            One dict per certificate. Note that ``encrypted`` only reflects
            whether a matching ``.key`` file exists next to the certificate.
        """
        certificates = []

        # sorted() gives a stable listing; glob order is filesystem-dependent.
        for cert_file in sorted(self.certs_dir.glob("*.crt")):
            try:
                with open(cert_file, "rb") as f:
                    cert = x509.load_pem_x509_certificate(f.read())

                key_file = cert_file.with_suffix(".key")
                encrypted = key_file.exists()

                certificates.append({
                    "common_name": cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
                    "serial_number": cert.serial_number,
                    "not_valid_before": cert.not_valid_before.isoformat(),
                    "not_valid_after": cert.not_valid_after.isoformat(),
                    "cert_file": str(cert_file),
                    "key_file": str(key_file),
                    "encrypted": encrypted,
                    "expired": cert.not_valid_after < datetime.utcnow()
                })

            except Exception as e:
                logger.error(f"Failed to read certificate {cert_file}: {e}")

        return certificates

    def revoke_certificate(self, common_name: str) -> bool:
        """Revoke a certificate by deleting its ``.crt`` and ``.key`` files.

        No revocation list is maintained; the files are simply removed.

        Returns:
            True if the files are gone (including when they never existed),
            False if the name is invalid or deletion failed.
        """
        try:
            # Prevent deleting files outside certs_dir via a crafted name.
            self._validate_cert_name(common_name)
            cert_file = self.certs_dir / f"{common_name}.crt"
            key_file = self.certs_dir / f"{common_name}.key"

            if cert_file.exists():
                cert_file.unlink()
            if key_file.exists():
                key_file.unlink()

            logger.info(f"Revoked certificate for {common_name}")
            return True

        except Exception as e:
            logger.error(f"Failed to revoke certificate for {common_name}: {e}")
            return False

    def add_trusted_key(self, name: str, public_key: str, trust_level: str = "direct") -> bool:
        """Add (or replace) a trusted public key and persist the trust store.

        Returns:
            True on success, False if saving failed.
        """
        try:
            self.trusted_keys[name] = {
                "public_key": public_key,
                "trust_level": trust_level,
                "added_at": datetime.utcnow().isoformat(),
                "verified": False
            }
            self._save_trust_store()
            logger.info(f"Added trusted key for {name}")
            return True

        except Exception as e:
            logger.error(f"Failed to add trusted key for {name}: {e}")
            return False

    def remove_trusted_key(self, name: str) -> bool:
        """Remove a trusted public key and persist the trust store.

        Returns:
            True if the key existed and was removed, False otherwise.
        """
        try:
            if name in self.trusted_keys:
                del self.trusted_keys[name]
                self._save_trust_store()
                logger.info(f"Removed trusted key for {name}")
                return True
            return False

        except Exception as e:
            logger.error(f"Failed to remove trusted key for {name}: {e}")
            return False

    def verify_trust_chain(self, peer_name: str, signature: str, data: str) -> bool:
        """Record a trust chain entry for a known peer.

        NOTE(security): the signature is NOT verified cryptographically.
        This returns True for any ``signature``/``data`` as long as
        ``peer_name`` is present in the trusted keys. Do not rely on the
        result as proof of authenticity until real verification is added.

        Returns:
            True if the peer is trusted and the entry was saved, else False.
        """
        try:
            if peer_name not in self.trusted_keys:
                logger.warning(f"Peer {peer_name} not in trusted keys")
                return False

            # For now, implement basic verification
            # In a real implementation, you'd verify the signature cryptographically
            trusted_key = self.trusted_keys[peer_name]

            # Add to trust chains
            self.trust_chains[peer_name] = {
                "signature": signature,
                "data": data,
                "verified_at": datetime.utcnow().isoformat(),
                "trust_level": trusted_key["trust_level"]
            }
            self._save_trust_store()

            logger.info(f"Verified trust chain for {peer_name}")
            return True

        except Exception as e:
            logger.error(f"Failed to verify trust chain for {peer_name}: {e}")
            return False

    def get_ca_certificate(self) -> str:
        """Get CA certificate as PEM string.

        Raises:
            OSError: If the CA certificate file cannot be read.
        """
        with open(self.ca_cert_file, "r") as f:
            return f.read()

    def get_age_public_key(self) -> str:
        """Return a dummy Age public key for compatibility."""
        # In a real implementation, this would return the actual Age public key
        return "age1testkey123456789"

    def get_trusted_keys(self) -> Dict:
        """Return trusted keys as a dict (for API compatibility)."""
        return self.trusted_keys

    def get_trust_chains(self) -> Dict:
        """Return trust chains as a dict (for API compatibility)."""
        return self.trust_chains

    def get_status(self) -> Dict[str, Any]:
        """Get vault service status.

        Returns:
            A status dict covering CA validity, certificates, trusted keys,
            trust chains and secret count; on any exception, whatever the
            base class ``handle_error`` returns.
        """
        try:
            # Check CA status
            ca_status = self._check_ca_status()

            # Check certificates
            certificates = self.list_certificates()

            # Check trust store
            trusted_keys = self.get_trusted_keys()
            trust_chains = self.get_trust_chains()

            # Check secrets
            secret_names = self.list_secrets()

            ca_ok = ca_status.get('valid', False)
            ca_cert_pem = None
            if self.ca_cert_file.exists():
                ca_cert_pem = self.ca_cert_file.read_text()
            status = {
                'running': ca_ok,
                'status': 'online' if ca_ok else 'offline',
                'ca_configured': ca_ok,
                'age_configured': ca_ok,
                'age_public_key': None,
                'ca_certificate': ca_cert_pem,
                'ca_status': ca_status,
                'certificates_count': len(certificates),
                'certificates': certificates,
                'trusted_keys_count': len(trusted_keys),
                'trusted_keys': list(trusted_keys.values()) if isinstance(trusted_keys, dict) else trusted_keys,
                # Count the trust chains themselves, not the trusted keys.
                'trust_chains_count': len(trust_chains),
                'secrets_count': len(secret_names),
                'timestamp': datetime.utcnow().isoformat()
            }

            return status
        except Exception as e:
            return self.handle_error(e, "get_status")

    def test_connectivity(self) -> Dict[str, Any]:
        """Run the vault self-tests and return their individual results.

        The overall ``success`` flag depends only on the CA and encryption
        tests; certificate generation and trust store results are reported
        but do not affect it.
        """
        try:
            # Test CA functionality
            ca_test = self._test_ca_functionality()

            # Test certificate generation
            cert_test = self._test_certificate_generation()

            # Test encryption/decryption
            encryption_test = self._test_encryption_functionality()

            # Test trust store
            trust_test = self._test_trust_store()

            results = {
                'ca_functionality': ca_test,
                'certificate_generation': cert_test,
                'encryption_functionality': encryption_test,
                'trust_store': trust_test,
                'success': ca_test.get('success', False) and encryption_test.get('success', False),
                'timestamp': datetime.utcnow().isoformat()
            }

            return results
        except Exception as e:
            return self.handle_error(e, "test_connectivity")

    def _check_ca_status(self) -> Dict[str, Any]:
        """Check that the CA files exist and the certificate is in its validity window.

        Returns:
            Dict with ``valid`` (bool) and ``message``, plus validity dates
            and subject when the certificate could be parsed, or ``error``
            when it could not.
        """
        try:
            if not self.ca_cert_file.exists() or not self.ca_key_file.exists():
                return {
                    'valid': False,
                    'message': 'CA files not found',
                    'error': 'Missing CA certificate or key'
                }

            # Check if CA certificate is valid
            with open(self.ca_cert_file, "rb") as f:
                ca_cert = x509.load_pem_x509_certificate(f.read())

            now = datetime.utcnow()
            if now < ca_cert.not_valid_before or now > ca_cert.not_valid_after:
                return {
                    'valid': False,
                    'message': 'CA certificate expired or not yet valid',
                    'not_valid_before': ca_cert.not_valid_before.isoformat(),
                    'not_valid_after': ca_cert.not_valid_after.isoformat()
                }

            return {
                'valid': True,
                'message': 'CA certificate is valid',
                'not_valid_before': ca_cert.not_valid_before.isoformat(),
                'not_valid_after': ca_cert.not_valid_after.isoformat(),
                'subject': str(ca_cert.subject)
            }
        except Exception as e:
            return {
                'valid': False,
                'message': f'CA status check failed: {str(e)}',
                'error': str(e)
            }

    def _test_ca_functionality(self) -> Dict[str, Any]:
        """Self-test: report success when ``_check_ca_status`` says the CA is valid."""
        try:
            ca_status = self._check_ca_status()

            if not ca_status.get('valid', False):
                return {
                    'success': False,
                    'message': 'CA is not valid',
                    'error': ca_status.get('error', 'Unknown CA error')
                }

            return {
                'success': True,
                'message': 'CA functionality working',
                'ca_valid': True
            }
        except Exception as e:
            return {
                'success': False,
                'message': f'CA functionality test failed: {str(e)}',
                'error': str(e)
            }

    def _test_certificate_generation(self) -> Dict[str, Any]:
        """Self-test: issue a one-day certificate and delete it again.

        ``generate_certificate`` signals failure by raising, and its result
        dict has no ``success`` key, so a normal return counts as success.
        The test files are removed whether or not generation succeeded.
        """
        test_name = "test.example.com"
        try:
            # Test generating a temporary certificate
            test_cert = self.generate_certificate(
                common_name=test_name,
                domains=[test_name],
                days=1
            )

            # Only an explicit falsy 'success' marks failure (tolerates an
            # implementation that reports errors via the result dict).
            if isinstance(test_cert, dict) and not test_cert.get('success', True):
                return {
                    'success': False,
                    'message': 'Certificate generation failed',
                    'error': test_cert.get('error', 'Unknown error')
                }

            return {
                'success': True,
                'message': 'Certificate generation working'
            }
        except Exception as e:
            return {
                'success': False,
                'message': f'Certificate generation test failed: {str(e)}',
                'error': str(e)
            }
        finally:
            # Clean up test certificate; never let cleanup mask the result.
            for leftover in (self.certs_dir / f"{test_name}.crt",
                             self.certs_dir / f"{test_name}.key"):
                try:
                    if leftover.exists():
                        leftover.unlink()
                except OSError as e:
                    logger.warning(f"Could not remove test file {leftover}: {e}")

    def _test_encryption_functionality(self) -> Dict[str, Any]:
        """Self-test: round-trip a fixed payload through Fernet encrypt/decrypt."""
        try:
            # Test Fernet encryption
            test_data = b"test_secret_data"
            encrypted_data = self.fernet.encrypt(test_data)
            decrypted_data = self.fernet.decrypt(encrypted_data)

            if decrypted_data == test_data:
                return {
                    'success': True,
                    'message': 'Encryption/decryption working'
                }
            else:
                return {
                    'success': False,
                    'message': 'Encryption/decryption failed - data mismatch'
                }
        except Exception as e:
            return {
                'success': False,
                'message': f'Encryption test failed: {str(e)}',
                'error': str(e)
            }

    def _test_trust_store(self) -> Dict[str, Any]:
        """Self-test: report the sizes of the in-memory trust store."""
        try:
            trusted_keys = self.get_trusted_keys()
            trust_chains = self.get_trust_chains()

            return {
                'success': True,
                'message': 'Trust store accessible',
                'trusted_keys_count': len(trusted_keys),
                'trust_chains_count': len(trust_chains)
            }
        except Exception as e:
            return {
                'success': False,
                'message': f'Trust store test failed: {str(e)}',
                'error': str(e)
            }

    @staticmethod
    def _read_json_file(path: Path) -> Any:
        """Return the parsed JSON content of *path*, or ``{}`` if unavailable.

        A missing file yields ``{}``. An unreadable or malformed file is
        logged and also yields ``{}`` so that construction does not fail.
        """
        try:
            if not path.exists():
                return {}
            with open(path, "r") as f:
                return json.load(f)
        except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
            logger.error(f"Failed to load trust store file {path}: {e}")
            return {}

    def _load_trust_store(self) -> None:
        """Load trusted keys and trust chains from disk into memory."""
        self.trusted_keys = self._read_json_file(self.trusted_keys_file)
        self.trust_chains = self._read_json_file(self.trust_chains_file)

    def _save_trust_store(self) -> None:
        """Save trusted keys and trust chains to disk as indented JSON.

        Raises:
            OSError: If either file cannot be written.
        """
        with open(self.trusted_keys_file, "w") as f:
            json.dump(self.trusted_keys, f, indent=2)
        with open(self.trust_chains_file, "w") as f:
            json.dump(self.trust_chains, f, indent=2)

    def _secrets_file(self):
        """Return the path of the encrypted secrets file."""
        return self.vault_dir / 'secrets.json'

    def _load_secrets(self):
        """Return the decrypted secrets mapping, or ``{}`` if unavailable.

        A missing file yields ``{}``. A file that cannot be decrypted or
        parsed also yields ``{}`` (logged). Be aware that a subsequent
        ``store_secret``/``delete_secret`` will then overwrite that file.
        """
        secrets_file = self._secrets_file()
        if not secrets_file.exists():
            return {}
        with open(secrets_file, 'rb') as f:
            data = f.read()
        try:
            decrypted = self.fernet.decrypt(data)
            return json.loads(decrypted.decode('utf-8'))
        except Exception as e:
            logger.error(f"Failed to decrypt secrets file {secrets_file}: {e}")
            return {}

    def _save_secrets(self, secrets):
        """Encrypt the secrets mapping with Fernet and write it to disk."""
        secrets_file = self._secrets_file()
        encrypted = self.fernet.encrypt(json.dumps(secrets).encode('utf-8'))
        with open(secrets_file, 'wb') as f:
            f.write(encrypted)

    def store_secret(self, name: str, value: str) -> bool:
        """Store (or overwrite) a secret under *name*; returns True."""
        secrets = self._load_secrets()
        secrets[name] = value
        self._save_secrets(secrets)
        return True

    def get_secret(self, name: str) -> Optional[str]:
        """Return the secret stored under *name*, or None if there is none."""
        return self._load_secrets().get(name)

    def list_secrets(self) -> list:
        """Return the names of all stored secrets."""
        return list(self._load_secrets())

    def delete_secret(self, name: str) -> bool:
        """Delete the secret *name*.

        Returns:
            True if the secret existed and was removed, False otherwise.
        """
        secrets = self._load_secrets()
        if name in secrets:
            del secrets[name]
            self._save_secrets(secrets)
            return True
        return False
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: build a vault in the default directories and
    # exercise certificate issuance, listing, the trust store and status.
    manager = VaultManager()
    print("Vault Manager initialized successfully")
    print(f"CA configured: {manager.ca_cert_file.exists()}")
    print(f"Fernet configured: {manager.fernet_key_file.exists()}")

    # Issue a certificate covering the bare and the www host name.
    demo_domains = ["test.example.com", "www.test.example.com"]
    generated = manager.generate_certificate("test.example.com", demo_domains)
    print(f"Generated certificate: {generated}")

    # Count everything currently stored in the certificate directory.
    known_certs = manager.list_certificates()
    print(f"Total certificates: {len(known_certs)}")

    # Register a peer key with direct trust.
    manager.add_trusted_key("test-peer", "age1testkey123456789", "direct")
    print("Added trusted key")

    # Dump the aggregated status report.
    report = manager.get_status()
    print(f"Vault status: {report}")