Files
pic/api/config_manager.py
T
roof a43f9fbf0d fix: full security audit remediation — P0/P1/P2/P3 fixes + 1020 passing tests
P0 — Broken functionality:
- Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing)
- Fix email_manager.delete_email_user() missing domain arg
- Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now
  accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file)
- Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR)
- Fix _next_peer_ip exhaustion returning 500 instead of 409
- Fix ConfigManager Caddyfile path (/app/config-caddy/)
- Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx
- Remove hardcoded credentials from Dashboard.jsx

P1 — Security:
- CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern)
- enforce_auth: 503 only when users file readable but empty; never bypass on IOError
- WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes
- DNS add_cell_dns_forward: validate IP and domain; reject injection chars
- DNS zone write: realpath containment + record content validation
- iptables comment /32 suffix prevents substring match deleting wrong peer rules
- is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge)
- POST /api/containers: volume allow-list prevents arbitrary host mounts
- file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user
- email/calendar: stop persisting plaintext passwords in user records
- routing_manager: validate IPs, networks, and interface names
- peer_registry: write peers.json at mode 0o600
- vault_manager: Fernet key file at mode 0o600
- CORS: lock down to explicit origin list
- domain/cell_name validation: reject newline, brace, semicolon injection chars

P2 — Architecture:
- Peer add: rollback registry entry if firewall rules fail post-add
- restart_service(): base class now calls _restart_container(); email and calendar
  managers call cell-mail / cell-radicale respectively
- email/calendar managers sync user list (no passwords) to cell_config.json
- Pending-restart flag cleared only after helper subprocess exits with code 0
- docker-compose.yml: add config-caddy volume to API container

P3 — Tests (854 → 1020):
- Fill test_email_endpoints.py, test_calendar_endpoints.py,
  test_network_endpoints.py, test_routing_endpoints.py
- New: test_peer_management_update.py, test_peer_management_edge_cases.py,
  test_input_validation.py, test_enforce_auth_configured.py,
  test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py,
  test_is_local_request_per_endpoint.py, test_caddy_routing.py
- E2E conftest: skip WireGuard suite when wg-quick absent
- Update existing tests to match fixed signatures and comment formats

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:30:21 -04:00

460 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Configuration Manager for Personal Internet Cell
Centralized configuration management for all services
"""
import os
import json
import yaml
import shutil
import hashlib
from datetime import datetime
from typing import Dict, List, Optional, Any
from pathlib import Path
import logging
# The Caddyfile lives on a separate volume mount from the rest of config
LIVE_CADDYFILE = os.environ.get('CADDYFILE_PATH', '/app/config-caddy/Caddyfile')

logger = logging.getLogger(__name__)


class ConfigManager:
    """Centralized configuration management for all services (unified config).

    All service configurations are stored in a single JSON file
    (``cell_config.json``). Backups additionally capture runtime-generated
    files (Caddyfile, Corefile, .env, DNS zone files) so that a restore
    leaves the system internally consistent with the restored config.
    """

    def __init__(self, config_file: str = '/app/config/cell_config.json', data_dir: str = '/app/data'):
        """Set up paths, load schemas, and load (or create) the unified config.

        Args:
            config_file: Path to the unified config JSON file. If a directory
                is passed, ``cell_config.json`` inside it is used.
            data_dir: Runtime data directory; backups live in
                ``<data_dir>/config_backups``.
        """
        config_file = Path(config_file)
        if config_file.is_dir():
            config_file = config_file / 'cell_config.json'
        logger.debug("ConfigManager.__init__: config_file = %s", config_file)
        self.config_file = config_file
        self.data_dir = Path(data_dir)
        self.backup_dir = self.data_dir / 'config_backups'
        self.secrets_file = self.config_file.parent / 'secrets.yaml'
        try:
            self.backup_dir.mkdir(parents=True, exist_ok=True)
        except (PermissionError, OSError):
            # Best-effort: backup features degrade gracefully when the data
            # dir isn't writable (list_backups guards against a missing dir).
            pass
        self.service_schemas = self._load_service_schemas()
        self.configs = self._load_all_configs()
        # Materialize the file on first run so later atomic writes have a base.
        if not self.config_file.exists():
            self._save_all_configs()

    def _load_service_schemas(self) -> Dict[str, Dict]:
        """Return the per-service config schemas (required/optional fields and types)."""
        return {
            'network': {
                'required': ['dns_port', 'dhcp_range', 'ntp_servers'],
                'optional': ['dns_zones', 'dhcp_reservations'],
                'types': {
                    'dns_port': int,
                    'dhcp_range': str,
                    'ntp_servers': list
                }
            },
            'wireguard': {
                'required': ['port', 'private_key', 'address'],
                'optional': ['peers', 'allowed_ips'],
                'types': {
                    'port': int,
                    'private_key': str,
                    'address': str
                }
            },
            'email': {
                'required': ['domain', 'smtp_port', 'imap_port'],
                'optional': ['users', 'ssl_cert', 'ssl_key', 'submission_port', 'webmail_port'],
                'types': {
                    'smtp_port': int,
                    'submission_port': int,
                    'imap_port': int,
                    'webmail_port': int,
                    'domain': str
                }
            },
            'calendar': {
                'required': ['port', 'data_dir'],
                'optional': ['users', 'calendars'],
                'types': {
                    'port': int,
                    'data_dir': str
                }
            },
            'files': {
                'required': ['port', 'data_dir'],
                'optional': ['users', 'quota', 'manager_port'],
                'types': {
                    'port': int,
                    'manager_port': int,
                    'data_dir': str,
                    'quota': int
                }
            },
            'routing': {
                'required': ['nat_enabled', 'firewall_enabled'],
                'optional': ['nat_rules', 'firewall_rules', 'peer_routes'],
                'types': {
                    'nat_enabled': bool,
                    'firewall_enabled': bool
                }
            },
            'vault': {
                'required': ['ca_configured', 'fernet_configured'],
                'optional': ['certificates', 'trusted_keys'],
                'types': {
                    'ca_configured': bool,
                    'fernet_configured': bool
                }
            }
        }

    def _load_all_configs(self) -> Dict[str, Dict]:
        """Load the unified config file; return {} on absence or parse error."""
        if self.config_file.exists():
            try:
                with open(self.config_file, 'r') as f:
                    return json.load(f)
            except Exception as e:
                logger.error(f"Error loading unified config: {e}")
                return {}
        return {}

    def _save_all_configs(self):
        """Save all service configurations to the unified config file (atomic write)."""
        try:
            self.config_file.parent.mkdir(parents=True, exist_ok=True)
            # Write-then-rename so readers never see a half-written file.
            tmp = self.config_file.with_suffix('.tmp')
            with open(tmp, 'w') as f:
                json.dump(self.configs, f, indent=2)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, self.config_file)
        except (PermissionError, OSError):
            # Deliberate best-effort: a read-only filesystem must not crash
            # the API; in-memory state stays authoritative.
            pass

    def get_service_config(self, service: str) -> Dict[str, Any]:
        """Return the config dict for *service* ({} if unset).

        Raises:
            ValueError: If *service* is not a known schema.
        """
        if service not in self.service_schemas:
            raise ValueError(f"Unknown service: {service}")
        return self.configs.get(service, {})

    def update_service_config(self, service: str, config: Dict[str, Any]) -> bool:
        """Replace *service*'s config after type-checking known fields.

        Returns True on success, False on a type error or write failure.

        Raises:
            ValueError: If *service* is not a known schema.
        """
        if service not in self.service_schemas:
            raise ValueError(f"Unknown service: {service}")
        try:
            # Validate types only (required fields are checked by validate_config, not here)
            schema = self.service_schemas[service]
            for field, expected_type in schema['types'].items():
                if field in config and not isinstance(config[field], expected_type):
                    logger.error(f"Invalid type for {field}: expected {expected_type.__name__}")
                    return False
            # Backup current config
            self._backup_service_config(service)
            # Update configuration
            self.configs[service] = config
            self._save_all_configs()
            logger.info(f"Updated configuration for {service}")
            return True
        except Exception as e:
            logger.error(f"Error updating config for {service}: {e}")
            return False

    def validate_config(self, service: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Validate *config* against *service*'s schema.

        Returns:
            Dict with ``valid`` (bool), ``errors`` (list) and ``warnings``
            (list). Missing/ill-typed required fields are errors; ill-typed
            optional fields are warnings only.
        """
        if service not in self.service_schemas:
            return {
                "valid": False,
                "errors": [f"Unknown service: {service}"],
                "warnings": []
            }
        schema = self.service_schemas[service]
        errors = []
        warnings = []
        # Check required fields (missing = error, wrong type = error)
        for field in schema['required']:
            if field not in config:
                errors.append(f"Missing required field: {field}")
            elif field in schema['types']:
                expected_type = schema['types'][field]
                if not isinstance(config[field], expected_type):
                    errors.append(f"Field {field} must be of type {expected_type.__name__}")
        # Check optional fields
        for field in schema['optional']:
            if field in config and field in schema['types']:
                expected_type = schema['types'][field]
                if not isinstance(config[field], expected_type):
                    warnings.append(f"Field {field} should be of type {expected_type.__name__}")
        return {
            "valid": len(errors) == 0,
            "errors": errors,
            "warnings": warnings
        }

    def backup_config(self) -> str:
        """Create a backup of cell_config.json, secrets, Caddyfile, .env, Corefile, and DNS zones.

        Returns:
            The backup id (``backup_<timestamp>``).

        Raises:
            Exception: Re-raises any failure after logging it.
        """
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            backup_id = f"backup_{timestamp}"
            backup_path = self.backup_dir / backup_id
            backup_path.mkdir(parents=True, exist_ok=True)
            # Primary config and secrets
            if self.config_file.exists():
                shutil.copy2(self.config_file, backup_path / 'cell_config.json')
            if self.secrets_file.exists():
                shutil.copy2(self.secrets_file, backup_path / 'secrets.yaml')
            # Runtime-generated files that must match cell_config.json after restore
            config_dir = Path(os.environ.get('CONFIG_DIR', '/app/config'))
            data_dir = Path(os.environ.get('DATA_DIR', '/app/data'))
            env_file = Path(os.environ.get('ENV_FILE', '/app/.env'))
            extra = [
                (Path(LIVE_CADDYFILE), 'Caddyfile'),
                (config_dir / 'dns' / 'Corefile', 'Corefile'),
                (env_file, '.env'),
            ]
            for src, dest_name in extra:
                if src.exists():
                    shutil.copy2(src, backup_path / dest_name)
            # DNS zone files
            dns_data = data_dir / 'dns'
            if dns_data.is_dir():
                zones_dir = backup_path / 'dns_zones'
                zones_dir.mkdir(exist_ok=True)
                for zone_file in dns_data.glob('*.zone'):
                    shutil.copy2(zone_file, zones_dir / zone_file.name)
            services = ['identity'] + list(self.service_schemas.keys())
            manifest = {
                "backup_id": backup_id,
                "timestamp": datetime.now().isoformat(),
                "services": services,
                "files": [f.name for f in backup_path.iterdir()],
            }
            with open(backup_path / 'manifest.json', 'w') as f:
                json.dump(manifest, f, indent=2)
            logger.info(f"Created configuration backup: {backup_id}")
            return backup_id
        except Exception as e:
            logger.error(f"Error creating backup: {e}")
            raise

    def restore_config(self, backup_id: str, services: list = None) -> bool:
        """Restore from backup. If services list given, only restore those service configs (selective)."""
        try:
            backup_path = self.backup_dir / backup_id
            if not backup_path.exists():
                raise ValueError(f"Backup {backup_id} not found")
            manifest_file = backup_path / 'manifest.json'
            if not manifest_file.exists():
                raise ValueError("Backup manifest not found")
            if services is not None:
                # Selective restore: only update specified services in running config
                backup_cfg_path = backup_path / 'cell_config.json'
                if backup_cfg_path.exists():
                    with open(backup_cfg_path) as f:
                        backup_cfg = json.load(f)
                    for svc in services:
                        # Identity is exported/requested as 'identity' but stored
                        # internally under the '_identity' key.
                        if svc == 'identity':
                            if '_identity' in backup_cfg:
                                self.configs['_identity'] = backup_cfg['_identity']
                        elif svc in backup_cfg:
                            self.configs[svc] = backup_cfg[svc]
                    self._save_all_configs()
                logger.info(f"Selectively restored {services} from backup: {backup_id}")
                return True
            # Full restore: copy all files back
            config_backup = backup_path / 'cell_config.json'
            if config_backup.exists():
                shutil.copy2(config_backup, self.config_file)
            secrets_backup = backup_path / 'secrets.yaml'
            if secrets_backup.exists():
                shutil.copy2(secrets_backup, self.secrets_file)
            config_dir = Path(os.environ.get('CONFIG_DIR', '/app/config'))
            data_dir = Path(os.environ.get('DATA_DIR', '/app/data'))
            env_file = Path(os.environ.get('ENV_FILE', '/app/.env'))
            restore_map = [
                (backup_path / 'Caddyfile', Path(LIVE_CADDYFILE)),
                (backup_path / 'Corefile', config_dir / 'dns' / 'Corefile'),
                (backup_path / '.env', env_file),
            ]
            for src, dest in restore_map:
                if src.exists():
                    try:
                        dest.parent.mkdir(parents=True, exist_ok=True)
                        shutil.copy2(src, dest)
                    except (PermissionError, OSError) as copy_err:
                        # Partial restores are acceptable; log and continue.
                        logger.warning(f"Could not restore {dest}: {copy_err} (skipping)")
            zones_backup = backup_path / 'dns_zones'
            if zones_backup.is_dir():
                dns_data = data_dir / 'dns'
                try:
                    dns_data.mkdir(parents=True, exist_ok=True)
                    for zone_file in zones_backup.glob('*.zone'):
                        try:
                            shutil.copy2(zone_file, dns_data / zone_file.name)
                        except (PermissionError, OSError) as zone_err:
                            logger.warning(f"Could not restore zone {zone_file.name}: {zone_err} (skipping)")
                except (PermissionError, OSError) as dir_err:
                    logger.warning(f"Could not create dns data dir {dns_data}: {dir_err} (skipping)")
            # Re-read from disk so in-memory state matches restored files.
            self.configs = self._load_all_configs()
            logger.info(f"Restored configuration from backup: {backup_id}")
            return True
        except Exception as e:
            logger.error(f"Error restoring backup {backup_id}: {e}")
            return False

    def list_backups(self) -> List[Dict[str, Any]]:
        """List all available backups, newest first (by manifest timestamp)."""
        backups = []
        # backup_dir creation in __init__ is best-effort, so it may not exist.
        if not self.backup_dir.is_dir():
            return backups
        for backup_dir in self.backup_dir.iterdir():
            if backup_dir.is_dir():
                manifest_file = backup_dir / 'manifest.json'
                if manifest_file.exists():
                    try:
                        with open(manifest_file, 'r') as f:
                            manifest = json.load(f)
                            backups.append(manifest)
                    except Exception as e:
                        logger.error(f"Error reading backup manifest {backup_dir.name}: {e}")
        # .get guards against foreign/hand-made manifests missing 'timestamp'.
        return sorted(backups, key=lambda x: x.get('timestamp', ''), reverse=True)

    def delete_backup(self, backup_id: str) -> bool:
        """Delete a backup directory. Returns True on success, False otherwise."""
        try:
            backup_path = self.backup_dir / backup_id
            if not backup_path.exists():
                raise ValueError(f"Backup {backup_id} not found")
            shutil.rmtree(backup_path)
            logger.info(f"Deleted backup: {backup_id}")
            return True
        except Exception as e:
            logger.error(f"Error deleting backup {backup_id}: {e}")
            return False

    def get_config_hash(self, service: str) -> str:
        """Return a SHA-256 hash of *service*'s config for change detection."""
        config = self.get_service_config(service)
        # sort_keys makes the hash stable across key-insertion order.
        config_str = json.dumps(config, sort_keys=True)
        return hashlib.sha256(config_str.encode()).hexdigest()

    def has_config_changed(self, service: str, previous_hash: str) -> bool:
        """Return True if *service*'s config hash differs from *previous_hash*."""
        current_hash = self.get_config_hash(service)
        return current_hash != previous_hash

    def export_config(self, format: str = 'json', services: list = None) -> str:
        """Export service configurations (excludes internal state like pending_restart).

        Args:
            format: 'json' or 'yaml'.
            services: Optional allow-list of service names (may include
                'identity'); when given, only those are exported.

        Raises:
            ValueError: On an unsupported format.
        """
        try:
            export_data = {}
            # Identity is stored under '_identity' but exported under the clean
            # key 'identity'. Honor the services filter here too, matching
            # import_config's selective handling of identity.
            if '_identity' in self.configs and (services is None or 'identity' in services):
                export_data['identity'] = dict(self.configs['_identity'])
            # Include service configs, skip internal _ keys
            for key, val in self.configs.items():
                if key.startswith('_'):
                    continue
                if services is not None and key not in services:
                    continue
                export_data[key] = val
            if format == 'json':
                return json.dumps(export_data, indent=2)
            elif format == 'yaml':
                return yaml.dump(export_data, default_flow_style=False)
            else:
                raise ValueError(f"Unsupported format: {format}")
        except Exception as e:
            logger.error(f"Error exporting config: {e}")
            raise

    def import_config(self, config_data: str, format: str = 'json', services: list = None) -> bool:
        """Import configurations from string. Merges into existing config.

        Args:
            config_data: Serialized config ('json' or 'yaml' per *format*).
            format: Input format name.
            services: Optional allow-list limiting which services (and
                'identity') are imported.

        Returns True on success, False on any error (logged).
        """
        try:
            if format == 'json':
                configs = json.loads(config_data)
            elif format == 'yaml':
                configs = yaml.safe_load(config_data)
            else:
                raise ValueError(f"Unsupported format: {format}")
            # Handle identity (exported as 'identity', stored as '_identity')
            if 'identity' in configs and (services is None or 'identity' in services):
                ident = configs['identity']
                cur = dict(self.configs.get('_identity', {}))
                # Only the known identity fields are accepted from imports.
                for k in ('cell_name', 'domain', 'ip_range', 'wireguard_port'):
                    if k in ident:
                        cur[k] = ident[k]
                self.configs['_identity'] = cur
            # Merge service configs (don't replace wholesale — keep existing fields not in import)
            for key, val in configs.items():
                if key == 'identity':
                    continue
                if key not in self.service_schemas:
                    continue
                if services is not None and key not in services:
                    continue
                cur_svc = dict(self.configs.get(key, {}))
                cur_svc.update(val)
                self.configs[key] = cur_svc
            self._save_all_configs()
            logger.info("Imported configurations successfully")
            return True
        except Exception as e:
            logger.error(f"Error importing config: {e}")
            return False

    def _backup_service_config(self, service: str):
        """Create backup of specific service config before update."""
        # No-op for unified config, but keep for compatibility
        pass

    def get_all_configs(self) -> Dict[str, Dict]:
        """Return a shallow copy of all service configurations."""
        return self.configs.copy()

    def get_config_summary(self) -> Dict[str, Any]:
        """Return counts of configured/unconfigured services and backup info."""
        # Fetch backups once; the original re-listed the directory twice.
        backups = self.list_backups()
        summary = {
            "total_services": len(self.service_schemas),
            "configured_services": [],
            "unconfigured_services": [],
            "backup_count": len(backups),
            "last_backup": backups[0]["timestamp"] if backups else None,
        }
        for service in self.service_schemas.keys():
            config = self.get_service_config(service)
            if config and not config.get("error"):
                summary["configured_services"].append(service)
            else:
                summary["unconfigured_services"].append(service)
        return summary