Files
pic/api/config_manager.py
T
roof ac9b26303f fix: restore/import no longer zeros unconfigured services; domain change updates DNS
config_manager restore_config and import_config previously injected zero-filled
entries (port=0, domain='') for every service schema regardless of whether that
service was in the backup/import data. Removed this logic — only restore what's
actually in the backup.

network_manager.apply_domain now:
- updates dnsmasq.conf domain= line (reload cell-dhcp)
- rewrites Corefile zone blocks to the new domain name
- renames and rewrites the primary zone file $ORIGIN + SOA records
- reloads CoreDNS

Tests added first (TDD):
- test_restore_does_not_zero_unconfigured_services
- test_restore_does_not_zero_import
- test_apply_domain_updates_corefile (zone file + Corefile)
- test_apply_domain_updates_dnsmasq
- test_apply_config_writes_dhcp_range / ntp_servers
- test_apply_config_updates_mailserver_env / no_domain_no_restart

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 04:50:10 -04:00

375 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Configuration Manager for Personal Internet Cell
Centralized configuration management for all services
"""
import os
import json
import yaml
import shutil
import hashlib
from datetime import datetime
from typing import Dict, List, Optional, Any
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
class ConfigManager:
"""Centralized configuration management for all services (unified config)"""
def __init__(self, config_file: str = '/app/config/cell_config.json', data_dir: str = '/app/data'):
config_file = Path(config_file)
if config_file.is_dir():
config_file = config_file / 'cell_config.json'
print(f"[DEBUG] ConfigManager.__init__: config_file = {config_file}")
self.config_file = config_file
self.data_dir = Path(data_dir)
self.backup_dir = self.data_dir / 'config_backups'
self.secrets_file = self.config_file.parent / 'secrets.yaml'
try:
self.backup_dir.mkdir(parents=True, exist_ok=True)
except (PermissionError, OSError):
pass
self.service_schemas = self._load_service_schemas()
self.configs = self._load_all_configs()
if not self.config_file.exists():
self._save_all_configs()
def _load_service_schemas(self) -> Dict[str, Dict]:
"""Load configuration schemas for all services"""
return {
'network': {
'required': ['dns_port', 'dhcp_range', 'ntp_servers'],
'optional': ['dns_zones', 'dhcp_reservations'],
'types': {
'dns_port': int,
'dhcp_range': str,
'ntp_servers': list
}
},
'wireguard': {
'required': ['port', 'private_key', 'address'],
'optional': ['peers', 'allowed_ips'],
'types': {
'port': int,
'private_key': str,
'address': str
}
},
'email': {
'required': ['domain', 'smtp_port', 'imap_port'],
'optional': ['users', 'ssl_cert', 'ssl_key'],
'types': {
'smtp_port': int,
'imap_port': int,
'domain': str
}
},
'calendar': {
'required': ['port', 'data_dir'],
'optional': ['users', 'calendars'],
'types': {
'port': int,
'data_dir': str
}
},
'files': {
'required': ['port', 'data_dir'],
'optional': ['users', 'quota'],
'types': {
'port': int,
'data_dir': str,
'quota': int
}
},
'routing': {
'required': ['nat_enabled', 'firewall_enabled'],
'optional': ['nat_rules', 'firewall_rules', 'peer_routes'],
'types': {
'nat_enabled': bool,
'firewall_enabled': bool
}
},
'vault': {
'required': ['ca_configured', 'fernet_configured'],
'optional': ['certificates', 'trusted_keys'],
'types': {
'ca_configured': bool,
'fernet_configured': bool
}
}
}
def _load_all_configs(self) -> Dict[str, Dict]:
"""Load all existing service configurations"""
if self.config_file.exists():
try:
with open(self.config_file, 'r') as f:
return json.load(f)
except Exception as e:
logger.error(f"Error loading unified config: {e}")
return {}
return {}
def _save_all_configs(self):
"""Save all service configurations to the unified config file"""
try:
self.config_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.config_file, 'w') as f:
json.dump(self.configs, f, indent=2)
except (PermissionError, OSError):
pass
def get_service_config(self, service: str) -> Dict[str, Any]:
"""Get configuration for a specific service"""
if service not in self.service_schemas:
raise ValueError(f"Unknown service: {service}")
return self.configs.get(service, {})
def update_service_config(self, service: str, config: Dict[str, Any]) -> bool:
"""Update configuration for a specific service"""
if service not in self.service_schemas:
raise ValueError(f"Unknown service: {service}")
try:
# Validate types only (required fields are checked by validate_config, not here)
schema = self.service_schemas[service]
for field, expected_type in schema['types'].items():
if field in config and not isinstance(config[field], expected_type):
logger.error(f"Invalid type for {field}: expected {expected_type.__name__}")
return False
# Backup current config
self._backup_service_config(service)
# Update configuration
self.configs[service] = config
self._save_all_configs()
logger.info(f"Updated configuration for {service}")
return True
except Exception as e:
logger.error(f"Error updating config for {service}: {e}")
return False
def validate_config(self, service: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""Validate configuration for a service"""
if service not in self.service_schemas:
return {
"valid": False,
"errors": [f"Unknown service: {service}"],
"warnings": []
}
schema = self.service_schemas[service]
errors = []
warnings = []
# Check required fields (missing = error, wrong type = error)
for field in schema['required']:
if field not in config:
errors.append(f"Missing required field: {field}")
elif field in schema['types']:
expected_type = schema['types'][field]
if not isinstance(config[field], expected_type):
errors.append(f"Field {field} must be of type {expected_type.__name__}")
# Check optional fields
for field in schema['optional']:
if field in config and field in schema['types']:
expected_type = schema['types'][field]
if not isinstance(config[field], expected_type):
warnings.append(f"Field {field} should be of type {expected_type.__name__}")
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings
}
    # NOTE(review): dead code — a second `get_all_configs` defined later in
    # this class shadows this one at class-creation time. Keep only one copy.
    def get_all_configs(self) -> Dict[str, Dict]:
        """Return all stored service configurations."""
        return dict(self.configs)
    # NOTE(review): dead code — a second `get_config_summary` defined later in
    # this class shadows this one at class-creation time, and the later one
    # returns a different shape (lists of service names rather than a count).
    # Keep only one copy.
    def get_config_summary(self) -> Dict[str, Any]:
        """Return a high-level summary of configuration state."""
        # Count backup directories directly; tolerate a missing backup dir.
        backup_count = sum(
            1 for p in self.backup_dir.iterdir() if p.is_dir()
        ) if self.backup_dir.exists() else 0
        return {
            'total_services': len(self.service_schemas),
            'configured_services': len(self.configs),
            'backup_count': backup_count,
        }
def backup_config(self) -> str:
"""Create a backup of all configurations"""
try:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_id = f"backup_{timestamp}"
backup_path = self.backup_dir / backup_id
# Create backup directory
backup_path.mkdir(parents=True, exist_ok=True)
# Copy all config files
if self.config_file.exists():
shutil.copy2(self.config_file, backup_path / 'cell_config.json')
# Copy secrets file if it exists
if self.secrets_file.exists():
shutil.copy2(self.secrets_file, backup_path / 'secrets.yaml')
# Create backup manifest
manifest = {
"backup_id": backup_id,
"timestamp": datetime.now().isoformat(),
"services": list(self.service_schemas.keys()),
"files": [f.name for f in backup_path.iterdir()]
}
with open(backup_path / 'manifest.json', 'w') as f:
json.dump(manifest, f, indent=2)
logger.info(f"Created configuration backup: {backup_id}")
return backup_id
except Exception as e:
logger.error(f"Error creating backup: {e}")
raise
def restore_config(self, backup_id: str) -> bool:
"""Restore configuration from backup"""
try:
backup_path = self.backup_dir / backup_id
if not backup_path.exists():
raise ValueError(f"Backup {backup_id} not found")
# Read manifest
manifest_file = backup_path / 'manifest.json'
if not manifest_file.exists():
raise ValueError(f"Backup manifest not found")
with open(manifest_file, 'r') as f:
manifest = json.load(f)
# Restore config files
config_backup = backup_path / 'cell_config.json'
if config_backup.exists():
shutil.copy2(config_backup, self.config_file)
# Restore secrets file if it exists
secrets_backup = backup_path / 'secrets.yaml'
if secrets_backup.exists():
shutil.copy2(secrets_backup, self.secrets_file)
# Reload configurations — restore only what was in the backup
self.configs = self._load_all_configs()
logger.info(f"Restored configuration from backup: {backup_id}")
return True
except Exception as e:
logger.error(f"Error restoring backup {backup_id}: {e}")
return False
def list_backups(self) -> List[Dict[str, Any]]:
"""List all available backups"""
backups = []
for backup_dir in self.backup_dir.iterdir():
if backup_dir.is_dir():
manifest_file = backup_dir / 'manifest.json'
if manifest_file.exists():
try:
with open(manifest_file, 'r') as f:
manifest = json.load(f)
backups.append(manifest)
except Exception as e:
logger.error(f"Error reading backup manifest {backup_dir.name}: {e}")
return sorted(backups, key=lambda x: x['timestamp'], reverse=True)
def delete_backup(self, backup_id: str) -> bool:
"""Delete a backup"""
try:
backup_path = self.backup_dir / backup_id
if not backup_path.exists():
raise ValueError(f"Backup {backup_id} not found")
shutil.rmtree(backup_path)
logger.info(f"Deleted backup: {backup_id}")
return True
except Exception as e:
logger.error(f"Error deleting backup {backup_id}: {e}")
return False
def get_config_hash(self, service: str) -> str:
"""Get hash of service configuration for change detection"""
config = self.get_service_config(service)
config_str = json.dumps(config, sort_keys=True)
return hashlib.sha256(config_str.encode()).hexdigest()
def has_config_changed(self, service: str, previous_hash: str) -> bool:
"""Check if configuration has changed"""
current_hash = self.get_config_hash(service)
return current_hash != previous_hash
def export_config(self, format: str = 'json') -> str:
"""Export all configurations in specified format"""
try:
if format == 'json':
return json.dumps(self.configs, indent=2)
elif format == 'yaml':
return yaml.dump(self.configs, default_flow_style=False)
else:
raise ValueError(f"Unsupported format: {format}")
except Exception as e:
logger.error(f"Error exporting config: {e}")
raise
def import_config(self, config_data: str, format: str = 'json') -> bool:
"""Import configurations from string"""
try:
if format == 'json':
configs = json.loads(config_data)
elif format == 'yaml':
configs = yaml.safe_load(config_data)
else:
raise ValueError(f"Unsupported format: {format}")
# Import only services present in the data — don't fabricate missing ones
for service, config in configs.items():
if service in self.service_schemas:
self.update_service_config(service, config)
logger.info("Imported configurations successfully")
return True
except Exception as e:
logger.error(f"Error importing config: {e}")
return False
    def _backup_service_config(self, service: str):
        """Create backup of specific service config before update"""
        # No-op for unified config, but keep for compatibility
        # (per-service snapshots only made sense when each service had its own
        # file; full snapshots are handled by backup_config instead, and
        # update_service_config still calls this hook before every write).
        pass
def get_all_configs(self) -> Dict[str, Dict]:
"""Get all service configurations"""
return self.configs.copy()
def get_config_summary(self) -> Dict[str, Any]:
"""Get summary of all configurations"""
summary = {
"total_services": len(self.service_schemas),
"configured_services": [],
"unconfigured_services": [],
"backup_count": len(self.list_backups()),
"last_backup": None
}
backups = self.list_backups()
if backups:
summary["last_backup"] = backups[0]["timestamp"]
for service in self.service_schemas.keys():
config = self.get_service_config(service)
if config and not config.get("error"):
summary["configured_services"].append(service)
else:
summary["unconfigured_services"].append(service)
return summary