687 lines
25 KiB
Python
687 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
VaultManager - Secure Certificate Management and Trust Systems
|
|
|
|
Handles:
|
|
- Self-hosted Certificate Authority (CA)
|
|
- TLS certificate generation and management
|
|
- Age encryption for sensitive data
|
|
- Trust management and verification
|
|
- Certificate lifecycle management
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import tempfile
|
|
import shutil
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
import logging
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
|
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
|
import base64
|
|
from cryptography.fernet import Fernet
|
|
from base_service_manager import BaseServiceManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class VaultManager(BaseServiceManager):
|
|
"""Manages secure certificate authority, trust systems, and encrypted storage."""
|
|
|
|
def __init__(self, config_dir: str = "config", data_dir: str = "data"):
|
|
super().__init__('vault', data_dir, config_dir)
|
|
self.config_dir = Path(config_dir)
|
|
self.data_dir = Path(data_dir)
|
|
self.vault_dir = self.data_dir / "vault"
|
|
self.ca_dir = self.vault_dir / "ca"
|
|
self.certs_dir = self.vault_dir / "certs"
|
|
self.keys_dir = self.vault_dir / "keys"
|
|
self.trust_dir = self.vault_dir / "trust"
|
|
|
|
# Create directories
|
|
for directory in [self.vault_dir, self.ca_dir, self.certs_dir, self.keys_dir, self.trust_dir]:
|
|
directory.mkdir(parents=True, exist_ok=True)
|
|
|
|
# CA files
|
|
self.ca_key_file = self.ca_dir / "ca.key"
|
|
self.ca_cert_file = self.ca_dir / "ca.crt"
|
|
self.ca_config_file = self.ca_dir / "ca.conf"
|
|
|
|
# Fernet encryption
|
|
self.fernet_key_file = self.keys_dir / "fernet.key"
|
|
self._load_or_create_fernet_key()
|
|
|
|
# Trust store
|
|
self.trusted_keys_file = self.trust_dir / "trusted_keys.json"
|
|
self.trust_chains_file = self.trust_dir / "trust_chains.json"
|
|
|
|
self.trusted_keys = {}
|
|
self.trust_chains = {}
|
|
self._load_or_create_ca()
|
|
self._load_trust_store()
|
|
|
|
def _load_or_create_ca(self) -> None:
|
|
"""Load existing CA or create new one."""
|
|
if self.ca_key_file.exists() and self.ca_cert_file.exists():
|
|
logger.info("Loading existing CA")
|
|
self._load_ca()
|
|
else:
|
|
logger.info("Creating new CA")
|
|
self._create_ca()
|
|
|
|
def _create_ca(self) -> None:
|
|
"""Create a new Certificate Authority."""
|
|
# Generate CA private key
|
|
ca_key = rsa.generate_private_key(
|
|
public_exponent=65537,
|
|
key_size=4096
|
|
)
|
|
|
|
# Save CA private key
|
|
with open(self.ca_key_file, "wb") as f:
|
|
f.write(ca_key.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.PKCS8,
|
|
encryption_algorithm=serialization.NoEncryption()
|
|
))
|
|
|
|
# Create CA certificate
|
|
subject = issuer = x509.Name([
|
|
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
|
|
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "CA"),
|
|
x509.NameAttribute(NameOID.LOCALITY_NAME, "Personal Internet Cell"),
|
|
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Personal Internet Cell CA"),
|
|
x509.NameAttribute(NameOID.COMMON_NAME, "Personal Internet Cell Root CA"),
|
|
])
|
|
|
|
ca_cert = x509.CertificateBuilder().subject_name(
|
|
subject
|
|
).issuer_name(
|
|
issuer
|
|
).public_key(
|
|
ca_key.public_key()
|
|
).serial_number(
|
|
x509.random_serial_number()
|
|
).not_valid_before(
|
|
datetime.utcnow()
|
|
).not_valid_after(
|
|
datetime.utcnow() + timedelta(days=3650) # 10 years
|
|
).add_extension(
|
|
x509.BasicConstraints(ca=True, path_length=None),
|
|
critical=True,
|
|
).add_extension(
|
|
x509.KeyUsage(
|
|
digital_signature=True,
|
|
key_encipherment=True,
|
|
key_cert_sign=True,
|
|
crl_sign=True,
|
|
content_commitment=False,
|
|
data_encipherment=False,
|
|
key_agreement=False,
|
|
encipher_only=False,
|
|
decipher_only=False
|
|
),
|
|
critical=True,
|
|
).sign(ca_key, hashes.SHA256())
|
|
|
|
# Save CA certificate
|
|
with open(self.ca_cert_file, "wb") as f:
|
|
f.write(ca_cert.public_bytes(serialization.Encoding.PEM))
|
|
|
|
self.ca_key = ca_key
|
|
self.ca_cert = ca_cert
|
|
logger.info("CA created successfully")
|
|
|
|
def _load_ca(self) -> None:
|
|
"""Load existing CA key and certificate."""
|
|
with open(self.ca_key_file, "rb") as f:
|
|
self.ca_key = load_pem_private_key(f.read(), password=None)
|
|
|
|
with open(self.ca_cert_file, "rb") as f:
|
|
self.ca_cert = x509.load_pem_x509_certificate(f.read())
|
|
|
|
logger.info("CA loaded successfully")
|
|
|
|
def _load_or_create_fernet_key(self) -> None:
|
|
"""Load existing Fernet key or create a new one."""
|
|
if self.fernet_key_file.exists():
|
|
with open(self.fernet_key_file, "rb") as f:
|
|
self.fernet_key = f.read()
|
|
else:
|
|
self.fernet_key = Fernet.generate_key()
|
|
with open(self.fernet_key_file, "wb") as f:
|
|
f.write(self.fernet_key)
|
|
self.fernet = Fernet(self.fernet_key)
|
|
|
|
def generate_certificate(self, common_name: str, domains: Optional[List[str]] = None,
|
|
key_size: int = 2048, days: int = 365) -> Dict:
|
|
"""Generate a new TLS certificate."""
|
|
try:
|
|
# Generate private key
|
|
private_key = rsa.generate_private_key(
|
|
public_exponent=65537,
|
|
key_size=key_size
|
|
)
|
|
|
|
# Create certificate
|
|
subject = x509.Name([
|
|
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
|
|
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "CA"),
|
|
x509.NameAttribute(NameOID.LOCALITY_NAME, "Personal Internet Cell"),
|
|
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Personal Internet Cell"),
|
|
x509.NameAttribute(NameOID.COMMON_NAME, common_name),
|
|
])
|
|
|
|
# Add SAN if domains provided
|
|
sans = []
|
|
if domains:
|
|
sans.extend([x509.DNSName(domain) for domain in domains])
|
|
|
|
cert_builder = x509.CertificateBuilder().subject_name(
|
|
subject
|
|
).issuer_name(
|
|
self.ca_cert.subject
|
|
).public_key(
|
|
private_key.public_key()
|
|
).serial_number(
|
|
x509.random_serial_number()
|
|
).not_valid_before(
|
|
datetime.utcnow()
|
|
).not_valid_after(
|
|
datetime.utcnow() + timedelta(days=days)
|
|
).add_extension(
|
|
x509.BasicConstraints(ca=False, path_length=None),
|
|
critical=True,
|
|
).add_extension(
|
|
x509.KeyUsage(
|
|
digital_signature=True,
|
|
key_encipherment=True,
|
|
key_cert_sign=False,
|
|
crl_sign=False,
|
|
content_commitment=False,
|
|
data_encipherment=False,
|
|
key_agreement=False,
|
|
encipher_only=False,
|
|
decipher_only=False
|
|
),
|
|
critical=True,
|
|
).add_extension(
|
|
x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]),
|
|
critical=False,
|
|
)
|
|
|
|
if sans:
|
|
cert_builder = cert_builder.add_extension(
|
|
x509.SubjectAlternativeName(sans),
|
|
critical=False,
|
|
)
|
|
|
|
certificate = cert_builder.sign(self.ca_key, hashes.SHA256())
|
|
|
|
# Save certificate and key
|
|
cert_file = self.certs_dir / f"{common_name}.crt"
|
|
key_file = self.certs_dir / f"{common_name}.key"
|
|
|
|
with open(cert_file, "wb") as f:
|
|
f.write(certificate.public_bytes(serialization.Encoding.PEM))
|
|
|
|
with open(key_file, "wb") as f:
|
|
f.write(private_key.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.PKCS8,
|
|
encryption_algorithm=serialization.NoEncryption()
|
|
))
|
|
|
|
# Encrypt private key with Fernet
|
|
self._encrypt_file_with_fernet(key_file)
|
|
|
|
return {
|
|
"common_name": common_name,
|
|
"domains": domains or [],
|
|
"cert_file": str(cert_file),
|
|
"key_file": str(key_file),
|
|
"serial_number": certificate.serial_number,
|
|
"not_valid_before": certificate.not_valid_before.isoformat(),
|
|
"not_valid_after": certificate.not_valid_after.isoformat(),
|
|
"encrypted": True
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to generate certificate for {common_name}: {e}")
|
|
raise
|
|
|
|
def _encrypt_file_with_fernet(self, file_path: Path) -> None:
|
|
"""Encrypt a file with Fernet."""
|
|
try:
|
|
with open(file_path, "rb") as f:
|
|
content = f.read()
|
|
encrypted = self.fernet.encrypt(content)
|
|
with open(file_path, "wb") as f:
|
|
f.write(encrypted)
|
|
logger.info(f"Encrypted {file_path} with Fernet")
|
|
except Exception as e:
|
|
logger.warning(f"Fernet encryption failed, keeping file unencrypted: {e}")
|
|
|
|
def _decrypt_file_with_fernet(self, file_path: Path) -> bytes:
|
|
"""Decrypt a file with Fernet."""
|
|
try:
|
|
with open(file_path, "rb") as f:
|
|
encrypted = f.read()
|
|
return self.fernet.decrypt(encrypted)
|
|
except Exception as e:
|
|
logger.error(f"Failed to decrypt {file_path}: {e}")
|
|
raise
|
|
|
|
def list_certificates(self) -> List[Dict]:
|
|
"""List all certificates."""
|
|
certificates = []
|
|
|
|
for cert_file in self.certs_dir.glob("*.crt"):
|
|
try:
|
|
with open(cert_file, "rb") as f:
|
|
cert = x509.load_pem_x509_certificate(f.read())
|
|
|
|
key_file = cert_file.with_suffix(".key")
|
|
encrypted = key_file.exists()
|
|
|
|
certificates.append({
|
|
"common_name": cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
|
|
"serial_number": cert.serial_number,
|
|
"not_valid_before": cert.not_valid_before.isoformat(),
|
|
"not_valid_after": cert.not_valid_after.isoformat(),
|
|
"cert_file": str(cert_file),
|
|
"key_file": str(key_file),
|
|
"encrypted": encrypted,
|
|
"expired": cert.not_valid_after < datetime.utcnow()
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to read certificate {cert_file}: {e}")
|
|
|
|
return certificates
|
|
|
|
def revoke_certificate(self, common_name: str) -> bool:
|
|
"""Revoke a certificate."""
|
|
try:
|
|
cert_file = self.certs_dir / f"{common_name}.crt"
|
|
key_file = self.certs_dir / f"{common_name}.key"
|
|
|
|
if cert_file.exists():
|
|
cert_file.unlink()
|
|
if key_file.exists():
|
|
key_file.unlink()
|
|
|
|
logger.info(f"Revoked certificate for {common_name}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to revoke certificate for {common_name}: {e}")
|
|
return False
|
|
|
|
def add_trusted_key(self, name: str, public_key: str, trust_level: str = "direct") -> bool:
|
|
"""Add a trusted public key."""
|
|
try:
|
|
self.trusted_keys[name] = {
|
|
"public_key": public_key,
|
|
"trust_level": trust_level,
|
|
"added_at": datetime.utcnow().isoformat(),
|
|
"verified": False
|
|
}
|
|
self._save_trust_store()
|
|
logger.info(f"Added trusted key for {name}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to add trusted key for {name}: {e}")
|
|
return False
|
|
|
|
def remove_trusted_key(self, name: str) -> bool:
|
|
"""Remove a trusted public key."""
|
|
try:
|
|
if name in self.trusted_keys:
|
|
del self.trusted_keys[name]
|
|
self._save_trust_store()
|
|
logger.info(f"Removed trusted key for {name}")
|
|
return True
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to remove trusted key for {name}: {e}")
|
|
return False
|
|
|
|
def verify_trust_chain(self, peer_name: str, signature: str, data: str) -> bool:
|
|
"""Verify a trust chain signature."""
|
|
try:
|
|
if peer_name not in self.trusted_keys:
|
|
logger.warning(f"Peer {peer_name} not in trusted keys")
|
|
return False
|
|
|
|
# For now, implement basic verification
|
|
# In a real implementation, you'd verify the signature cryptographically
|
|
trusted_key = self.trusted_keys[peer_name]
|
|
|
|
# Add to trust chains
|
|
self.trust_chains[peer_name] = {
|
|
"signature": signature,
|
|
"data": data,
|
|
"verified_at": datetime.utcnow().isoformat(),
|
|
"trust_level": trusted_key["trust_level"]
|
|
}
|
|
self._save_trust_store()
|
|
|
|
logger.info(f"Verified trust chain for {peer_name}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to verify trust chain for {peer_name}: {e}")
|
|
return False
|
|
|
|
def get_ca_certificate(self) -> str:
|
|
"""Get CA certificate as PEM string."""
|
|
with open(self.ca_cert_file, "r") as f:
|
|
return f.read()
|
|
|
|
def get_age_public_key(self) -> str:
|
|
"""Return a dummy Age public key for compatibility."""
|
|
# In a real implementation, this would return the actual Age public key
|
|
return "age1testkey123456789"
|
|
|
|
def get_trusted_keys(self) -> Dict:
|
|
"""Return trusted keys as a dict (for API compatibility)."""
|
|
return self.trusted_keys
|
|
|
|
def get_trust_chains(self) -> Dict:
|
|
"""Return trust chains as a dict (for API compatibility)."""
|
|
return self.trust_chains
|
|
|
|
def get_status(self) -> Dict[str, Any]:
|
|
"""Get vault service status"""
|
|
try:
|
|
# Check CA status
|
|
ca_status = self._check_ca_status()
|
|
|
|
# Check certificates
|
|
certificates = self.list_certificates()
|
|
|
|
# Check trust store
|
|
trusted_keys = self.get_trusted_keys()
|
|
|
|
# Check secrets
|
|
secrets = self.list_secrets()
|
|
|
|
status = {
|
|
'running': ca_status.get('valid', False),
|
|
'status': 'online' if ca_status.get('valid', False) else 'offline',
|
|
'ca_status': ca_status,
|
|
'certificates_count': len(certificates),
|
|
'trusted_keys_count': len(trusted_keys),
|
|
'secrets_count': len(secrets),
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
|
|
return status
|
|
except Exception as e:
|
|
return self.handle_error(e, "get_status")
|
|
|
|
def test_connectivity(self) -> Dict[str, Any]:
|
|
"""Test vault service connectivity"""
|
|
try:
|
|
# Test CA functionality
|
|
ca_test = self._test_ca_functionality()
|
|
|
|
# Test certificate generation
|
|
cert_test = self._test_certificate_generation()
|
|
|
|
# Test encryption/decryption
|
|
encryption_test = self._test_encryption_functionality()
|
|
|
|
# Test trust store
|
|
trust_test = self._test_trust_store()
|
|
|
|
results = {
|
|
'ca_functionality': ca_test,
|
|
'certificate_generation': cert_test,
|
|
'encryption_functionality': encryption_test,
|
|
'trust_store': trust_test,
|
|
'success': ca_test.get('success', False) and encryption_test.get('success', False),
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
|
|
return results
|
|
except Exception as e:
|
|
return self.handle_error(e, "test_connectivity")
|
|
|
|
def _check_ca_status(self) -> Dict[str, Any]:
|
|
"""Check CA certificate status"""
|
|
try:
|
|
if not self.ca_cert_file.exists() or not self.ca_key_file.exists():
|
|
return {
|
|
'valid': False,
|
|
'message': 'CA files not found',
|
|
'error': 'Missing CA certificate or key'
|
|
}
|
|
|
|
# Check if CA certificate is valid
|
|
with open(self.ca_cert_file, "rb") as f:
|
|
ca_cert = x509.load_pem_x509_certificate(f.read())
|
|
|
|
now = datetime.utcnow()
|
|
if now < ca_cert.not_valid_before or now > ca_cert.not_valid_after:
|
|
return {
|
|
'valid': False,
|
|
'message': 'CA certificate expired or not yet valid',
|
|
'not_valid_before': ca_cert.not_valid_before.isoformat(),
|
|
'not_valid_after': ca_cert.not_valid_after.isoformat()
|
|
}
|
|
|
|
return {
|
|
'valid': True,
|
|
'message': 'CA certificate is valid',
|
|
'not_valid_before': ca_cert.not_valid_before.isoformat(),
|
|
'not_valid_after': ca_cert.not_valid_after.isoformat(),
|
|
'subject': str(ca_cert.subject)
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'valid': False,
|
|
'message': f'CA status check failed: {str(e)}',
|
|
'error': str(e)
|
|
}
|
|
|
|
def _test_ca_functionality(self) -> Dict[str, Any]:
|
|
"""Test CA functionality"""
|
|
try:
|
|
ca_status = self._check_ca_status()
|
|
|
|
if not ca_status.get('valid', False):
|
|
return {
|
|
'success': False,
|
|
'message': 'CA is not valid',
|
|
'error': ca_status.get('error', 'Unknown CA error')
|
|
}
|
|
|
|
return {
|
|
'success': True,
|
|
'message': 'CA functionality working',
|
|
'ca_valid': True
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'message': f'CA functionality test failed: {str(e)}',
|
|
'error': str(e)
|
|
}
|
|
|
|
def _test_certificate_generation(self) -> Dict[str, Any]:
|
|
"""Test certificate generation"""
|
|
try:
|
|
# Test generating a temporary certificate
|
|
test_cert = self.generate_certificate(
|
|
common_name="test.example.com",
|
|
domains=["test.example.com"],
|
|
days=1
|
|
)
|
|
|
|
if test_cert.get('success', False):
|
|
# Clean up test certificate
|
|
cert_file = self.certs_dir / f"test.example.com.crt"
|
|
key_file = self.certs_dir / f"test.example.com.key"
|
|
|
|
if cert_file.exists():
|
|
cert_file.unlink()
|
|
if key_file.exists():
|
|
key_file.unlink()
|
|
|
|
return {
|
|
'success': True,
|
|
'message': 'Certificate generation working'
|
|
}
|
|
else:
|
|
return {
|
|
'success': False,
|
|
'message': 'Certificate generation failed',
|
|
'error': test_cert.get('error', 'Unknown error')
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'message': f'Certificate generation test failed: {str(e)}',
|
|
'error': str(e)
|
|
}
|
|
|
|
def _test_encryption_functionality(self) -> Dict[str, Any]:
|
|
"""Test encryption/decryption functionality"""
|
|
try:
|
|
# Test Fernet encryption
|
|
test_data = b"test_secret_data"
|
|
encrypted_data = self.fernet.encrypt(test_data)
|
|
decrypted_data = self.fernet.decrypt(encrypted_data)
|
|
|
|
if decrypted_data == test_data:
|
|
return {
|
|
'success': True,
|
|
'message': 'Encryption/decryption working'
|
|
}
|
|
else:
|
|
return {
|
|
'success': False,
|
|
'message': 'Encryption/decryption failed - data mismatch'
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'message': f'Encryption test failed: {str(e)}',
|
|
'error': str(e)
|
|
}
|
|
|
|
def _test_trust_store(self) -> Dict[str, Any]:
|
|
"""Test trust store functionality"""
|
|
try:
|
|
trusted_keys = self.get_trusted_keys()
|
|
trust_chains = self.get_trust_chains()
|
|
|
|
return {
|
|
'success': True,
|
|
'message': 'Trust store accessible',
|
|
'trusted_keys_count': len(trusted_keys),
|
|
'trust_chains_count': len(trust_chains)
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'message': f'Trust store test failed: {str(e)}',
|
|
'error': str(e)
|
|
}
|
|
|
|
def _load_trust_store(self) -> None:
|
|
"""Load trust store from disk."""
|
|
if self.trusted_keys_file.exists():
|
|
with open(self.trusted_keys_file, "r") as f:
|
|
self.trusted_keys = json.load(f)
|
|
else:
|
|
self.trusted_keys = {}
|
|
if self.trust_chains_file.exists():
|
|
with open(self.trust_chains_file, "r") as f:
|
|
self.trust_chains = json.load(f)
|
|
else:
|
|
self.trust_chains = {}
|
|
|
|
def _save_trust_store(self) -> None:
|
|
"""Save trust store to disk."""
|
|
with open(self.trusted_keys_file, "w") as f:
|
|
json.dump(self.trusted_keys, f, indent=2)
|
|
with open(self.trust_chains_file, "w") as f:
|
|
json.dump(self.trust_chains, f, indent=2)
|
|
|
|
def _secrets_file(self):
|
|
return self.vault_dir / 'secrets.json'
|
|
|
|
def _load_secrets(self):
|
|
secrets_file = self._secrets_file()
|
|
if secrets_file.exists():
|
|
with open(secrets_file, 'rb') as f:
|
|
data = f.read()
|
|
try:
|
|
decrypted = self.fernet.decrypt(data)
|
|
return json.loads(decrypted.decode('utf-8'))
|
|
except Exception:
|
|
return {}
|
|
return {}
|
|
|
|
def _save_secrets(self, secrets):
|
|
secrets_file = self._secrets_file()
|
|
encrypted = self.fernet.encrypt(json.dumps(secrets).encode('utf-8'))
|
|
with open(secrets_file, 'wb') as f:
|
|
f.write(encrypted)
|
|
|
|
def store_secret(self, name: str, value: str) -> bool:
|
|
secrets = self._load_secrets()
|
|
secrets[name] = value
|
|
self._save_secrets(secrets)
|
|
return True
|
|
|
|
def get_secret(self, name: str) -> str:
|
|
secrets = self._load_secrets()
|
|
return secrets.get(name, None)
|
|
|
|
def list_secrets(self) -> list:
|
|
secrets = self._load_secrets()
|
|
return list(secrets.keys())
|
|
|
|
def delete_secret(self, name: str) -> bool:
|
|
secrets = self._load_secrets()
|
|
if name in secrets:
|
|
del secrets[name]
|
|
self._save_secrets(secrets)
|
|
return True
|
|
return False
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Test the VaultManager
|
|
vault = VaultManager()
|
|
print("Vault Manager initialized successfully")
|
|
print(f"CA configured: {vault.ca_cert_file.exists()}")
|
|
print(f"Fernet configured: {vault.fernet_key_file.exists()}")
|
|
|
|
# Generate a test certificate
|
|
cert_info = vault.generate_certificate("test.example.com", ["test.example.com", "www.test.example.com"])
|
|
print(f"Generated certificate: {cert_info}")
|
|
|
|
# List certificates
|
|
certs = vault.list_certificates()
|
|
print(f"Total certificates: {len(certs)}")
|
|
|
|
# Add a trusted key
|
|
vault.add_trusted_key("test-peer", "age1testkey123456789", "direct")
|
|
print("Added trusted key")
|
|
|
|
# Get status
|
|
status = vault.get_status()
|
|
print(f"Vault status: {status}") |