#!/usr/bin/env python3 """ Log Manager for Personal Internet Cell Comprehensive logging management for all services """ import os import json import logging import logging.handlers from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Tuple from pathlib import Path import re import gzip import shutil from collections import defaultdict import threading import time from enum import Enum logger = logging.getLogger(__name__) class LogLevel(Enum): """Log levels""" DEBUG = "DEBUG" INFO = "INFO" WARNING = "WARNING" ERROR = "ERROR" CRITICAL = "CRITICAL" class LogManager: """Comprehensive logging management for all services""" def __init__(self, log_dir: str = '/app/logs', max_file_size: int = 10 * 1024 * 1024, backup_count: int = 5): self.log_dir = Path(log_dir) self.max_file_size = max_file_size self.backup_count = backup_count # Ensure log directory exists self.log_dir.mkdir(parents=True, exist_ok=True) # Service loggers self.service_loggers: Dict[str, logging.Logger] = {} # Log formatters self.formatters = { 'json': self._create_json_formatter(), 'text': self._create_text_formatter(), 'detailed': self._create_detailed_formatter() } # Log handlers self.handlers: Dict[str, Dict[str, logging.Handler]] = defaultdict(dict) # Log statistics self.log_stats = defaultdict(lambda: { 'total_entries': 0, 'error_count': 0, 'warning_count': 0, 'last_entry': None }) # Log rotation thread self.rotation_thread = None self.running = False # Start log rotation monitoring self._start_rotation_monitor() def _create_json_formatter(self) -> logging.Formatter: """Create JSON formatter for structured logging""" class JsonFormatter(logging.Formatter): def format(self, record): log_entry = { 'timestamp': self.formatTime(record), 'level': record.levelname, 'logger': record.name, 'message': record.getMessage(), 'module': record.module, 'function': record.funcName, 'line': record.lineno } # Add extra fields if present for key, value in record.__dict__.items(): if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 'pathname', 'filename', 'module', 'lineno', 'funcName', 'created', 'msecs', 'relativeCreated', 'thread', 'threadName', 'processName', 'process', 'getMessage', 'exc_info', 'exc_text', 'stack_info']: log_entry[key] = value # Add exception info if present if record.exc_info: log_entry['exception'] = self.formatException(record.exc_info) return json.dumps(log_entry) return JsonFormatter() def _create_text_formatter(self) -> logging.Formatter: """Create text formatter for human-readable logs""" return logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) def _create_detailed_formatter(self) -> logging.Formatter: """Create detailed formatter with extra information""" return logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(funcName)s:%(lineno)d - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) def add_service_logger(self, service: str, config: Dict[str, Any]): """Add a logger for a specific service""" try: # Create service logger service_logger = logging.getLogger(f'picell.{service}') service_logger.setLevel(getattr(logging, config.get('level', 'INFO'))) # Create log file path log_file = self.log_dir / f'{service}.log' # Create rotating file handler handler = logging.handlers.RotatingFileHandler( log_file, maxBytes=self.max_file_size, backupCount=self.backup_count, encoding='utf-8' ) # Set formatter formatter_name = config.get('formatter', 'json') handler.setFormatter(self.formatters[formatter_name]) # Add handler to logger service_logger.addHandler(handler) # Store logger and handler self.service_loggers[service] = service_logger self.handlers[service]['file'] = handler # Add console handler if requested if config.get('console', False): console_handler = logging.StreamHandler() console_handler.setFormatter(self.formatters[formatter_name]) service_logger.addHandler(console_handler) self.handlers[service]['console'] = console_handler logger.info(f"Added logger for service: {service}") except Exception as e: logger.error(f"Error adding logger for {service}: {e}") def get_service_logs(self, service: str, level: str = 'INFO', lines: int = 50) -> List[str]: """Get logs for a specific service""" try: log_file = self.log_dir / f'{service}.log' if not log_file.exists(): return [f"No log file found for service: {service}"] # Read log file with open(log_file, 'r', encoding='utf-8', errors='ignore') as f: all_lines = f.readlines() # Filter by level if specified if level != 'ALL': filtered_lines = [] for line in all_lines: if self._is_log_level(line, level): filtered_lines.append(line) all_lines = filtered_lines # Return last N lines return all_lines[-lines:] if lines > 0 else all_lines except Exception as e: logger.error(f"Error reading logs for {service}: {e}") return [f"Error reading logs: {str(e)}"] def get_service_logs_parsed(self, service: str, level: str = 'INFO', lines: int = 50) -> List[Dict[str, Any]]: """Get parsed logs for a specific service""" try: log_file = self.log_dir / f'{service}.log' if not log_file.exists(): return [{"error": f"No log file found for service: {service}"}] results = [] with open(log_file, 'r', encoding='utf-8', errors='ignore') as f: all_lines = f.readlines() # Process lines in reverse order to get most recent first for line in reversed(all_lines[-lines:] if lines > 0 else all_lines): line = line.strip() if not line: continue # Try to parse as JSON try: log_entry = json.loads(line) # Apply level filter if level != 'ALL' and log_entry.get('level', '').upper() != level.upper(): continue results.append(log_entry) except json.JSONDecodeError: # Handle non-JSON logs if level == 'ALL' or self._is_log_level(line, level): results.append({ 'raw_line': line, 'timestamp': datetime.now().isoformat(), 'level': 'INFO' }) return results except Exception as e: logger.error(f"Error reading parsed logs for {service}: {e}") return [{"error": f"Error reading logs: {str(e)}"}] def search_logs(self, query: str, time_range: Optional[Tuple[datetime, datetime]] = None, services: Optional[List[str]] = None, level: Optional[str] = None) -> List[Dict[str, Any]]: """Search logs across all services""" results = [] # Determine which services to search if services is None: services = list(self.service_loggers.keys()) for service in services: try: log_file = self.log_dir / f'{service}.log' if not log_file.exists(): continue with open(log_file, 'r', encoding='utf-8', errors='ignore') as f: for line_num, line in enumerate(f, 1): # Parse JSON log entry try: log_entry = json.loads(line.strip()) # Apply filters if not self._matches_search_criteria(log_entry, query, time_range, level): continue log_entry['service'] = service log_entry['line_number'] = line_num results.append(log_entry) except json.JSONDecodeError: # Handle non-JSON logs if query.lower() in line.lower(): results.append({ 'service': service, 'line_number': line_num, 'raw_line': line.strip(), 'timestamp': datetime.now().isoformat() }) except Exception as e: logger.error(f"Error searching logs for {service}: {e}") # Sort by timestamp results.sort(key=lambda x: x.get('timestamp', ''), reverse=True) return results def _matches_search_criteria(self, log_entry: Dict[str, Any], query: str, time_range: Optional[Tuple[datetime, datetime]], level: Optional[str]) -> bool: """Check if log entry matches search criteria""" # Check query if query: message = log_entry.get('message', '').lower() if query.lower() not in message: return False # Check time range if time_range: try: log_time = datetime.fromisoformat(log_entry.get('timestamp', '')) if not (time_range[0] <= log_time <= time_range[1]): return False except (ValueError, TypeError): return False # Check level if level: if log_entry.get('level', '').upper() != level.upper(): return False return True def _is_log_level(self, line: str, level: str) -> bool: """Check if log line matches specified level""" try: # Try to parse as JSON log_entry = json.loads(line.strip()) return log_entry.get('level', '').upper() == level.upper() except json.JSONDecodeError: # Fallback to text parsing level_pattern = rf'\b{level.upper()}\b' return bool(re.search(level_pattern, line.upper())) def export_logs(self, format: str = 'json', filters: Optional[Dict[str, Any]] = None) -> str: """Export logs in specified format""" try: if filters is None: filters = {} # Get logs based on filters services = filters.get('services', list(self.service_loggers.keys())) level = filters.get('level') time_range = filters.get('time_range') query = filters.get('query', '') logs = self.search_logs(query, time_range, services, level) if format == 'json': return json.dumps(logs, indent=2) elif format == 'csv': return self._logs_to_csv(logs) elif format == 'text': return self._logs_to_text(logs) else: raise ValueError(f"Unsupported export format: {format}") except Exception as e: logger.error(f"Error exporting logs: {e}") raise def _logs_to_csv(self, logs: List[Dict[str, Any]]) -> str: """Convert logs to CSV format""" if not logs: return "" # Get all possible fields fields = set() for log in logs: fields.update(log.keys()) fields = sorted(list(fields)) # Create CSV csv_lines = [','.join(fields)] for log in logs: row = [str(log.get(field, '')) for field in fields] csv_lines.append(','.join(row)) return '\n'.join(csv_lines) def _logs_to_text(self, logs: List[Dict[str, Any]]) -> str: """Convert logs to text format""" text_lines = [] for log in logs: timestamp = log.get('timestamp', '') level = log.get('level', '') service = log.get('service', '') message = log.get('message', '') text_lines.append(f"{timestamp} [{level}] {service}: {message}") return '\n'.join(text_lines) def get_log_statistics(self, service: Optional[str] = None) -> Dict[str, Any]: """Get log statistics""" stats = {} if service: services = [service] else: services = list(self.service_loggers.keys()) for svc in services: try: log_file = self.log_dir / f'{svc}.log' if not log_file.exists(): stats[svc] = {'error': 'Log file not found'} continue # Count log entries by level level_counts = defaultdict(int) total_entries = 0 last_entry = None with open(log_file, 'r', encoding='utf-8', errors='ignore') as f: for line in f: try: log_entry = json.loads(line.strip()) level = log_entry.get('level', 'UNKNOWN') level_counts[level] += 1 total_entries += 1 last_entry = log_entry.get('timestamp') except json.JSONDecodeError: total_entries += 1 stats[svc] = { 'total_entries': total_entries, 'level_counts': dict(level_counts), 'last_entry': last_entry, 'file_size': log_file.stat().st_size } except Exception as e: stats[svc] = {'error': str(e)} return stats def rotate_logs(self, service: Optional[str] = None): """Manually rotate logs""" try: if service: services = [service] else: services = list(self.service_loggers.keys()) for svc in services: if svc in self.handlers and 'file' in self.handlers[svc]: handler = self.handlers[svc]['file'] handler.doRollover() logger.info(f"Rotated logs for service: {svc}") except Exception as e: logger.error(f"Error rotating logs: {e}") def cleanup_old_logs(self, days: int = 30): """Clean up log files older than specified days""" try: cutoff_date = datetime.now() - timedelta(days=days) deleted_count = 0 for log_file in self.log_dir.glob('*.log.*'): try: file_time = datetime.fromtimestamp(log_file.stat().st_mtime) if file_time < cutoff_date: log_file.unlink() deleted_count += 1 except Exception as e: logger.warning(f"Error checking file {log_file}: {e}") logger.info(f"Cleaned up {deleted_count} old log files") except Exception as e: logger.error(f"Error cleaning up old logs: {e}") def _start_rotation_monitor(self): """Start automatic log rotation monitoring""" self.running = True self.rotation_thread = threading.Thread(target=self._rotation_monitor_loop, daemon=True) self.rotation_thread.start() def _rotation_monitor_loop(self): """Monitor and rotate logs automatically""" while self.running: try: # Check each service's log file size for service in self.service_loggers.keys(): log_file = self.log_dir / f'{service}.log' if log_file.exists() and log_file.stat().st_size > self.max_file_size: self.rotate_logs(service) # Sleep for 1 hour before next check time.sleep(3600) except Exception as e: logger.error(f"Error in rotation monitor: {e}") time.sleep(60) # Sleep for 1 minute on error def stop(self): """Stop the log manager""" self.running = False if self.rotation_thread: self.rotation_thread.join(timeout=5) # Close all handlers for service_handlers in self.handlers.values(): for handler in service_handlers.values(): handler.close() logger.info("Log manager stopped") def get_log_file_info(self, service: str) -> Dict[str, Any]: """Get information about a service's log file""" try: log_file = self.log_dir / f'{service}.log' if not log_file.exists(): return {'error': 'Log file not found'} stat = log_file.stat() return { 'file_path': str(log_file), 'file_size': stat.st_size, 'created': datetime.fromtimestamp(stat.st_ctime).isoformat(), 'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(), 'exists': True } except Exception as e: return {'error': str(e)} def set_service_level(self, service: str, level: str): """Change log level for a service at runtime.""" try: log_level = getattr(logging, level.upper(), logging.INFO) if service in self.service_loggers: self.service_loggers[service].setLevel(log_level) if service in self.handlers and 'file' in self.handlers[service]: self.handlers[service]['file'].setLevel(log_level) logger.info(f"Set log level for {service} to {level}") else: logger.warning(f"Service logger not found: {service}") except Exception as e: logger.error(f"Error setting log level for {service}: {e}") def get_service_levels(self) -> Dict[str, str]: """Return current log level for each service logger.""" return { svc: logging.getLevelName(lgr.level) for svc, lgr in self.service_loggers.items() } def get_all_log_file_infos(self) -> List[Dict[str, Any]]: """Return size/mtime info for active and rotated service log files.""" results = [] # Active logs (*.log) then rotated backups (*.log.1, *.log.2, ...) patterns = ['*.log', '*.log.*'] seen = set() for pattern in patterns: for log_file in sorted(self.log_dir.glob(pattern)): if log_file in seen or log_file.suffix == '.gz': continue seen.add(log_file) try: stat = log_file.stat() name = log_file.name is_backup = not name.endswith('.log') results.append({ 'name': log_file.stem.split('.')[0], # service name 'file': name, 'size': stat.st_size, 'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(), 'backup': is_backup, }) except Exception: pass return results def compress_old_logs(self): """Compress old log files to save space""" try: compressed_count = 0 for log_file in self.log_dir.glob('*.log.*'): if not log_file.name.endswith('.gz'): try: with open(log_file, 'rb') as f_in: gz_file = log_file.with_suffix(log_file.suffix + '.gz') with gzip.open(gz_file, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) # Remove original file log_file.unlink() compressed_count += 1 except Exception as e: logger.warning(f"Error compressing {log_file}: {e}") logger.info(f"Compressed {compressed_count} log files") except Exception as e: logger.error(f"Error compressing logs: {e}")