Files
pic/api/service_bus.py
T
Constantin 47c2beaf96 fix for bus
2025-09-13 14:42:32 +03:00

383 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Service Bus for Personal Internet Cell
Event-driven service communication and orchestration
"""
import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, List, Callable, Any, Optional
from collections import defaultdict
import threading
import queue
from dataclasses import dataclass
from enum import Enum
# Module-level logger shared by every class in this module.
logger = logging.getLogger(__name__)
class EventType(Enum):
    """Event types for service communication.

    Each member's string value is what ServiceBus uses when logging an event
    type (``event_type.value``).
    """
    # Published by ServiceBus.register_service().
    SERVICE_STARTED = "service_started"
    # Published by ServiceBus.unregister_service().
    SERVICE_STOPPED = "service_stopped"
    # The members below, up to ERROR_OCCURRED, are not published anywhere in
    # this module; presumably other modules publish them (verify before relying on it).
    SERVICE_RESTARTED = "service_restarted"
    CONFIG_CHANGED = "config_changed"
    HEALTH_CHECK = "health_check"
    # Published by ServiceBus.call_service() when a service method raises.
    ERROR_OCCURRED = "error_occurred"
    # Not published in this module (see note above).
    PEER_CONNECTED = "peer_connected"
    PEER_DISCONNECTED = "peer_disconnected"
    SECRET_ROTATED = "secret_rotated"
    CERTIFICATE_EXPIRING = "certificate_expiring"
    BACKUP_CREATED = "backup_created"
    RESTORE_COMPLETED = "restore_completed"
@dataclass
class Event:
    """A single message on the service bus, as built by ServiceBus.publish_event()."""
    event_type: EventType  # what happened
    source: str  # publisher name (ServiceBus passes the service name here)
    data: Dict[str, Any]  # free-form payload; its keys depend on the event type
    timestamp: datetime  # publish time; publish_event() uses naive UTC (datetime.utcnow())
    event_id: str  # unique id; publish_event() uses str(uuid.uuid4())
class ServiceBus:
    """Event-driven service communication bus.

    Services register themselves by name in ``service_registry``. Other code
    can then look them up, call their methods through :meth:`call_service`,
    or start/stop them through the ``orchestrate_service_*`` methods.

    Events published with :meth:`publish_event` are put on a ``queue.Queue``
    and dispatched to subscribed handlers by a single daemon worker thread
    (see :meth:`start` / :meth:`_event_loop`), so handlers run asynchronously
    with respect to the publisher. The most recent ``max_history`` dispatched
    events are kept in ``event_history``.
    """

    # Docker containers backing each service. ``None`` means the service has
    # no container of its own; orchestration then calls the registered service
    # object's own start()/stop() instead. Services missing from this map are
    # handled the same way as the ``None`` entries.
    _SERVICE_CONTAINERS: Dict[str, Optional[List[str]]] = {
        'wireguard': ['cell-wireguard'],
        'email': ['cell-mail', 'cell-rainloop'],  # mail server + web client
        'calendar': ['cell-radicale'],
        'files': ['cell-webdav', 'cell-filegator'],  # WebDAV + file manager
        'network': ['cell-dns', 'cell-dhcp', 'cell-ntp'],  # all network components
        'routing': None,  # system service, not a container
        'vault': None,  # part of the API, not a separate container
        'container': None,  # the container manager has no container of its own
    }

    def __init__(self):
        # Handlers per event type; defaultdict so subscribing needs no setup.
        self.event_handlers: Dict[EventType, List[Callable]] = defaultdict(list)
        # Service name -> service object, as passed to register_service().
        self.service_registry: Dict[str, Any] = {}
        # Pending events, consumed by the worker thread in _event_loop().
        self.event_queue = queue.Queue()
        self.running = False
        self.event_loop_thread: Optional[threading.Thread] = None
        # Dispatched events, oldest first, capped at max_history entries.
        self.event_history: List[Event] = []
        self.max_history = 1000
        # Service dependency mapping. Informational only: the orchestrate_*
        # methods do not start or stop dependencies automatically.
        self.service_dependencies: Dict[str, List[str]] = {
            'wireguard': ['network'],
            'email': ['network', 'vault'],
            'calendar': ['network', 'vault'],
            'files': ['network', 'vault'],
            'routing': ['network', 'wireguard'],
            'vault': ['network']
        }
        # Service lifecycle hooks: service name -> hook type -> callable.
        # NOTE(review): hooks are stored here, but nothing in this class invokes them.
        self.lifecycle_hooks: Dict[str, Dict[str, Callable]] = defaultdict(dict)

    def start(self):
        """Start the background event-dispatch thread (no-op if already running)."""
        if self.running:
            return
        self.running = True
        self.event_loop_thread = threading.Thread(target=self._event_loop, daemon=True)
        self.event_loop_thread.start()
        logger.info("Service bus started")

    def stop(self):
        """Stop the event-dispatch thread.

        Clears the ``running`` flag and waits up to 5 seconds for the worker
        to exit. Events still queued when the worker exits are not dispatched.
        When called from inside an event handler (i.e. on the worker thread
        itself), the join is skipped, because a thread cannot join itself.
        """
        self.running = False
        thread = self.event_loop_thread
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=5)
        logger.info("Service bus stopped")

    def register_service(self, name: str, service: Any):
        """Register *service* under *name* and publish SERVICE_STARTED.

        Re-registering an existing name replaces the previous object.
        """
        self.service_registry[name] = service
        logger.info(f"Registered service: {name}")
        # NOTE: naive UTC timestamp (datetime.utcnow is deprecated since Python 3.12).
        self.publish_event(EventType.SERVICE_STARTED, name, {
            "service": name,
            "timestamp": datetime.utcnow().isoformat()
        })

    def unregister_service(self, name: str):
        """Remove the service registered under *name* and publish SERVICE_STOPPED.

        Unknown names are ignored, and no event is published for a service
        that was never registered.
        """
        if name not in self.service_registry:
            return
        del self.service_registry[name]
        logger.info(f"Unregistered service: {name}")
        self.publish_event(EventType.SERVICE_STOPPED, name, {
            "service": name,
            "timestamp": datetime.utcnow().isoformat()
        })

    def publish_event(self, event_type: EventType, source: str, data: Dict[str, Any]):
        """Queue an event for asynchronous dispatch to subscribed handlers.

        Args:
            event_type: Kind of event.
            source: Name of the publisher (typically a service name).
            data: Free-form payload passed to handlers unchanged.
        """
        import uuid
        event = Event(
            event_type=event_type,
            source=source,
            data=data,
            timestamp=datetime.utcnow(),
            event_id=str(uuid.uuid4())
        )
        self.event_queue.put(event)
        logger.debug(f"Published event: {event_type.value} from {source}")

    def subscribe_to_event(self, event_type: EventType, handler: Callable[[Event], None]):
        """Call *handler(event)* on the worker thread for every *event_type* event.

        Subscribing the same handler twice makes it run twice per event.
        """
        self.event_handlers[event_type].append(handler)
        logger.info(f"Subscribed to event: {event_type.value}")

    def unsubscribe_from_event(self, event_type: EventType, handler: Callable[[Event], None]):
        """Remove one subscription of *handler* for *event_type* (logs a warning if absent)."""
        if event_type in self.event_handlers:
            try:
                self.event_handlers[event_type].remove(handler)
                logger.info(f"Unsubscribed from event: {event_type.value}")
            except ValueError:
                logger.warning(f"Handler not found for event: {event_type.value}")

    def call_service(self, service_name: str, method: str, **kwargs) -> Any:
        """Call ``method(**kwargs)`` on a registered service and return its result.

        Raises:
            ValueError: If the service is not registered or has no such attribute.
            Exception: Anything raised by the method is re-raised, after
                ERROR_OCCURRED has been published for it.
        """
        if service_name not in self.service_registry:
            raise ValueError(f"Service {service_name} not registered")
        service = self.service_registry[service_name]
        if not hasattr(service, method):
            raise ValueError(f"Method {method} not found on service {service_name}")
        try:
            result = getattr(service, method)(**kwargs)
            logger.debug(f"Called {service_name}.{method}")
            return result
        except Exception as e:
            logger.error(f"Error calling {service_name}.{method}: {e}")
            self.publish_event(EventType.ERROR_OCCURRED, service_name, {
                "error": str(e),
                "method": method,
                "service": service_name
            })
            raise

    def get_service(self, service_name: str) -> Any:
        """Return the registered service object, or None if unknown."""
        return self.service_registry.get(service_name)

    def list_services(self) -> List[str]:
        """Return the names of all registered services."""
        return list(self.service_registry.keys())

    def add_lifecycle_hook(self, service_name: str, hook_type: str, hook: Callable):
        """Store *hook* for (*service_name*, *hook_type*), replacing any previous one."""
        self.lifecycle_hooks[service_name][hook_type] = hook
        logger.info(f"Added {hook_type} hook for {service_name}")

    def remove_lifecycle_hook(self, service_name: str, hook_type: str):
        """Remove the (*service_name*, *hook_type*) hook if present."""
        if service_name in self.lifecycle_hooks and hook_type in self.lifecycle_hooks[service_name]:
            del self.lifecycle_hooks[service_name][hook_type]
            logger.info(f"Removed {hook_type} hook for {service_name}")

    def _orchestrate(self, service_name: str, action: str, done: str) -> bool:
        """Shared body of orchestrate_service_start/stop.

        Args:
            service_name: Service to act on.
            action: ``"start"`` or ``"stop"``. Selects the service method
                (``start()``/``stop()``) or the container-manager method
                (``start_container``/``stop_container``) that gets called.
            done: Past-tense verb used in log messages (``"Started"``/``"Stopped"``).

        Returns:
            True if every step succeeded. False otherwise; errors are logged, never raised.
        """
        try:
            containers = self._SERVICE_CONTAINERS.get(service_name)
            if containers is None:
                # No container of its own: delegate to the service object.
                # An unregistered name raises KeyError here, so the result is False.
                service = self.service_registry[service_name]
                if hasattr(service, action):
                    getattr(service, action)()
                logger.info(f"{done} service (no container): {service_name}")
                return True

            if 'container' not in self.service_registry:
                logger.error("Container manager not available")
                return False
            container_action = getattr(self.service_registry['container'], f"{action}_container")

            # Try every container even after a failure, so one bad container
            # does not leave the rest untouched.
            all_success = True
            for container_name in containers:
                if container_action(container_name):
                    logger.info(f"{done} container {container_name} for service {service_name}")
                else:
                    logger.error(f"Failed to {action} container {container_name} for service {service_name}")
                    all_success = False

            if all_success:
                logger.info(f"{done} all containers for service {service_name}: {containers}")
            else:
                logger.error(f"Failed to {action} some containers for service {service_name}")
            return all_success
        except Exception as e:
            logger.error(f"Error orchestrating {action} of {service_name}: {e}")
            return False

    def orchestrate_service_start(self, service_name: str) -> bool:
        """Start a service.

        For container-backed services, each container is started through the
        registered ``'container'`` manager. Other services have their own
        ``start()`` called, if they have one. Dependencies listed in
        ``service_dependencies`` are NOT started.

        Returns:
            True on full success. False if any container failed to start, the
            container manager is not registered, or an error occurred.
        """
        return self._orchestrate(service_name, "start", "Started")

    def orchestrate_service_stop(self, service_name: str) -> bool:
        """Stop a service.

        This mirrors :meth:`orchestrate_service_start`, using ``stop_container``
        or the service's own ``stop()``. Dependent services are NOT stopped.

        Returns:
            True on full success. False if any container failed to stop, the
            container manager is not registered, or an error occurred.
        """
        return self._orchestrate(service_name, "stop", "Stopped")

    def orchestrate_service_restart(self, service_name: str) -> bool:
        """Stop then start a service. Start is skipped if the stop did not fully succeed."""
        try:
            if self.orchestrate_service_stop(service_name):
                return self.orchestrate_service_start(service_name)
            return False
        except Exception as e:
            logger.error(f"Error orchestrating restart of {service_name}: {e}")
            return False

    def get_event_history(self, event_type: Optional[EventType] = None,
                          source: Optional[str] = None, limit: int = 100) -> List[Event]:
        """Return up to *limit* most recent dispatched events, oldest first.

        Args:
            event_type: Keep only events of this type (None = any).
            source: Keep only events from this source (None or "" = any).
            limit: Maximum number of events to return. A value <= 0 returns [].
        """
        if limit <= 0:
            # events[-0:] would return the whole list, not an empty one.
            return []
        # Snapshot first: the worker thread appends to and trims the history concurrently.
        events = list(self.event_history)
        if event_type is not None:
            events = [e for e in events if e.event_type == event_type]
        if source:
            events = [e for e in events if e.source == source]
        return events[-limit:]

    def clear_event_history(self):
        """Forget all recorded events."""
        self.event_history.clear()
        logger.info("Event history cleared")

    def _record_event(self, event: Event) -> None:
        """Append *event* to the history and drop the oldest entries beyond max_history."""
        self.event_history.append(event)
        excess = len(self.event_history) - self.max_history
        if excess > 0:
            del self.event_history[:excess]

    def _event_loop(self):
        """Worker-thread body: dispatch queued events until ``running`` is cleared."""
        while self.running:
            try:
                # Time out periodically so a stop() request is noticed even when idle.
                event = self.event_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                self._record_event(event)
                # Iterate a snapshot: a handler that subscribes or unsubscribes,
                # including unsubscribing itself, must not make this loop skip
                # the next handler.
                for handler in list(self.event_handlers.get(event.event_type, [])):
                    try:
                        handler(event)
                    except Exception as e:
                        logger.exception(f"Error in event handler for {event.event_type.value}: {e}")
            except Exception as e:
                logger.error(f"Error in event loop: {e}")
            finally:
                self.event_queue.task_done()

    def get_service_dependencies(self, service_name: str) -> List[str]:
        """Return a copy of the declared dependencies of *service_name* ([] if none)."""
        return list(self.service_dependencies.get(service_name, []))

    def add_service_dependency(self, service_name: str, dependency: str):
        """Declare that *service_name* depends on *dependency* (duplicates are ignored)."""
        deps = self.service_dependencies.setdefault(service_name, [])
        if dependency in deps:
            logger.debug(f"Dependency {dependency} already registered for service {service_name}")
            return
        deps.append(dependency)
        logger.info(f"Added dependency {dependency} for service {service_name}")

    def remove_service_dependency(self, service_name: str, dependency: str):
        """Remove a declared dependency (logs a warning if it is not declared)."""
        if service_name in self.service_dependencies:
            try:
                self.service_dependencies[service_name].remove(dependency)
                logger.info(f"Removed dependency {dependency} for service {service_name}")
            except ValueError:
                logger.warning(f"Dependency {dependency} not found for service {service_name}")

    def get_service_status_summary(self) -> Dict[str, Any]:
        """Return a status snapshot of every registered service.

        Each service's ``get_status()`` is used when it has one; otherwise its
        status is reported as ``{"status": "unknown"}``. An exception from
        ``get_status()`` is reported as ``{"error": str(e)}`` for that service
        instead of being raised.
        """
        summary = {
            "total_services": len(self.service_registry),
            "services": {},
            "event_count": len(self.event_history),
            "timestamp": datetime.utcnow().isoformat()
        }
        for service_name, service in self.service_registry.items():
            try:
                if hasattr(service, 'get_status'):
                    status = service.get_status()
                else:
                    status = {"status": "unknown"}
            except Exception as e:
                status = {"error": str(e)}
            summary["services"][service_name] = {
                "status": status,
                # Copy so callers cannot mutate the bus's dependency map.
                "dependencies": list(self.service_dependencies.get(service_name, []))
            }
        return summary