Files
pic/api/vault_manager.py
roof a43f9fbf0d fix: full security audit remediation — P0/P1/P2/P3 fixes + 1020 passing tests
P0 — Broken functionality:
- Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing)
- Fix email_manager.delete_email_user() missing domain arg
- Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now
  accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file)
- Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR)
- Fix _next_peer_ip exhaustion returning 500 instead of 409
- Fix ConfigManager Caddyfile path (/app/config-caddy/)
- Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx
- Remove hardcoded credentials from Dashboard.jsx

P1 — Security:
- CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern)
- enforce_auth: 503 only when users file readable but empty; never bypass on IOError
- WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes
- DNS add_cell_dns_forward: validate IP and domain; reject injection chars
- DNS zone write: realpath containment + record content validation
- iptables comment /32 suffix prevents substring match deleting wrong peer rules
- is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge)
- POST /api/containers: volume allow-list prevents arbitrary host mounts
- file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user
- email/calendar: stop persisting plaintext passwords in user records
- routing_manager: validate IPs, networks, and interface names
- peer_registry: write peers.json at mode 0o600
- vault_manager: Fernet key file at mode 0o600
- CORS: lock down to explicit origin list
- domain/cell_name validation: reject newline, brace, semicolon injection chars

P2 — Architecture:
- Peer add: rollback registry entry if firewall rules fail post-add
- restart_service(): base class now calls _restart_container(); email and calendar
  managers call cell-mail / cell-radicale respectively
- email/calendar managers sync user list (no passwords) to cell_config.json
- Pending-restart flag cleared only after helper subprocess exits with code 0
- docker-compose.yml: add config-caddy volume to API container

P3 — Tests (854 → 1020):
- Fill test_email_endpoints.py, test_calendar_endpoints.py,
  test_network_endpoints.py, test_routing_endpoints.py
- New: test_peer_management_update.py, test_peer_management_edge_cases.py,
  test_input_validation.py, test_enforce_auth_configured.py,
  test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py,
  test_is_local_request_per_endpoint.py, test_caddy_routing.py
- E2E conftest: skip WireGuard suite when wg-quick absent
- Update existing tests to match fixed signatures and comment formats

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:30:21 -04:00

728 lines
27 KiB
Python

#!/usr/bin/env python3
"""
VaultManager - Secure Certificate Management and Trust Systems
Handles:
- Self-hosted Certificate Authority (CA)
- TLS certificate generation and management
- Age encryption for sensitive data
- Trust management and verification
- Certificate lifecycle management
"""
import os
import json
import subprocess
import tempfile
import shutil
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
import logging
from cryptography import x509
from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.serialization import load_pem_private_key
import base64
from cryptography.fernet import Fernet
from base_service_manager import BaseServiceManager
logger = logging.getLogger(__name__)
class VaultManager(BaseServiceManager):
"""Manages secure certificate authority, trust systems, and encrypted storage."""
def __init__(self, config_dir: str = "config", data_dir: str = "data"):
    """Prepare the vault directory tree and load or create its key material.

    Args:
        config_dir: Configuration directory, forwarded to the base class.
        data_dir: Data directory; the vault lives in ``<data_dir>/vault``.

    Directory creation and CA initialisation are best-effort: a
    ``PermissionError``/``OSError`` does not abort construction, but it is
    logged so that a vault left without a usable CA can be diagnosed.
    """
    super().__init__('vault', data_dir, config_dir)
    self.config_dir = Path(config_dir)
    self.data_dir = Path(data_dir)
    self.vault_dir = self.data_dir / "vault"
    self.ca_dir = self.vault_dir / "ca"
    self.certs_dir = self.vault_dir / "certs"
    self.keys_dir = self.vault_dir / "keys"
    self.trust_dir = self.vault_dir / "trust"
    # Create directories (best-effort; failures are reported, not fatal)
    for directory in [self.vault_dir, self.ca_dir, self.certs_dir, self.keys_dir, self.trust_dir]:
        try:
            directory.mkdir(parents=True, exist_ok=True)
        except (PermissionError, OSError) as e:
            logger.warning(f"Could not create vault directory {directory}: {e}")
    # CA files
    self.ca_key_file = self.ca_dir / "ca.key"
    self.ca_cert_file = self.ca_dir / "ca.crt"
    self.ca_config_file = self.ca_dir / "ca.conf"
    # Fernet encryption
    self.fernet_key_file = self.keys_dir / "fernet.key"
    self._load_or_create_fernet_key()
    # Trust store
    self.trusted_keys_file = self.trust_dir / "trusted_keys.json"
    self.trust_chains_file = self.trust_dir / "trust_chains.json"
    self.trusted_keys = {}
    self.trust_chains = {}
    # ca_key / ca_cert stay None when the CA cannot be loaded or created;
    # generate_certificate() checks for that before signing.
    self.ca_key = None
    self.ca_cert = None
    try:
        self._load_or_create_ca()
    except (PermissionError, OSError) as e:
        logger.warning(f"CA could not be loaded or created, vault has no CA: {e}")
    self._load_trust_store()
def _load_or_create_ca(self) -> None:
    """Load the CA from disk when both its files exist, else create a new one."""
    ca_on_disk = self.ca_key_file.exists() and self.ca_cert_file.exists()
    if not ca_on_disk:
        # Either file missing: start over with a fresh key/certificate pair.
        logger.info("Creating new CA")
        self._create_ca()
        return
    logger.info("Loading existing CA")
    self._load_ca()
def _create_ca(self) -> None:
    """Create a new self-signed root CA and persist its key and certificate.

    Generates a 4096-bit RSA key, writes it unencrypted (PEM/PKCS8) to
    ``self.ca_key_file``, builds a self-signed certificate valid for ten
    years, writes it to ``self.ca_cert_file`` and stores both objects on
    ``self.ca_key`` / ``self.ca_cert``.

    Raises:
        OSError: If either file cannot be written.
    """
    # Generate CA private key
    ca_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=4096
    )
    key_pem = ca_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )
    # SECURITY: the CA key is stored unencrypted, so create the file with
    # 0o600 from the first byte instead of relying on the process umask.
    key_fd = os.open(
        str(self.ca_key_file),
        os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        0o600,
    )
    with os.fdopen(key_fd, "wb") as f:
        f.write(key_pem)
    # The mode passed to os.open() is ignored when the file already existed,
    # so tighten it explicitly as well.
    try:
        os.chmod(str(self.ca_key_file), 0o600)
    except OSError as e:
        logger.warning(f"Could not restrict permissions on {self.ca_key_file}: {e}")
    # Create CA certificate (self-signed: subject and issuer are identical)
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "CA"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, "Personal Internet Cell"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Personal Internet Cell CA"),
        x509.NameAttribute(NameOID.COMMON_NAME, "Personal Internet Cell Root CA"),
    ])
    issued_at = datetime.utcnow()
    ca_cert = x509.CertificateBuilder().subject_name(
        subject
    ).issuer_name(
        issuer
    ).public_key(
        ca_key.public_key()
    ).serial_number(
        x509.random_serial_number()
    ).not_valid_before(
        issued_at
    ).not_valid_after(
        issued_at + timedelta(days=3650)  # 10 years
    ).add_extension(
        x509.BasicConstraints(ca=True, path_length=None),
        critical=True,
    ).add_extension(
        x509.KeyUsage(
            digital_signature=True,
            key_encipherment=True,
            key_cert_sign=True,
            crl_sign=True,
            content_commitment=False,
            data_encipherment=False,
            key_agreement=False,
            encipher_only=False,
            decipher_only=False
        ),
        critical=True,
    ).sign(ca_key, hashes.SHA256())
    # Save CA certificate
    with open(self.ca_cert_file, "wb") as f:
        f.write(ca_cert.public_bytes(serialization.Encoding.PEM))
    self.ca_key = ca_key
    self.ca_cert = ca_cert
    logger.info("CA created successfully")
def _load_ca(self) -> None:
    """Read the CA private key and certificate from disk into memory.

    The key is parsed as an unencrypted PEM private key and stored on
    ``self.ca_key``; the certificate is stored on ``self.ca_cert``.
    """
    with open(self.ca_key_file, "rb") as key_stream:
        key_pem = key_stream.read()
    self.ca_key = load_pem_private_key(key_pem, password=None)
    with open(self.ca_cert_file, "rb") as cert_stream:
        cert_pem = cert_stream.read()
    self.ca_cert = x509.load_pem_x509_certificate(cert_pem)
    logger.info("CA loaded successfully")
def _load_or_create_fernet_key(self) -> None:
    """Load the Fernet key from disk, or create and persist a new one.

    Sets ``self.fernet_key`` and ``self.fernet``. If the key file cannot be
    read or written, an in-memory key is generated instead so the instance
    remains usable; that fallback is logged because anything encrypted with
    it cannot be decrypted by a later process.
    """
    try:
        if self.fernet_key_file.exists():
            with open(self.fernet_key_file, "rb") as f:
                self.fernet_key = f.read()
            # SECURITY: ensure key file is owner-only readable on every load
            # in case it was created with looser perms by an older version.
            try:
                os.chmod(str(self.fernet_key_file), 0o600)
            except OSError as e:
                logger.warning(
                    f"Could not restrict permissions on {self.fernet_key_file}: {e}"
                )
        else:
            self.fernet_key = Fernet.generate_key()
            # SECURITY: create the key file with 0o600 from the first byte
            # so the secret is never world-readable, even momentarily.
            fd = os.open(
                str(self.fernet_key_file),
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                0o600,
            )
            with os.fdopen(fd, "wb") as f:
                f.write(self.fernet_key)
            # Belt-and-braces chmod in case umask or a pre-existing file
            # left wider permissions in place.
            os.chmod(str(self.fernet_key_file), 0o600)
        self.fernet = Fernet(self.fernet_key)
    except (PermissionError, OSError) as e:
        # Best-effort fallback, but never a silent one: the key below lives
        # only in this process.
        logger.warning(
            f"Fernet key file {self.fernet_key_file} unavailable ({e}); "
            "using an ephemeral in-memory key - data encrypted now will not "
            "be decryptable after a restart"
        )
        self.fernet_key = Fernet.generate_key()
        self.fernet = Fernet(self.fernet_key)
def generate_certificate(self, common_name: str, domains: Optional[List[str]] = None,
                         key_size: int = 2048, days: int = 365) -> Dict:
    """Generate a CA-signed TLS server certificate and store it on disk.

    Args:
        common_name: Subject common name; also used as the file stem for
            ``<common_name>.crt`` / ``<common_name>.key`` in ``certs_dir``.
        domains: Optional DNS names added as Subject Alternative Names.
        key_size: RSA key size in bits.
        days: Validity period in days, starting now.

    Returns:
        A dict describing the certificate (names, file paths, serial number,
        validity window and whether the key file ended up encrypted).

    Raises:
        ValueError: If ``common_name`` is empty or could escape ``certs_dir``.
        RuntimeError: If the CA is not initialised.
        Exception: Any other failure is logged and re-raised.
    """
    # SECURITY: common_name becomes part of a file path, so refuse anything
    # that could point outside certs_dir before touching the filesystem.
    if (not isinstance(common_name, str) or not common_name
            or common_name in (".", "..")
            or any(ch in common_name for ch in ("/", "\\", "\x00"))):
        raise ValueError(f"Invalid certificate common name: {common_name!r}")
    try:
        if self.ca_key is None or self.ca_cert is None:
            raise RuntimeError("CA not initialized — cannot generate certificate")
        # Generate private key
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=key_size
        )
        # Create certificate
        subject = x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "CA"),
            x509.NameAttribute(NameOID.LOCALITY_NAME, "Personal Internet Cell"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Personal Internet Cell"),
            x509.NameAttribute(NameOID.COMMON_NAME, common_name),
        ])
        # Add SAN if domains provided
        sans = [x509.DNSName(domain) for domain in domains] if domains else []
        issued_at = datetime.utcnow()
        cert_builder = x509.CertificateBuilder().subject_name(
            subject
        ).issuer_name(
            self.ca_cert.subject
        ).public_key(
            private_key.public_key()
        ).serial_number(
            x509.random_serial_number()
        ).not_valid_before(
            issued_at
        ).not_valid_after(
            issued_at + timedelta(days=days)
        ).add_extension(
            x509.BasicConstraints(ca=False, path_length=None),
            critical=True,
        ).add_extension(
            x509.KeyUsage(
                digital_signature=True,
                key_encipherment=True,
                key_cert_sign=False,
                crl_sign=False,
                content_commitment=False,
                data_encipherment=False,
                key_agreement=False,
                encipher_only=False,
                decipher_only=False
            ),
            critical=True,
        ).add_extension(
            x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]),
            critical=False,
        )
        if sans:
            cert_builder = cert_builder.add_extension(
                x509.SubjectAlternativeName(sans),
                critical=False,
            )
        certificate = cert_builder.sign(self.ca_key, hashes.SHA256())
        # Save certificate and key
        cert_file = self.certs_dir / f"{common_name}.crt"
        key_file = self.certs_dir / f"{common_name}.key"
        with open(cert_file, "wb") as f:
            f.write(certificate.public_bytes(serialization.Encoding.PEM))
        # SECURITY: the key briefly exists on disk as plain PEM, so create
        # the file owner-only rather than with umask-default permissions.
        key_fd = os.open(
            str(key_file),
            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
            0o600,
        )
        with os.fdopen(key_fd, "wb") as f:
            f.write(private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            ))
        try:
            # os.open()'s mode is ignored for a pre-existing file.
            os.chmod(str(key_file), 0o600)
        except OSError as e:
            logger.warning(f"Could not restrict permissions on {key_file}: {e}")
        # Encrypt private key with Fernet
        self._encrypt_file_with_fernet(key_file)
        # The helper keeps the plaintext file if encryption fails, so report
        # what is really on disk: a PEM header means it is still plaintext.
        with open(key_file, "rb") as f:
            encrypted = not f.read(10).startswith(b"-----BEGIN")
        return {
            "common_name": common_name,
            "domains": domains or [],
            "cert_file": str(cert_file),
            "key_file": str(key_file),
            "serial_number": certificate.serial_number,
            "not_valid_before": certificate.not_valid_before.isoformat(),
            "not_valid_after": certificate.not_valid_after.isoformat(),
            "encrypted": encrypted
        }
    except Exception as e:
        logger.error(f"Failed to generate certificate for {common_name}: {e}")
        raise
def _encrypt_file_with_fernet(self, file_path: Path) -> None:
    """Replace the contents of ``file_path`` with their Fernet encryption.

    Best-effort: on any failure a warning is logged and nothing is raised.
    """
    try:
        with open(file_path, "rb") as src:
            plaintext = src.read()
        token = self.fernet.encrypt(plaintext)
        with open(file_path, "wb") as dst:
            dst.write(token)
        logger.info(f"Encrypted {file_path} with Fernet")
    except Exception as e:
        logger.warning(f"Fernet encryption failed, keeping file unencrypted: {e}")
def _decrypt_file_with_fernet(self, file_path: Path) -> bytes:
    """Return the Fernet-decrypted contents of ``file_path``.

    The file itself is left untouched. Failures are logged and re-raised.
    """
    try:
        with open(file_path, "rb") as src:
            token = src.read()
        plaintext = self.fernet.decrypt(token)
        return plaintext
    except Exception as e:
        logger.error(f"Failed to decrypt {file_path}: {e}")
        raise
def list_certificates(self) -> List[Dict]:
    """Describe every ``*.crt`` file found in ``certs_dir``.

    Returns:
        One dict per readable certificate with its common name, serial
        number, validity window, file paths, an ``encrypted`` flag (true
        when a sibling ``.key`` file exists) and an ``expired`` flag.
        Certificates that cannot be parsed are logged and skipped.
    """
    found = []
    for crt_path in self.certs_dir.glob("*.crt"):
        try:
            with open(crt_path, "rb") as stream:
                parsed = x509.load_pem_x509_certificate(stream.read())
            key_path = crt_path.with_suffix(".key")
            has_key = key_path.exists()
            cn_attrs = parsed.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
            entry = {
                "common_name": cn_attrs[0].value,
                "serial_number": parsed.serial_number,
                "not_valid_before": parsed.not_valid_before.isoformat(),
                "not_valid_after": parsed.not_valid_after.isoformat(),
                "cert_file": str(crt_path),
                "key_file": str(key_path),
                "encrypted": has_key,
                "expired": parsed.not_valid_after < datetime.utcnow()
            }
            found.append(entry)
        except Exception as e:
            logger.error(f"Failed to read certificate {crt_path}: {e}")
    return found
def revoke_certificate(self, common_name: str) -> bool:
    """Revoke a certificate by deleting its ``.crt`` and ``.key`` files.

    Args:
        common_name: File stem of the certificate inside ``certs_dir``.

    Returns:
        True when the files are gone afterwards (including when they never
        existed); False if the name is invalid or deletion failed.
    """
    # SECURITY: common_name becomes part of a file path; refuse anything that
    # could make the unlink below reach outside certs_dir.
    if (not isinstance(common_name, str) or not common_name
            or common_name in (".", "..")
            or any(ch in common_name for ch in ("/", "\\", "\x00"))):
        logger.warning(f"Refusing to revoke invalid certificate name: {common_name!r}")
        return False
    try:
        cert_file = self.certs_dir / f"{common_name}.crt"
        key_file = self.certs_dir / f"{common_name}.key"
        if cert_file.exists():
            cert_file.unlink()
        if key_file.exists():
            key_file.unlink()
        logger.info(f"Revoked certificate for {common_name}")
        return True
    except Exception as e:
        logger.error(f"Failed to revoke certificate for {common_name}: {e}")
        return False
def add_trusted_key(self, name: str, public_key: str, trust_level: str = "direct") -> bool:
    """Register (or overwrite) a trusted public key and persist the store.

    Args:
        name: Identifier the key is stored under.
        public_key: The key material, stored as given.
        trust_level: Label recorded with the key.

    Returns:
        True on success, False if recording or saving failed.
    """
    try:
        entry = {
            "public_key": public_key,
            "trust_level": trust_level,
            "added_at": datetime.utcnow().isoformat(),
            # New keys always start out unverified.
            "verified": False
        }
        self.trusted_keys[name] = entry
        self._save_trust_store()
    except Exception as e:
        logger.error(f"Failed to add trusted key for {name}: {e}")
        return False
    else:
        logger.info(f"Added trusted key for {name}")
        return True
def remove_trusted_key(self, name: str) -> bool:
    """Delete a trusted key and persist the store.

    Returns:
        True if the key existed and was removed; False if it was unknown or
        an error occurred.
    """
    try:
        if name not in self.trusted_keys:
            return False
        del self.trusted_keys[name]
        self._save_trust_store()
        logger.info(f"Removed trusted key for {name}")
        return True
    except Exception as e:
        logger.error(f"Failed to remove trusted key for {name}: {e}")
        return False
def verify_trust_chain(self, peer_name: str, signature: str, data: str) -> bool:
    """Record a trust-chain entry for a peer that is already trusted.

    NOTE(review): no cryptographic verification happens here. The signature
    is stored, not checked, so the result only says "this peer is in the
    trusted-keys store". Do not treat True as proof the signature is valid.

    Returns:
        True if the peer is known and the entry was saved; False otherwise.
    """
    try:
        trusted_key = self.trusted_keys.get(peer_name) if peer_name in self.trusted_keys else None
        if peer_name not in self.trusted_keys:
            logger.warning(f"Peer {peer_name} not in trusted keys")
            return False
        # For now, implement basic verification
        # In a real implementation, you'd verify the signature cryptographically
        chain_entry = {
            "signature": signature,
            "data": data,
            "verified_at": datetime.utcnow().isoformat(),
            "trust_level": trusted_key["trust_level"]
        }
        # Add to trust chains
        self.trust_chains[peer_name] = chain_entry
        self._save_trust_store()
        logger.info(f"Verified trust chain for {peer_name}")
        return True
    except Exception as e:
        logger.error(f"Failed to verify trust chain for {peer_name}: {e}")
        return False
def get_ca_certificate(self) -> str:
    """Return the CA certificate file's contents as text (PEM).

    Raises:
        OSError: If the certificate file cannot be opened.
    """
    with open(self.ca_cert_file, "r") as pem_stream:
        pem_text = pem_stream.read()
    return pem_text
def get_age_public_key(self) -> str:
    """Return a fixed placeholder Age public key (kept for compatibility).

    No real Age key is read here; the value is a hard-coded dummy.
    """
    # In a real implementation, this would return the actual Age public key
    placeholder_key = "age1testkey123456789"
    return placeholder_key
def get_trusted_keys(self) -> Dict:
    """Return the in-memory trusted-keys mapping itself (not a copy)."""
    keys = self.trusted_keys
    return keys
def get_trust_chains(self) -> Dict:
    """Return the in-memory trust-chains mapping itself (not a copy)."""
    chains = self.trust_chains
    return chains
def get_status(self) -> Dict[str, Any]:
    """Get vault service status.

    Returns:
        A dict with CA validity, the CA certificate PEM (if the file
        exists), certificate and trusted-key listings, counts for trusted
        keys, trust chains and secrets, and a UTC timestamp. On any error
        the result of ``self.handle_error`` is returned instead.
    """
    try:
        # Check CA status
        ca_status = self._check_ca_status()
        # Check certificates
        certificates = self.list_certificates()
        # Check trust store
        trusted_keys = self.get_trusted_keys()
        trust_chains = self.get_trust_chains()
        # Check secrets
        secrets = self.list_secrets()
        ca_ok = ca_status.get('valid', False)
        ca_cert_pem = None
        if self.ca_cert_file.exists():
            ca_cert_pem = self.ca_cert_file.read_text()
        status = {
            'running': ca_ok,
            'status': 'online' if ca_ok else 'offline',
            'ca_configured': ca_ok,
            'age_configured': ca_ok,
            'age_public_key': None,
            'ca_certificate': ca_cert_pem,
            'ca_status': ca_status,
            'certificates_count': len(certificates),
            'certificates': certificates,
            'trusted_keys_count': len(trusted_keys),
            'trusted_keys': list(trusted_keys.values()) if isinstance(trusted_keys, dict) else trusted_keys,
            # Count the trust chains themselves, not the trusted keys.
            'trust_chains_count': len(trust_chains),
            'secrets_count': len(secrets),
            'timestamp': datetime.utcnow().isoformat()
        }
        return status
    except Exception as e:
        return self.handle_error(e, "get_status")
def test_connectivity(self) -> Dict[str, Any]:
    """Run the vault self-tests and collect their results.

    Returns:
        A dict with one entry per self-test, an overall ``success`` flag
        (true only when both the CA and the encryption test succeeded) and
        a UTC timestamp. On any error the result of ``self.handle_error``
        is returned instead.
    """
    try:
        # Same execution order as the report keys below.
        ca_result = self._test_ca_functionality()
        cert_result = self._test_certificate_generation()
        crypto_result = self._test_encryption_functionality()
        trust_result = self._test_trust_store()
        overall = ca_result.get('success', False) and crypto_result.get('success', False)
        return {
            'ca_functionality': ca_result,
            'certificate_generation': cert_result,
            'encryption_functionality': crypto_result,
            'trust_store': trust_result,
            'success': overall,
            'timestamp': datetime.utcnow().isoformat()
        }
    except Exception as e:
        return self.handle_error(e, "test_connectivity")
def _check_ca_status(self) -> Dict[str, Any]:
    """Report whether the CA files exist and the certificate is in date.

    Returns:
        A dict whose ``valid`` flag is true only when both CA files exist
        and the current UTC time lies inside the certificate's validity
        window; the other keys explain the outcome. Never raises.
    """
    try:
        files_present = self.ca_cert_file.exists() and self.ca_key_file.exists()
        if not files_present:
            return {
                'valid': False,
                'message': 'CA files not found',
                'error': 'Missing CA certificate or key'
            }
        # Check if CA certificate is valid
        with open(self.ca_cert_file, "rb") as stream:
            parsed = x509.load_pem_x509_certificate(stream.read())
        current = datetime.utcnow()
        out_of_window = current < parsed.not_valid_before or current > parsed.not_valid_after
        if out_of_window:
            return {
                'valid': False,
                'message': 'CA certificate expired or not yet valid',
                'not_valid_before': parsed.not_valid_before.isoformat(),
                'not_valid_after': parsed.not_valid_after.isoformat()
            }
        return {
            'valid': True,
            'message': 'CA certificate is valid',
            'not_valid_before': parsed.not_valid_before.isoformat(),
            'not_valid_after': parsed.not_valid_after.isoformat(),
            'subject': str(parsed.subject)
        }
    except Exception as e:
        reason = str(e)
        return {
            'valid': False,
            'message': f'CA status check failed: {reason}',
            'error': reason
        }
def _test_ca_functionality(self) -> Dict[str, Any]:
    """Self-test: succeed when ``_check_ca_status`` reports a valid CA.

    Returns:
        A result dict with ``success`` and ``message``; failures also carry
        an ``error`` string. Never raises.
    """
    try:
        ca_status = self._check_ca_status()
        if ca_status.get('valid', False):
            return {
                'success': True,
                'message': 'CA functionality working',
                'ca_valid': True
            }
        return {
            'success': False,
            'message': 'CA is not valid',
            'error': ca_status.get('error', 'Unknown CA error')
        }
    except Exception as e:
        reason = str(e)
        return {
            'success': False,
            'message': f'CA functionality test failed: {reason}',
            'error': reason
        }
def _test_certificate_generation(self) -> Dict[str, Any]:
    """Self-test: generate a throw-away certificate and remove it again.

    ``generate_certificate`` signals failure by raising and returns a plain
    description dict on success, so a returned dict counts as success unless
    it explicitly carries ``success: False``. The test files are removed in
    every case so repeated runs leave nothing behind.

    Returns:
        A result dict with ``success`` and ``message``; failures also carry
        an ``error`` string. Never raises.
    """
    test_name = "test.example.com"
    try:
        # Test generating a temporary certificate
        test_cert = self.generate_certificate(
            common_name=test_name,
            domains=[test_name],
            days=1
        )
        if test_cert.get('success', True):
            return {
                'success': True,
                'message': 'Certificate generation working'
            }
        return {
            'success': False,
            'message': 'Certificate generation failed',
            'error': test_cert.get('error', 'Unknown error')
        }
    except Exception as e:
        return {
            'success': False,
            'message': f'Certificate generation test failed: {str(e)}',
            'error': str(e)
        }
    finally:
        # Clean up test certificate, whether or not generation succeeded.
        for suffix in (".crt", ".key"):
            try:
                leftover = self.certs_dir / f"{test_name}{suffix}"
                if leftover.exists():
                    leftover.unlink()
            except Exception as cleanup_error:
                logger.warning(
                    f"Could not remove test certificate file {test_name}{suffix}: {cleanup_error}"
                )
def _test_encryption_functionality(self) -> Dict[str, Any]:
    """Self-test: round-trip a fixed payload through ``self.fernet``.

    Returns:
        A result dict with ``success`` and ``message``; an exception during
        the round trip is reported with an additional ``error`` string.
    """
    try:
        # Test Fernet encryption
        payload = b"test_secret_data"
        round_tripped = self.fernet.decrypt(self.fernet.encrypt(payload))
        if round_tripped != payload:
            return {
                'success': False,
                'message': 'Encryption/decryption failed - data mismatch'
            }
        return {
            'success': True,
            'message': 'Encryption/decryption working'
        }
    except Exception as e:
        reason = str(e)
        return {
            'success': False,
            'message': f'Encryption test failed: {reason}',
            'error': reason
        }
def _test_trust_store(self) -> Dict[str, Any]:
    """Self-test: confirm both trust-store mappings can be read and counted.

    Returns:
        A result dict with ``success``, ``message`` and the two counts;
        failures carry an ``error`` string instead. Never raises.
    """
    try:
        key_count = len(self.get_trusted_keys())
        chain_count = len(self.get_trust_chains())
    except Exception as e:
        reason = str(e)
        return {
            'success': False,
            'message': f'Trust store test failed: {reason}',
            'error': reason
        }
    return {
        'success': True,
        'message': 'Trust store accessible',
        'trusted_keys_count': key_count,
        'trust_chains_count': chain_count
    }
def _load_trust_store(self) -> None:
    """Load trusted keys and trust chains from disk.

    A missing file yields an empty mapping. A file that cannot be read, is
    not valid JSON (including an empty file) or does not contain a JSON
    object is logged and also treated as empty, so one damaged file cannot
    stop the vault from starting. An empty trust store trusts nobody.
    """
    def read_mapping(path):
        # Return the JSON object stored at *path*, or {} if unusable.
        if not path.exists():
            return {}
        try:
            with open(path, "r") as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            logger.error(f"Failed to load trust store file {path}: {e}")
            return {}
        if not isinstance(loaded, dict):
            logger.error(f"Trust store file {path} does not contain a JSON object; ignoring it")
            return {}
        return loaded

    self.trusted_keys = read_mapping(self.trusted_keys_file)
    self.trust_chains = read_mapping(self.trust_chains_file)
def _save_trust_store(self) -> None:
    """Write trusted keys, then trust chains, to their JSON files."""
    targets = (
        (self.trusted_keys_file, self.trusted_keys),
        (self.trust_chains_file, self.trust_chains),
    )
    for path, mapping in targets:
        with open(path, "w") as out:
            json.dump(mapping, out, indent=2)
def _secrets_file(self):
    """Return the path of the encrypted secrets file inside ``vault_dir``."""
    secrets_path = self.vault_dir / 'secrets.json'
    return secrets_path
def _load_secrets(self):
    """Return the decrypted secrets mapping, or ``{}`` if none is available.

    A missing secrets file yields an empty dict. If the file exists but
    cannot be decrypted or parsed, an empty dict is still returned for
    backward compatibility, but the failure is logged. That matters because
    a later ``store_secret`` would overwrite the unreadable file.
    """
    secrets_file = self._secrets_file()
    if not secrets_file.exists():
        return {}
    with open(secrets_file, 'rb') as f:
        data = f.read()
    try:
        decrypted = self.fernet.decrypt(data)
        return json.loads(decrypted.decode('utf-8'))
    except Exception as e:
        # Log the exception type too: Fernet's InvalidToken has no message.
        logger.error(
            f"Failed to decrypt or parse secrets file {secrets_file} "
            f"({type(e).__name__}: {e}); treating it as empty"
        )
        return {}
def _save_secrets(self, secrets):
    """Serialise ``secrets`` to JSON, encrypt it and overwrite the secrets file."""
    plaintext = json.dumps(secrets).encode('utf-8')
    token = self.fernet.encrypt(plaintext)
    with open(self._secrets_file(), 'wb') as out:
        out.write(token)
def store_secret(self, name: str, value: str) -> bool:
    """Add or replace the secret ``name`` and persist the store.

    Returns:
        Always True; problems while saving surface as exceptions.
    """
    current = self._load_secrets()
    current.update({name: value})
    self._save_secrets(current)
    return True
def get_secret(self, name: str) -> str:
    """Return the stored value for ``name``, or None when it is not stored."""
    stored = self._load_secrets()
    if name in stored:
        return stored[name]
    return None
def list_secrets(self) -> list:
    """Return the names of all stored secrets (values are not included)."""
    return [secret_name for secret_name in self._load_secrets()]
def delete_secret(self, name: str) -> bool:
    """Remove the secret ``name`` and persist the store.

    Returns:
        True if the secret existed and was removed; False if it was not
        stored (nothing is written in that case).
    """
    current = self._load_secrets()
    if name not in current:
        return False
    del current[name]
    self._save_secrets(current)
    return True
if __name__ == "__main__":
    # Manual smoke test: build a vault in the default directories and run
    # through the main operations, printing what happened at each step.
    vault = VaultManager()
    print("Vault Manager initialized successfully")
    ca_present = vault.ca_cert_file.exists()
    fernet_present = vault.fernet_key_file.exists()
    print(f"CA configured: {ca_present}")
    print(f"Fernet configured: {fernet_present}")

    # Generate a test certificate
    demo_domains = ["test.example.com", "www.test.example.com"]
    cert_info = vault.generate_certificate("test.example.com", demo_domains)
    print(f"Generated certificate: {cert_info}")

    # List certificates
    cert_count = len(vault.list_certificates())
    print(f"Total certificates: {cert_count}")

    # Add a trusted key
    vault.add_trusted_key("test-peer", "age1testkey123456789", "direct")
    print("Added trusted key")

    # Get status
    status = vault.get_status()
    print(f"Vault status: {status}")