#!/usr/bin/env python3 """ Container Manager for Personal Internet Cell Handles Docker container orchestration and management """ import docker import logging from datetime import datetime from typing import List, Dict, Any, Optional from base_service_manager import BaseServiceManager logger = logging.getLogger(__name__) class ContainerManager(BaseServiceManager): """Manages Docker container orchestration and management""" def __init__(self, data_dir: str = None, config_dir: str = None): import os as _os data_dir = data_dir or _os.environ.get('DATA_DIR', '/app/data') config_dir = config_dir or _os.environ.get('CONFIG_DIR', '/app/config') super().__init__('container', data_dir, config_dir) try: self.client = docker.from_env() self.docker_available = True except Exception as e: logger.error(f"Docker client initialization failed: {e}") self.client = None self.docker_available = False def get_status(self) -> Dict[str, Any]: """Get container service status""" try: if not self.docker_available: return { 'running': False, 'status': 'offline', 'error': 'Docker not available', 'containers_count': 0, 'images_count': 0, 'volumes_count': 0, 'timestamp': datetime.utcnow().isoformat() } containers = self.list_containers() images = self.list_images() volumes = self.list_volumes() # Count running containers running_containers = [c for c in containers if c.get('status') == 'running'] status = { 'running': self.docker_available, 'status': 'online' if self.docker_available else 'offline', 'containers_count': len(containers), 'running_containers_count': len(running_containers), 'images_count': len(images), 'volumes_count': len(volumes), 'docker_info': self._get_docker_info(), 'timestamp': datetime.utcnow().isoformat() } return status except Exception as e: return self.handle_error(e, "get_status") def test_connectivity(self) -> Dict[str, Any]: """Test container service connectivity""" try: if not self.docker_available: return { 'success': False, 'message': 'Docker not available', 'error': 'Docker client not initialized', 'timestamp': datetime.utcnow().isoformat() } # Test Docker daemon connectivity daemon_test = self._test_docker_daemon() # Test container operations container_test = self._test_container_operations() # Test image operations image_test = self._test_image_operations() # Test volume operations volume_test = self._test_volume_operations() results = { 'docker_daemon': daemon_test, 'container_operations': container_test, 'image_operations': image_test, 'volume_operations': volume_test, 'success': daemon_test.get('success', False), 'timestamp': datetime.utcnow().isoformat() } return results except Exception as e: return self.handle_error(e, "test_connectivity") def _get_docker_info(self) -> Dict[str, Any]: """Get Docker daemon information""" try: if not self.client: return {'error': 'Docker client not available'} info = self.client.info() return { 'version': info.get('ServerVersion', 'unknown'), 'containers': info.get('Containers', 0), 'images': info.get('Images', 0), 'driver': info.get('Driver', 'unknown'), 'kernel_version': info.get('KernelVersion', 'unknown'), 'os': info.get('OperatingSystem', 'unknown') } except Exception as e: return {'error': str(e)} def _test_docker_daemon(self) -> Dict[str, Any]: """Test Docker daemon connectivity""" try: if not self.client: return { 'success': False, 'message': 'Docker client not available', 'error': 'Client not initialized' } # Test ping self.client.ping() # Get info info = self.client.info() return { 'success': True, 'message': 'Docker daemon accessible', 'version': info.get('ServerVersion', 'unknown') } except Exception as e: return { 'success': False, 'message': f'Docker daemon test failed: {str(e)}', 'error': str(e) } def _test_container_operations(self) -> Dict[str, Any]: """Test container operations""" try: if not self.client: return { 'success': False, 'message': 'Docker client not available', 'error': 'Client not initialized' } # Test listing containers containers = self.list_containers() return { 'success': True, 'message': 'Container operations working', 'containers_count': len(containers) } except Exception as e: return { 'success': False, 'message': f'Container operations test failed: {str(e)}', 'error': str(e) } def _test_image_operations(self) -> Dict[str, Any]: """Test image operations""" try: if not self.client: return { 'success': False, 'message': 'Docker client not available', 'error': 'Client not initialized' } # Test listing images images = self.list_images() return { 'success': True, 'message': 'Image operations working', 'images_count': len(images) } except Exception as e: return { 'success': False, 'message': f'Image operations test failed: {str(e)}', 'error': str(e) } def _test_volume_operations(self) -> Dict[str, Any]: """Test volume operations""" try: if not self.client: return { 'success': False, 'message': 'Docker client not available', 'error': 'Client not initialized' } # Test listing volumes volumes = self.list_volumes() return { 'success': True, 'message': 'Volume operations working', 'volumes_count': len(volumes) } except Exception as e: return { 'success': False, 'message': f'Volume operations test failed: {str(e)}', 'error': str(e) } def list_containers(self, all: bool = True) -> List[Dict]: """List all containers""" try: if not self.client: return [] containers = self.client.containers.list(all=all) return [ { 'id': c.id, 'name': c.name, 'status': c.status, 'image': c.image.tags, 'labels': c.labels } for c in containers ] except Exception as e: logger.error(f"Error listing containers: {e}") return [] def start_container(self, name: str) -> bool: """Start a container""" try: if not self.client: return False container = self.client.containers.get(name) container.start() return True except Exception as e: logger.error(f"Error starting container {name}: {e}") return False def stop_container(self, name: str) -> bool: """Stop a container""" try: if not self.client: return False container = self.client.containers.get(name) container.stop() return True except Exception as e: logger.error(f"Error stopping container {name}: {e}") return False def restart_container(self, name: str) -> bool: """Restart a container""" try: if not self.client: return False container = self.client.containers.get(name) container.restart() return True except Exception as e: logger.error(f"Error restarting container {name}: {e}") return False def get_container_logs(self, name: str, tail: int = 100) -> str: """Get container logs""" try: if not self.client: return "Docker client not available" container = self.client.containers.get(name) return container.logs(tail=tail).decode('utf-8') except Exception as e: logger.error(f"Error getting logs for container {name}: {e}") return str(e) def get_container_stats(self, name: str) -> dict: """Get container statistics""" try: if not self.client: return {'error': 'Docker client not available'} container = self.client.containers.get(name) stats = container.stats(stream=False) return stats except Exception as e: logger.error(f"Error getting stats for container {name}: {e}") return {'error': str(e)} def create_container(self, image: str, name: str = '', env: dict = None, volumes: dict = None, command: str = '', ports: dict = None) -> dict: """Create a new container""" if env is None: env = {} if volumes is None: volumes = {} if ports is None: ports = {} try: if not self.client: return {'error': 'Docker client not available'} # Convert volumes dict to Docker volume format volume_mounts = [] for host_path, container_path in volumes.items(): volume_mounts.append({ 'bind': container_path, 'mode': 'rw' }) # Create volume mapping for Docker volume_map = {} for host_path, container_path in volumes.items(): volume_map[host_path] = { 'bind': container_path, 'mode': 'rw' } # Create port bindings for Docker port_bindings = {} for container_port, host_port in ports.items(): port_bindings[container_port] = host_port container = self.client.containers.create( image=image, name=name if name else None, environment=env, volumes=volume_map, command=command if command else None, ports=port_bindings, detach=True ) return {'id': container.id, 'name': container.name} except Exception as e: logger.error(f"Error creating container: {e}") return {'error': str(e)} def remove_container(self, name: str, force: bool = False) -> bool: """Remove a container""" try: if not self.client: return False container = self.client.containers.get(name) container.remove(force=force) return True except Exception as e: logger.error(f"Error removing container {name}: {e}") return False def list_images(self) -> list: """List all images""" try: if not self.client: return [] images = self.client.images.list() return [ { 'id': img.id, 'tags': img.tags, 'short_id': img.short_id } for img in images ] except Exception as e: logger.error(f"Error listing images: {e}") return [] def pull_image(self, image: str) -> dict: """Pull an image""" try: if not self.client: return {'error': 'Docker client not available'} img = self.client.images.pull(image) return {'id': img.id, 'tags': img.tags} except Exception as e: logger.error(f"Error pulling image {image}: {e}") return {'error': str(e)} def remove_image(self, image: str, force: bool = False) -> bool: """Remove an image""" try: if not self.client: return False self.client.images.remove(image=image, force=force) return True except Exception as e: logger.error(f"Error removing image {image}: {e}") return False def list_volumes(self) -> list: """List all volumes""" try: if not self.client: return [] volumes = self.client.volumes.list() return [ { 'name': v.name, 'mountpoint': v.attrs.get('Mountpoint', '') } for v in volumes ] except Exception as e: logger.error(f"Error listing volumes: {e}") return [] def create_volume(self, name: str) -> dict: """Create a volume""" try: if not self.client: return {'error': 'Docker client not available'} v = self.client.volumes.create(name=name) return {'name': v.name, 'mountpoint': v.attrs.get('Mountpoint', '')} except Exception as e: logger.error(f"Error creating volume {name}: {e}") return {'error': str(e)} def remove_volume(self, name: str, force: bool = False) -> bool: """Remove a volume""" try: if not self.client: return False v = self.client.volumes.get(name) v.remove(force=force) return True except Exception as e: logger.error(f"Error removing volume {name}: {e}") return False