Files
pic/api/base_service_manager.py
T
roof a43f9fbf0d fix: full security audit remediation — P0/P1/P2/P3 fixes + 1020 passing tests
P0 — Broken functionality:
- Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing)
- Fix email_manager.delete_email_user() missing domain arg
- Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now
  accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file)
- Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR)
- Fix _next_peer_ip exhaustion returning 500 instead of 409
- Fix ConfigManager Caddyfile path (/app/config-caddy/)
- Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx
- Remove hardcoded credentials from Dashboard.jsx

P1 — Security:
- CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern)
- enforce_auth: 503 only when users file readable but empty; never bypass on IOError
- WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes
- DNS add_cell_dns_forward: validate IP and domain; reject injection chars
- DNS zone write: realpath containment + record content validation
- iptables comment /32 suffix prevents substring match deleting wrong peer rules
- is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge)
- POST /api/containers: volume allow-list prevents arbitrary host mounts
- file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user
- email/calendar: stop persisting plaintext passwords in user records
- routing_manager: validate IPs, networks, and interface names
- peer_registry: write peers.json at mode 0o600
- vault_manager: Fernet key file at mode 0o600
- CORS: lock down to explicit origin list
- domain/cell_name validation: reject newline, brace, semicolon injection chars

P2 — Architecture:
- Peer add: rollback registry entry if firewall rules fail post-add
- restart_service(): base class now calls _restart_container(); email and calendar
  managers call cell-mail / cell-radicale respectively
- email/calendar managers sync user list (no passwords) to cell_config.json
- Pending-restart flag cleared only after helper subprocess exits with code 0
- docker-compose.yml: add config-caddy volume to API container

P3 — Tests (854 → 1020):
- Fill test_email_endpoints.py, test_calendar_endpoints.py,
  test_network_endpoints.py, test_routing_endpoints.py
- New: test_peer_management_update.py, test_peer_management_edge_cases.py,
  test_input_validation.py, test_enforce_auth_configured.py,
  test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py,
  test_is_local_request_per_endpoint.py, test_caddy_routing.py
- E2E conftest: skip WireGuard suite when wg-quick absent
- Update existing tests to match fixed signatures and comment formats

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:30:21 -04:00

198 lines
7.7 KiB
Python

#!/usr/bin/env python3
"""
Base Service Manager for Personal Internet Cell
Provides standardized interface for all service managers
"""
import logging
import json
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any
from datetime import datetime
import traceback
logger = logging.getLogger(__name__)
class BaseServiceManager(ABC):
"""Base class for all service managers with standardized interface"""
def __init__(self, service_name: str, data_dir: str = '/app/data', config_dir: str = '/app/config'):
self.service_name = service_name
self.data_dir = data_dir
self.config_dir = config_dir
self.logger = logging.getLogger(f'picell.{service_name}')
# Ensure directories exist
self._ensure_directories()
def _ensure_directories(self):
"""Ensure required directories exist"""
self.safe_makedirs(self.data_dir)
self.safe_makedirs(self.config_dir)
@staticmethod
def safe_makedirs(path: str):
"""Create directory, silently ignoring permission errors (e.g. running outside Docker)."""
import os
try:
os.makedirs(path, exist_ok=True)
except (PermissionError, OSError):
pass
@abstractmethod
def get_status(self) -> Dict[str, Any]:
"""Get service status - must be implemented by subclasses"""
pass
@abstractmethod
def test_connectivity(self) -> Dict[str, Any]:
"""Test service connectivity - must be implemented by subclasses"""
pass
def get_logs(self, lines: int = 50) -> List[str]:
"""Get service logs - default implementation"""
try:
log_file = f"{self.data_dir}/{self.service_name}.log"
import os
if not os.path.exists(log_file):
return [f"No log file found for {self.service_name}"]
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
all_lines = f.readlines()
return all_lines[-lines:] if lines > 0 else all_lines
except Exception as e:
self.logger.error(f"Error reading logs: {e}")
return [f"Error reading logs: {str(e)}"]
def restart_service(self) -> bool:
"""Restart service - default implementation.
Delegates to _restart_container() using self.container_name when set,
otherwise falls back to self.service_name. Subclasses with a known
container name should set self.container_name in their __init__ or
override this method entirely.
"""
try:
name = getattr(self, 'container_name', None) or self.service_name
if not name:
self.logger.warning("restart_service: no container name available; skipping restart")
return False
self.logger.info(f"Restarting {self.service_name} service via container '{name}'")
return self._restart_container(name)
except Exception as e:
self.logger.error(f"Error restarting {self.service_name}: {e}")
return False
def _restart_container(self, container_name: str) -> bool:
"""Restart a Docker container by name."""
import subprocess
try:
result = subprocess.run(
['docker', 'restart', container_name],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
self.logger.info(f"Restarted container {container_name}")
return True
self.logger.error(f"Failed to restart {container_name}: {result.stderr}")
return False
except Exception as e:
self.logger.error(f"Error restarting container {container_name}: {e}")
return False
def apply_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""Apply config to actual service files and restart. Override in subclasses."""
return {'restarted': [], 'warnings': []}
def get_config(self) -> Dict[str, Any]:
"""Get service configuration - default implementation"""
try:
config_file = f"{self.config_dir}/{self.service_name}.json"
import os
if not os.path.exists(config_file):
return {"error": f"No configuration file found for {self.service_name}"}
with open(config_file, 'r') as f:
return json.load(f)
except Exception as e:
self.logger.error(f"Error reading config: {e}")
return {"error": str(e)}
def update_config(self, config: Dict[str, Any]) -> bool:
"""Update service configuration - default implementation"""
try:
config_file = f"{self.config_dir}/{self.service_name}.json"
import os
os.makedirs(os.path.dirname(config_file), exist_ok=True)
with open(config_file, 'w') as f:
json.dump(config, f, indent=2)
self.logger.info(f"Updated configuration for {self.service_name}")
return True
except Exception as e:
self.logger.error(f"Error updating config: {e}")
return False
def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""Validate configuration - default implementation"""
return {
"valid": True,
"errors": [],
"warnings": []
}
def get_metrics(self) -> Dict[str, Any]:
"""Get service metrics - default implementation"""
return {
"service": self.service_name,
"timestamp": datetime.utcnow().isoformat(),
"status": "unknown"
}
def handle_error(self, error: Exception, context: str = "") -> Dict[str, Any]:
"""Standardized error handling"""
error_info = {
"error": str(error),
"type": type(error).__name__,
"context": context,
"timestamp": datetime.utcnow().isoformat(),
"service": self.service_name,
"traceback": traceback.format_exc()
}
self.logger.error(f"Error in {context}: {error}")
return error_info
def log_operation(self, operation: str, details: Dict[str, Any] = None):
"""Log service operations"""
log_data = {
"operation": operation,
"service": self.service_name,
"timestamp": datetime.utcnow().isoformat(),
"details": details or {}
}
self.logger.info(f"Operation: {operation} - {json.dumps(details) if details else 'No details'}")
def health_check(self) -> Dict[str, Any]:
"""Comprehensive health check"""
try:
status = self.get_status()
connectivity = self.test_connectivity()
metrics = self.get_metrics()
return {
"service": self.service_name,
"timestamp": datetime.utcnow().isoformat(),
"status": status,
"connectivity": connectivity,
"metrics": metrics,
"healthy": self._is_healthy(status, connectivity)
}
except Exception as e:
return self.handle_error(e, "health_check")
def _is_healthy(self, status: Dict[str, Any], connectivity: Dict[str, Any]) -> bool:
"""Determine if service is healthy based on status and connectivity"""
# Default implementation - subclasses can override
return status.get("running", False) and connectivity.get("success", False)