87ff50c378
Each service manager now has apply_config() that writes to the actual config:
- network: dhcp_range → dnsmasq.conf (reload cell-dhcp), ntp_servers → chrony.conf
(restart cell-ntp), domain → dnsmasq.conf domain= line
- email: domain → mailserver.env OVERRIDE_HOSTNAME + POSTMASTER_ADDRESS,
restart cell-mail
- wireguard: port/address/private_key → wg0.conf ListenPort/Address/PrivateKey,
restart cell-wireguard
- calendar: port → radicale config hosts=, restart cell-radicale
PUT /api/config now calls apply_config() after persisting JSON, and returns
{restarted: [...], warnings: [...]} so Settings UI can show which containers
were restarted. _restart_container() helper added to BaseServiceManager.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
188 lines
7.2 KiB
Python
188 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Base Service Manager for Personal Internet Cell
|
|
Provides standardized interface for all service managers
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
from abc import ABC, abstractmethod
|
|
from typing import Dict, List, Optional, Any
|
|
from datetime import datetime
|
|
import traceback
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class BaseServiceManager(ABC):
|
|
"""Base class for all service managers with standardized interface"""
|
|
|
|
def __init__(self, service_name: str, data_dir: str = '/app/data', config_dir: str = '/app/config'):
|
|
self.service_name = service_name
|
|
self.data_dir = data_dir
|
|
self.config_dir = config_dir
|
|
self.logger = logging.getLogger(f'picell.{service_name}')
|
|
|
|
# Ensure directories exist
|
|
self._ensure_directories()
|
|
|
|
def _ensure_directories(self):
|
|
"""Ensure required directories exist"""
|
|
self.safe_makedirs(self.data_dir)
|
|
self.safe_makedirs(self.config_dir)
|
|
|
|
@staticmethod
|
|
def safe_makedirs(path: str):
|
|
"""Create directory, silently ignoring permission errors (e.g. running outside Docker)."""
|
|
import os
|
|
try:
|
|
os.makedirs(path, exist_ok=True)
|
|
except (PermissionError, OSError):
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get_status(self) -> Dict[str, Any]:
|
|
"""Get service status - must be implemented by subclasses"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def test_connectivity(self) -> Dict[str, Any]:
|
|
"""Test service connectivity - must be implemented by subclasses"""
|
|
pass
|
|
|
|
def get_logs(self, lines: int = 50) -> List[str]:
|
|
"""Get service logs - default implementation"""
|
|
try:
|
|
log_file = f"{self.data_dir}/{self.service_name}.log"
|
|
import os
|
|
if not os.path.exists(log_file):
|
|
return [f"No log file found for {self.service_name}"]
|
|
|
|
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
|
|
all_lines = f.readlines()
|
|
return all_lines[-lines:] if lines > 0 else all_lines
|
|
except Exception as e:
|
|
self.logger.error(f"Error reading logs: {e}")
|
|
return [f"Error reading logs: {str(e)}"]
|
|
|
|
def restart_service(self) -> bool:
|
|
"""Restart service - default implementation"""
|
|
try:
|
|
self.logger.info(f"Restarting {self.service_name} service")
|
|
return True
|
|
except Exception as e:
|
|
self.logger.error(f"Error restarting {self.service_name}: {e}")
|
|
return False
|
|
|
|
def _restart_container(self, container_name: str) -> bool:
|
|
"""Restart a Docker container by name."""
|
|
import subprocess
|
|
try:
|
|
result = subprocess.run(
|
|
['docker', 'restart', container_name],
|
|
capture_output=True, text=True, timeout=30
|
|
)
|
|
if result.returncode == 0:
|
|
self.logger.info(f"Restarted container {container_name}")
|
|
return True
|
|
self.logger.error(f"Failed to restart {container_name}: {result.stderr}")
|
|
return False
|
|
except Exception as e:
|
|
self.logger.error(f"Error restarting container {container_name}: {e}")
|
|
return False
|
|
|
|
def apply_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Apply config to actual service files and restart. Override in subclasses."""
|
|
return {'restarted': [], 'warnings': []}
|
|
|
|
def get_config(self) -> Dict[str, Any]:
|
|
"""Get service configuration - default implementation"""
|
|
try:
|
|
config_file = f"{self.config_dir}/{self.service_name}.json"
|
|
import os
|
|
if not os.path.exists(config_file):
|
|
return {"error": f"No configuration file found for {self.service_name}"}
|
|
|
|
with open(config_file, 'r') as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
self.logger.error(f"Error reading config: {e}")
|
|
return {"error": str(e)}
|
|
|
|
def update_config(self, config: Dict[str, Any]) -> bool:
|
|
"""Update service configuration - default implementation"""
|
|
try:
|
|
config_file = f"{self.config_dir}/{self.service_name}.json"
|
|
import os
|
|
os.makedirs(os.path.dirname(config_file), exist_ok=True)
|
|
|
|
with open(config_file, 'w') as f:
|
|
json.dump(config, f, indent=2)
|
|
|
|
self.logger.info(f"Updated configuration for {self.service_name}")
|
|
return True
|
|
except Exception as e:
|
|
self.logger.error(f"Error updating config: {e}")
|
|
return False
|
|
|
|
def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Validate configuration - default implementation"""
|
|
return {
|
|
"valid": True,
|
|
"errors": [],
|
|
"warnings": []
|
|
}
|
|
|
|
def get_metrics(self) -> Dict[str, Any]:
|
|
"""Get service metrics - default implementation"""
|
|
return {
|
|
"service": self.service_name,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"status": "unknown"
|
|
}
|
|
|
|
def handle_error(self, error: Exception, context: str = "") -> Dict[str, Any]:
|
|
"""Standardized error handling"""
|
|
error_info = {
|
|
"error": str(error),
|
|
"type": type(error).__name__,
|
|
"context": context,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"service": self.service_name,
|
|
"traceback": traceback.format_exc()
|
|
}
|
|
|
|
self.logger.error(f"Error in {context}: {error}")
|
|
return error_info
|
|
|
|
def log_operation(self, operation: str, details: Dict[str, Any] = None):
|
|
"""Log service operations"""
|
|
log_data = {
|
|
"operation": operation,
|
|
"service": self.service_name,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"details": details or {}
|
|
}
|
|
self.logger.info(f"Operation: {operation} - {json.dumps(details) if details else 'No details'}")
|
|
|
|
def health_check(self) -> Dict[str, Any]:
|
|
"""Comprehensive health check"""
|
|
try:
|
|
status = self.get_status()
|
|
connectivity = self.test_connectivity()
|
|
metrics = self.get_metrics()
|
|
|
|
return {
|
|
"service": self.service_name,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"status": status,
|
|
"connectivity": connectivity,
|
|
"metrics": metrics,
|
|
"healthy": self._is_healthy(status, connectivity)
|
|
}
|
|
except Exception as e:
|
|
return self.handle_error(e, "health_check")
|
|
|
|
def _is_healthy(self, status: Dict[str, Any], connectivity: Dict[str, Any]) -> bool:
|
|
"""Determine if service is healthy based on status and connectivity"""
|
|
# Default implementation - subclasses can override
|
|
return status.get("running", False) and connectivity.get("success", False) |