#!/usr/bin/env python3
"""
Log Manager for Personal Internet Cell
Comprehensive logging management for all services
"""

import csv
import gzip
import io
import json
import logging
import logging.handlers
import re
import shutil
import threading
import time
from collections import defaultdict
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple

logger = logging.getLogger(__name__)


class LogLevel(Enum):
    """Log levels"""
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class LogManager:
    """Comprehensive logging management for all services"""

    def __init__(self, log_dir: str = '/app/logs',
                 max_file_size: int = 10 * 1024 * 1024,
                 backup_count: int = 5):
        self.log_dir = Path(log_dir)
        self.max_file_size = max_file_size
        self.backup_count = backup_count

        # Ensure log directory exists
        self.log_dir.mkdir(parents=True, exist_ok=True)

        # Service loggers
        self.service_loggers: Dict[str, logging.Logger] = {}

        # Log formatters
        self.formatters = {
            'json': self._create_json_formatter(),
            'text': self._create_text_formatter(),
            'detailed': self._create_detailed_formatter()
        }

        # Log handlers
        self.handlers: Dict[str, Dict[str, logging.Handler]] = defaultdict(dict)

        # Log statistics
        self.log_stats = defaultdict(lambda: {
            'total_entries': 0,
            'error_count': 0,
            'warning_count': 0,
            'last_entry': None
        })

        # Log rotation thread; the event lets stop() interrupt the hourly sleep
        self.rotation_thread = None
        self.running = False
        self._stop_event = threading.Event()

        # Start log rotation monitoring
        self._start_rotation_monitor()

    def _create_json_formatter(self) -> logging.Formatter:
        """Create JSON formatter for structured logging"""
        class JsonFormatter(logging.Formatter):
            def format(self, record):
                log_entry = {
                    # ISO 8601 so _matches_search_criteria() can parse it back
                    # with datetime.fromisoformat()
                    'timestamp': datetime.fromtimestamp(record.created).isoformat(),
                    'level': record.levelname,
                    'logger': record.name,
                    'message': record.getMessage(),
                    'module': record.module,
                    'function': record.funcName,
                    'line': record.lineno
                }

                # Add extra fields if present
                for key, value in record.__dict__.items():
                    if key not in ['name', 'msg', 'args', 'levelname', 'levelno',
                                   'pathname', 'filename', 'module', 'lineno',
                                   'funcName', 'created', 'msecs', 'relativeCreated',
                                   'thread', 'threadName', 'processName', 'process',
                                   'getMessage', 'exc_info', 'exc_text', 'stack_info']:
                        log_entry[key] = value

                # Add exception info if present
                if record.exc_info:
                    log_entry['exception'] = self.formatException(record.exc_info)

                return json.dumps(log_entry)

        return JsonFormatter()

    def _create_text_formatter(self) -> logging.Formatter:
        """Create text formatter for human-readable logs"""
        return logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

    def _create_detailed_formatter(self) -> logging.Formatter:
        """Create detailed formatter with extra information"""
        return logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - '
            '%(module)s:%(funcName)s:%(lineno)d - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

    def add_service_logger(self, service: str, config: Dict[str, Any]):
        """Add a logger for a specific service"""
        try:
            # Create service logger
            service_logger = logging.getLogger(f'picell.{service}')
            service_logger.setLevel(getattr(logging, config.get('level', 'INFO')))
            # Keep records out of the root logger so they are not emitted twice
            service_logger.propagate = False

            # Create log file path
            log_file = self.log_dir / f'{service}.log'

            # Create rotating file handler
            handler = logging.handlers.RotatingFileHandler(
                log_file,
                maxBytes=self.max_file_size,
                backupCount=self.backup_count,
                encoding='utf-8'
            )

            # Set formatter
            formatter_name = config.get('formatter', 'json')
            handler.setFormatter(self.formatters[formatter_name])

            # Add handler to logger
            service_logger.addHandler(handler)

            # Store logger and handler
            self.service_loggers[service] = service_logger
            self.handlers[service]['file'] = handler

            # Add console handler if requested
            if config.get('console', False):
                console_handler = logging.StreamHandler()
                console_handler.setFormatter(self.formatters[formatter_name])
                service_logger.addHandler(console_handler)
                self.handlers[service]['console'] = console_handler

            logger.info(f"Added logger for service: {service}")

        except Exception as e:
            logger.error(f"Error adding logger for {service}: {e}")
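
    # Illustrative only: with a config such as
    #     {'level': 'INFO', 'formatter': 'json', 'console': True}
    # (the service name 'wireguard' below is an assumption, not part of the API),
    # a call like
    #     manager.service_loggers['wireguard'].info('tunnel up', extra={'peer': 'laptop'})
    # appends one JSON object per record, roughly:
    #     {"timestamp": "2024-01-02T15:04:05.123456", "level": "INFO",
    #      "logger": "picell.wireguard", "message": "tunnel up", "peer": "laptop", ...}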
    def get_service_logs(self, service: str, level: str = 'INFO',
                         lines: int = 50) -> List[str]:
        """Get logs for a specific service"""
        try:
            log_file = self.log_dir / f'{service}.log'

            if not log_file.exists():
                return [f"No log file found for service: {service}"]

            # Read log file
            with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                all_lines = f.readlines()

            # Filter by level if specified (exact match, not level-and-above)
            if level != 'ALL':
                filtered_lines = []
                for line in all_lines:
                    if self._is_log_level(line, level):
                        filtered_lines.append(line)
                all_lines = filtered_lines

            # Return last N lines
            return all_lines[-lines:] if lines > 0 else all_lines

        except Exception as e:
            logger.error(f"Error reading logs for {service}: {e}")
            return [f"Error reading logs: {str(e)}"]

    def get_service_logs_parsed(self, service: str, level: str = 'INFO',
                                lines: int = 50) -> List[Dict[str, Any]]:
        """Get parsed logs for a specific service"""
        try:
            log_file = self.log_dir / f'{service}.log'

            if not log_file.exists():
                return [{"error": f"No log file found for service: {service}"}]

            results = []
            with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                all_lines = f.readlines()

            # Process lines in reverse order to get most recent first
            for line in reversed(all_lines[-lines:] if lines > 0 else all_lines):
                line = line.strip()
                if not line:
                    continue

                # Try to parse as JSON
                try:
                    log_entry = json.loads(line)

                    # Apply level filter
                    if level != 'ALL' and log_entry.get('level', '').upper() != level.upper():
                        continue

                    results.append(log_entry)
                except json.JSONDecodeError:
                    # Handle non-JSON logs
                    if level == 'ALL' or self._is_log_level(line, level):
                        results.append({
                            'raw_line': line,
                            'timestamp': datetime.now().isoformat(),
                            'level': 'INFO'
                        })

            return results

        except Exception as e:
            logger.error(f"Error reading parsed logs for {service}: {e}")
            return [{"error": f"Error reading logs: {str(e)}"}]

    def search_logs(self, query: str,
                    time_range: Optional[Tuple[datetime, datetime]] = None,
                    services: Optional[List[str]] = None,
                    level: Optional[str] = None) -> List[Dict[str, Any]]:
        """Search logs across all services"""
        results = []

        # Determine which services to search
        if services is None:
            services = list(self.service_loggers.keys())

        for service in services:
            try:
                log_file = self.log_dir / f'{service}.log'

                if not log_file.exists():
                    continue

                with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                    for line_num, line in enumerate(f, 1):
                        # Parse JSON log entry
                        try:
                            log_entry = json.loads(line.strip())

                            # Apply filters
                            if not self._matches_search_criteria(log_entry, query,
                                                                 time_range, level):
                                continue

                            log_entry['service'] = service
                            log_entry['line_number'] = line_num
                            results.append(log_entry)
                        except json.JSONDecodeError:
                            # Handle non-JSON logs
                            if query.lower() in line.lower():
                                results.append({
                                    'service': service,
                                    'line_number': line_num,
                                    'raw_line': line.strip(),
                                    'timestamp': datetime.now().isoformat()
                                })

            except Exception as e:
                logger.error(f"Error searching logs for {service}: {e}")

        # Sort by timestamp, most recent first
        results.sort(key=lambda x: x.get('timestamp', ''), reverse=True)

        return results
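
    # Illustrative only (the service names and time window are assumptions):
    # find the last hour of ERROR entries mentioning "handshake":
    #     since = datetime.now() - timedelta(hours=1)
    #     hits = manager.search_logs('handshake',
    #                                time_range=(since, datetime.now()),
    #                                services=['wireguard', 'dns'],
    #                                level='ERROR')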
    def _matches_search_criteria(self, log_entry: Dict[str, Any], query: str,
                                 time_range: Optional[Tuple[datetime, datetime]],
                                 level: Optional[str]) -> bool:
        """Check if log entry matches search criteria"""
        # Check query
        if query:
            message = log_entry.get('message', '').lower()
            if query.lower() not in message:
                return False

        # Check time range
        if time_range:
            try:
                log_time = datetime.fromisoformat(log_entry.get('timestamp', ''))
                if not (time_range[0] <= log_time <= time_range[1]):
                    return False
            except (ValueError, TypeError):
                return False

        # Check level
        if level:
            if log_entry.get('level', '').upper() != level.upper():
                return False

        return True

    def _is_log_level(self, line: str, level: str) -> bool:
        """Check if log line matches specified level"""
        try:
            # Try to parse as JSON
            log_entry = json.loads(line.strip())
            return log_entry.get('level', '').upper() == level.upper()
        except json.JSONDecodeError:
            # Fallback to text parsing
            level_pattern = rf'\b{level.upper()}\b'
            return bool(re.search(level_pattern, line.upper()))

    def export_logs(self, format: str = 'json',
                    filters: Optional[Dict[str, Any]] = None) -> str:
        """Export logs in specified format"""
        try:
            if filters is None:
                filters = {}

            # Get logs based on filters
            services = filters.get('services', list(self.service_loggers.keys()))
            level = filters.get('level')
            time_range = filters.get('time_range')
            query = filters.get('query', '')

            logs = self.search_logs(query, time_range, services, level)

            if format == 'json':
                return json.dumps(logs, indent=2)
            elif format == 'csv':
                return self._logs_to_csv(logs)
            elif format == 'text':
                return self._logs_to_text(logs)
            else:
                raise ValueError(f"Unsupported export format: {format}")

        except Exception as e:
            logger.error(f"Error exporting logs: {e}")
            raise

    def _logs_to_csv(self, logs: List[Dict[str, Any]]) -> str:
        """Convert logs to CSV format"""
        if not logs:
            return ""

        # Get the union of all fields so rows with extra keys still fit
        fields = sorted({key for log in logs for key in log.keys()})

        # Use csv.writer so values containing commas or quotes are escaped
        buf = io.StringIO()
        writer = csv.writer(buf)
        writer.writerow(fields)
        for log in logs:
            writer.writerow([str(log.get(field, '')) for field in fields])

        return buf.getvalue()

    def _logs_to_text(self, logs: List[Dict[str, Any]]) -> str:
        """Convert logs to text format"""
        text_lines = []
        for log in logs:
            timestamp = log.get('timestamp', '')
            level = log.get('level', '')
            service = log.get('service', '')
            message = log.get('message', '')
            text_lines.append(f"{timestamp} [{level}] {service}: {message}")

        return '\n'.join(text_lines)

    def get_log_statistics(self, service: Optional[str] = None) -> Dict[str, Any]:
        """Get log statistics"""
        stats = {}

        if service:
            services = [service]
        else:
            services = list(self.service_loggers.keys())

        for svc in services:
            try:
                log_file = self.log_dir / f'{svc}.log'

                if not log_file.exists():
                    stats[svc] = {'error': 'Log file not found'}
                    continue

                # Count log entries by level
                level_counts = defaultdict(int)
                total_entries = 0
                last_entry = None

                with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                    for line in f:
                        try:
                            log_entry = json.loads(line.strip())
                            level = log_entry.get('level', 'UNKNOWN')
                            level_counts[level] += 1
                            total_entries += 1
                            last_entry = log_entry.get('timestamp')
                        except json.JSONDecodeError:
                            total_entries += 1

                stats[svc] = {
                    'total_entries': total_entries,
                    'level_counts': dict(level_counts),
                    'last_entry': last_entry,
                    'file_size': log_file.stat().st_size
                }

            except Exception as e:
                stats[svc] = {'error': str(e)}

        return stats
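
    # Illustrative only (the service name 'dns' and the counts are assumptions):
    # get_log_statistics('dns') returns a shape like:
    #     {'dns': {'total_entries': 1423,
    #              'level_counts': {'INFO': 1390, 'WARNING': 30, 'ERROR': 3},
    #              'last_entry': '2024-01-02T15:04:05.123456',
    #              'file_size': 204800}}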
    def rotate_logs(self, service: Optional[str] = None):
        """Manually rotate logs"""
        try:
            if service:
                services = [service]
            else:
                services = list(self.service_loggers.keys())

            for svc in services:
                if svc in self.handlers and 'file' in self.handlers[svc]:
                    handler = self.handlers[svc]['file']
                    # Hold the handler lock so rollover does not race emit()
                    handler.acquire()
                    try:
                        handler.doRollover()
                    finally:
                        handler.release()
                    logger.info(f"Rotated logs for service: {svc}")

        except Exception as e:
            logger.error(f"Error rotating logs: {e}")

    def cleanup_old_logs(self, days: int = 30):
        """Clean up log files older than specified days"""
        try:
            cutoff_date = datetime.now() - timedelta(days=days)
            deleted_count = 0

            for log_file in self.log_dir.glob('*.log.*'):
                try:
                    file_time = datetime.fromtimestamp(log_file.stat().st_mtime)
                    if file_time < cutoff_date:
                        log_file.unlink()
                        deleted_count += 1
                except Exception as e:
                    logger.warning(f"Error checking file {log_file}: {e}")

            logger.info(f"Cleaned up {deleted_count} old log files")

        except Exception as e:
            logger.error(f"Error cleaning up old logs: {e}")

    def _start_rotation_monitor(self):
        """Start automatic log rotation monitoring"""
        self.running = True
        self.rotation_thread = threading.Thread(target=self._rotation_monitor_loop,
                                                daemon=True)
        self.rotation_thread.start()

    def _rotation_monitor_loop(self):
        """Monitor and rotate logs automatically"""
        while self.running:
            try:
                # Check each service's log file size
                for service in self.service_loggers.keys():
                    log_file = self.log_dir / f'{service}.log'
                    if log_file.exists() and log_file.stat().st_size > self.max_file_size:
                        self.rotate_logs(service)

                # Wait up to 1 hour before next check; wakes early on stop()
                self._stop_event.wait(3600)
            except Exception as e:
                logger.error(f"Error in rotation monitor: {e}")
                self._stop_event.wait(60)  # Back off for 1 minute on error

    def stop(self):
        """Stop the log manager"""
        self.running = False
        self._stop_event.set()
        if self.rotation_thread:
            self.rotation_thread.join(timeout=5)

        # Close all handlers
        for service_handlers in self.handlers.values():
            for handler in service_handlers.values():
                handler.close()

        logger.info("Log manager stopped")

    def get_log_file_info(self, service: str) -> Dict[str, Any]:
        """Get information about a service's log file"""
        try:
            log_file = self.log_dir / f'{service}.log'

            if not log_file.exists():
                return {'error': 'Log file not found'}

            stat = log_file.stat()
            return {
                'file_path': str(log_file),
                'file_size': stat.st_size,
                'created': datetime.fromtimestamp(stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                'exists': True
            }

        except Exception as e:
            return {'error': str(e)}

    def collect_container_logs(self, container_name: str, docker_client=None) -> int:
        """Append new docker container stdout/stderr to a persistent log file.

        Returns number of new lines written, or -1 on error.
        """
        # docker_client is accepted for API compatibility but currently unused;
        # the docker CLI is invoked instead.
        try:
            import subprocess

            log_file = self.log_dir / f'container_{container_name}.log'

            # Determine --since timestamp from last line of existing file
            since_arg = []
            if log_file.exists() and log_file.stat().st_size > 0:
                last_line = ''
                with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                    for last_line in f:
                        pass
                # Parse last timestamp from docker log line
                # (format: 2006-01-02T15:04:05...)
                ts_match = re.match(
                    r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)',
                    last_line.strip()
                )
                if ts_match:
                    # --since may re-include the boundary line, so one
                    # duplicate line at the seam is possible
                    since_arg = ['--since', ts_match.group(1)]

            result = subprocess.run(
                ['docker', 'logs', '--timestamps'] + since_arg + [container_name],
                capture_output=True, text=True, timeout=30
            )

            output = result.stdout + result.stderr
            lines = [l for l in output.splitlines() if l.strip()]

            if lines:
                with open(log_file, 'a', encoding='utf-8') as f:
                    f.write('\n'.join(lines) + '\n')

            return len(lines)

        except Exception as e:
            logger.error(f"Error collecting container logs for {container_name}: {e}")
            return -1
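
    # Illustrative only (the container name 'cell-api' is an assumption):
    # a periodic caller could append fresh docker output and rotate the stored
    # file once it grows past the size limit:
    #     if manager.collect_container_logs('cell-api') > 0:
    #         f = manager.log_dir / 'container_cell-api.log'
    #         if f.stat().st_size > manager.max_file_size:
    #             manager.rotate_container_log('cell-api')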
    def get_container_log_lines(self, container_name: str,
                                lines: int = 100) -> List[str]:
        """Read last N lines from stored container log file."""
        try:
            log_file = self.log_dir / f'container_{container_name}.log'
            if not log_file.exists():
                return []

            with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                all_lines = f.readlines()

            if lines > 0:
                all_lines = all_lines[-lines:]
            return [l.rstrip() for l in all_lines]

        except Exception as e:
            logger.error(f"Error reading container log for {container_name}: {e}")
            return []

    def rotate_container_log(self, container_name: str):
        """Rotate a stored container log file.

        Backups are numbered logrotate-style: .1 is the newest, higher
        numbers are older, and the oldest is dropped once backup_count
        slots are filled.
        """
        try:
            log_file = self.log_dir / f'container_{container_name}.log'
            if not log_file.exists():
                return

            # Drop the oldest backup, then shift the rest up one slot
            oldest = self.log_dir / f'container_{container_name}.log.{self.backup_count}'
            oldest.unlink(missing_ok=True)
            for i in range(self.backup_count - 1, 0, -1):
                src = self.log_dir / f'container_{container_name}.log.{i}'
                if src.exists():
                    src.rename(self.log_dir / f'container_{container_name}.log.{i + 1}')

            # Current file becomes the newest backup
            log_file.rename(self.log_dir / f'container_{container_name}.log.1')
            logger.info(f"Rotated container log for {container_name}")

        except Exception as e:
            logger.error(f"Error rotating container log for {container_name}: {e}")

    def get_all_log_file_infos(self) -> List[Dict[str, Any]]:
        """Return size/mtime info for all log files (service logs + container logs)."""
        results = []
        for log_file in sorted(self.log_dir.glob('*.log')):
            try:
                stat = log_file.stat()
                name = log_file.stem  # e.g. 'wireguard' or 'container_cell-api'
                kind = 'container' if name.startswith('container_') else 'service'
                label = name[len('container_'):] if kind == 'container' else name
                results.append({
                    'name': name,
                    'label': label,
                    'kind': kind,
                    'file': log_file.name,
                    'size': stat.st_size,
                    'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                })
            except Exception:
                pass
        return results

    def compress_old_logs(self):
        """Compress rotated log files to save space"""
        try:
            compressed_count = 0

            for log_file in self.log_dir.glob('*.log.*'):
                if not log_file.name.endswith('.gz'):
                    try:
                        gz_file = log_file.with_name(log_file.name + '.gz')
                        with open(log_file, 'rb') as f_in:
                            with gzip.open(gz_file, 'wb') as f_out:
                                shutil.copyfileobj(f_in, f_out)

                        # Remove original file
                        log_file.unlink()
                        compressed_count += 1
                    except Exception as e:
                        logger.warning(f"Error compressing {log_file}: {e}")

            logger.info(f"Compressed {compressed_count} log files")

        except Exception as e:
            logger.error(f"Error compressing logs: {e}")
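

# ---------------------------------------------------------------------------
# Minimal usage sketch, illustrative only. The log directory, service name,
# and log message below are assumptions for demonstration, not part of the
# module's contract.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    manager = LogManager(log_dir='/tmp/picell-logs')

    # Register a JSON-formatted logger that also echoes to the console
    manager.add_service_logger('wireguard', {
        'level': 'DEBUG',
        'formatter': 'json',
        'console': True,
    })

    # Emit a record; 'peer' lands in the JSON entry as an extra field
    manager.service_loggers['wireguard'].info('tunnel up', extra={'peer': 'laptop'})

    # Read it back, both raw and parsed
    print(manager.get_service_logs('wireguard', level='ALL', lines=5))
    print(manager.get_service_logs_parsed('wireguard', level='ALL', lines=5))

    manager.stop()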