524 lines
20 KiB
Python
524 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Log Manager for Personal Internet Cell
|
|
Comprehensive logging management for all services
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
import logging.handlers
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Any, Tuple
|
|
from pathlib import Path
|
|
import re
|
|
import gzip
|
|
import shutil
|
|
from collections import defaultdict
|
|
import threading
|
|
import time
|
|
from enum import Enum
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class LogLevel(Enum):
|
|
"""Log levels"""
|
|
DEBUG = "DEBUG"
|
|
INFO = "INFO"
|
|
WARNING = "WARNING"
|
|
ERROR = "ERROR"
|
|
CRITICAL = "CRITICAL"
|
|
|
|
class LogManager:
|
|
"""Comprehensive logging management for all services"""
|
|
|
|
def __init__(self, log_dir: str = '/app/logs', max_file_size: int = 10 * 1024 * 1024,
|
|
backup_count: int = 5):
|
|
self.log_dir = Path(log_dir)
|
|
self.max_file_size = max_file_size
|
|
self.backup_count = backup_count
|
|
|
|
# Ensure log directory exists
|
|
self.log_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Service loggers
|
|
self.service_loggers: Dict[str, logging.Logger] = {}
|
|
|
|
# Log formatters
|
|
self.formatters = {
|
|
'json': self._create_json_formatter(),
|
|
'text': self._create_text_formatter(),
|
|
'detailed': self._create_detailed_formatter()
|
|
}
|
|
|
|
# Log handlers
|
|
self.handlers: Dict[str, Dict[str, logging.Handler]] = defaultdict(dict)
|
|
|
|
# Log statistics
|
|
self.log_stats = defaultdict(lambda: {
|
|
'total_entries': 0,
|
|
'error_count': 0,
|
|
'warning_count': 0,
|
|
'last_entry': None
|
|
})
|
|
|
|
# Log rotation thread
|
|
self.rotation_thread = None
|
|
self.running = False
|
|
|
|
# Start log rotation monitoring
|
|
self._start_rotation_monitor()
|
|
|
|
def _create_json_formatter(self) -> logging.Formatter:
|
|
"""Create JSON formatter for structured logging"""
|
|
class JsonFormatter(logging.Formatter):
|
|
def format(self, record):
|
|
log_entry = {
|
|
'timestamp': self.formatTime(record),
|
|
'level': record.levelname,
|
|
'logger': record.name,
|
|
'message': record.getMessage(),
|
|
'module': record.module,
|
|
'function': record.funcName,
|
|
'line': record.lineno
|
|
}
|
|
|
|
# Add extra fields if present
|
|
for key, value in record.__dict__.items():
|
|
if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 'pathname',
|
|
'filename', 'module', 'lineno', 'funcName', 'created',
|
|
'msecs', 'relativeCreated', 'thread', 'threadName',
|
|
'processName', 'process', 'getMessage', 'exc_info',
|
|
'exc_text', 'stack_info']:
|
|
log_entry[key] = value
|
|
|
|
# Add exception info if present
|
|
if record.exc_info:
|
|
log_entry['exception'] = self.formatException(record.exc_info)
|
|
|
|
return json.dumps(log_entry)
|
|
|
|
return JsonFormatter()
|
|
|
|
def _create_text_formatter(self) -> logging.Formatter:
|
|
"""Create text formatter for human-readable logs"""
|
|
return logging.Formatter(
|
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
|
|
def _create_detailed_formatter(self) -> logging.Formatter:
|
|
"""Create detailed formatter with extra information"""
|
|
return logging.Formatter(
|
|
'%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(funcName)s:%(lineno)d - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
|
|
def add_service_logger(self, service: str, config: Dict[str, Any]):
|
|
"""Add a logger for a specific service"""
|
|
try:
|
|
# Create service logger
|
|
service_logger = logging.getLogger(f'picell.{service}')
|
|
service_logger.setLevel(getattr(logging, config.get('level', 'INFO')))
|
|
|
|
# Create log file path
|
|
log_file = self.log_dir / f'{service}.log'
|
|
|
|
# Create rotating file handler
|
|
handler = logging.handlers.RotatingFileHandler(
|
|
log_file,
|
|
maxBytes=self.max_file_size,
|
|
backupCount=self.backup_count,
|
|
encoding='utf-8'
|
|
)
|
|
|
|
# Set formatter
|
|
formatter_name = config.get('formatter', 'json')
|
|
handler.setFormatter(self.formatters[formatter_name])
|
|
|
|
# Add handler to logger
|
|
service_logger.addHandler(handler)
|
|
|
|
# Store logger and handler
|
|
self.service_loggers[service] = service_logger
|
|
self.handlers[service]['file'] = handler
|
|
|
|
# Add console handler if requested
|
|
if config.get('console', False):
|
|
console_handler = logging.StreamHandler()
|
|
console_handler.setFormatter(self.formatters[formatter_name])
|
|
service_logger.addHandler(console_handler)
|
|
self.handlers[service]['console'] = console_handler
|
|
|
|
logger.info(f"Added logger for service: {service}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error adding logger for {service}: {e}")
|
|
|
|
def get_service_logs(self, service: str, level: str = 'INFO', lines: int = 50) -> List[str]:
|
|
"""Get logs for a specific service"""
|
|
try:
|
|
log_file = self.log_dir / f'{service}.log'
|
|
if not log_file.exists():
|
|
return [f"No log file found for service: {service}"]
|
|
|
|
# Read log file
|
|
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
|
|
all_lines = f.readlines()
|
|
|
|
# Filter by level if specified
|
|
if level != 'ALL':
|
|
filtered_lines = []
|
|
for line in all_lines:
|
|
if self._is_log_level(line, level):
|
|
filtered_lines.append(line)
|
|
all_lines = filtered_lines
|
|
|
|
# Return last N lines
|
|
return all_lines[-lines:] if lines > 0 else all_lines
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error reading logs for {service}: {e}")
|
|
return [f"Error reading logs: {str(e)}"]
|
|
|
|
def get_service_logs_parsed(self, service: str, level: str = 'INFO', lines: int = 50) -> List[Dict[str, Any]]:
|
|
"""Get parsed logs for a specific service"""
|
|
try:
|
|
log_file = self.log_dir / f'{service}.log'
|
|
if not log_file.exists():
|
|
return [{"error": f"No log file found for service: {service}"}]
|
|
|
|
results = []
|
|
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
|
|
all_lines = f.readlines()
|
|
|
|
# Process lines in reverse order to get most recent first
|
|
for line in reversed(all_lines[-lines:] if lines > 0 else all_lines):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
# Try to parse as JSON
|
|
try:
|
|
log_entry = json.loads(line)
|
|
# Apply level filter
|
|
if level != 'ALL' and log_entry.get('level', '').upper() != level.upper():
|
|
continue
|
|
results.append(log_entry)
|
|
except json.JSONDecodeError:
|
|
# Handle non-JSON logs
|
|
if level == 'ALL' or self._is_log_level(line, level):
|
|
results.append({
|
|
'raw_line': line,
|
|
'timestamp': datetime.now().isoformat(),
|
|
'level': 'INFO'
|
|
})
|
|
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error reading parsed logs for {service}: {e}")
|
|
return [{"error": f"Error reading logs: {str(e)}"}]
|
|
|
|
def search_logs(self, query: str, time_range: Optional[Tuple[datetime, datetime]] = None,
|
|
services: Optional[List[str]] = None, level: Optional[str] = None) -> List[Dict[str, Any]]:
|
|
"""Search logs across all services"""
|
|
results = []
|
|
|
|
# Determine which services to search
|
|
if services is None:
|
|
services = list(self.service_loggers.keys())
|
|
|
|
for service in services:
|
|
try:
|
|
log_file = self.log_dir / f'{service}.log'
|
|
if not log_file.exists():
|
|
continue
|
|
|
|
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
|
|
for line_num, line in enumerate(f, 1):
|
|
# Parse JSON log entry
|
|
try:
|
|
log_entry = json.loads(line.strip())
|
|
|
|
# Apply filters
|
|
if not self._matches_search_criteria(log_entry, query, time_range, level):
|
|
continue
|
|
|
|
log_entry['service'] = service
|
|
log_entry['line_number'] = line_num
|
|
results.append(log_entry)
|
|
|
|
except json.JSONDecodeError:
|
|
# Handle non-JSON logs
|
|
if query.lower() in line.lower():
|
|
results.append({
|
|
'service': service,
|
|
'line_number': line_num,
|
|
'raw_line': line.strip(),
|
|
'timestamp': datetime.now().isoformat()
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching logs for {service}: {e}")
|
|
|
|
# Sort by timestamp
|
|
results.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
|
|
return results
|
|
|
|
def _matches_search_criteria(self, log_entry: Dict[str, Any], query: str,
|
|
time_range: Optional[Tuple[datetime, datetime]],
|
|
level: Optional[str]) -> bool:
|
|
"""Check if log entry matches search criteria"""
|
|
# Check query
|
|
if query:
|
|
message = log_entry.get('message', '').lower()
|
|
if query.lower() not in message:
|
|
return False
|
|
|
|
# Check time range
|
|
if time_range:
|
|
try:
|
|
log_time = datetime.fromisoformat(log_entry.get('timestamp', ''))
|
|
if not (time_range[0] <= log_time <= time_range[1]):
|
|
return False
|
|
except (ValueError, TypeError):
|
|
return False
|
|
|
|
# Check level
|
|
if level:
|
|
if log_entry.get('level', '').upper() != level.upper():
|
|
return False
|
|
|
|
return True
|
|
|
|
def _is_log_level(self, line: str, level: str) -> bool:
|
|
"""Check if log line matches specified level"""
|
|
try:
|
|
# Try to parse as JSON
|
|
log_entry = json.loads(line.strip())
|
|
return log_entry.get('level', '').upper() == level.upper()
|
|
except json.JSONDecodeError:
|
|
# Fallback to text parsing
|
|
level_pattern = rf'\b{level.upper()}\b'
|
|
return bool(re.search(level_pattern, line.upper()))
|
|
|
|
def export_logs(self, format: str = 'json', filters: Optional[Dict[str, Any]] = None) -> str:
|
|
"""Export logs in specified format"""
|
|
try:
|
|
if filters is None:
|
|
filters = {}
|
|
|
|
# Get logs based on filters
|
|
services = filters.get('services', list(self.service_loggers.keys()))
|
|
level = filters.get('level')
|
|
time_range = filters.get('time_range')
|
|
query = filters.get('query', '')
|
|
|
|
logs = self.search_logs(query, time_range, services, level)
|
|
|
|
if format == 'json':
|
|
return json.dumps(logs, indent=2)
|
|
elif format == 'csv':
|
|
return self._logs_to_csv(logs)
|
|
elif format == 'text':
|
|
return self._logs_to_text(logs)
|
|
else:
|
|
raise ValueError(f"Unsupported export format: {format}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error exporting logs: {e}")
|
|
raise
|
|
|
|
def _logs_to_csv(self, logs: List[Dict[str, Any]]) -> str:
|
|
"""Convert logs to CSV format"""
|
|
if not logs:
|
|
return ""
|
|
|
|
# Get all possible fields
|
|
fields = set()
|
|
for log in logs:
|
|
fields.update(log.keys())
|
|
|
|
fields = sorted(list(fields))
|
|
|
|
# Create CSV
|
|
csv_lines = [','.join(fields)]
|
|
for log in logs:
|
|
row = [str(log.get(field, '')) for field in fields]
|
|
csv_lines.append(','.join(row))
|
|
|
|
return '\n'.join(csv_lines)
|
|
|
|
def _logs_to_text(self, logs: List[Dict[str, Any]]) -> str:
|
|
"""Convert logs to text format"""
|
|
text_lines = []
|
|
for log in logs:
|
|
timestamp = log.get('timestamp', '')
|
|
level = log.get('level', '')
|
|
service = log.get('service', '')
|
|
message = log.get('message', '')
|
|
text_lines.append(f"{timestamp} [{level}] {service}: {message}")
|
|
|
|
return '\n'.join(text_lines)
|
|
|
|
def get_log_statistics(self, service: Optional[str] = None) -> Dict[str, Any]:
|
|
"""Get log statistics"""
|
|
stats = {}
|
|
|
|
if service:
|
|
services = [service]
|
|
else:
|
|
services = list(self.service_loggers.keys())
|
|
|
|
for svc in services:
|
|
try:
|
|
log_file = self.log_dir / f'{svc}.log'
|
|
if not log_file.exists():
|
|
stats[svc] = {'error': 'Log file not found'}
|
|
continue
|
|
|
|
# Count log entries by level
|
|
level_counts = defaultdict(int)
|
|
total_entries = 0
|
|
last_entry = None
|
|
|
|
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
|
|
for line in f:
|
|
try:
|
|
log_entry = json.loads(line.strip())
|
|
level = log_entry.get('level', 'UNKNOWN')
|
|
level_counts[level] += 1
|
|
total_entries += 1
|
|
last_entry = log_entry.get('timestamp')
|
|
except json.JSONDecodeError:
|
|
total_entries += 1
|
|
|
|
stats[svc] = {
|
|
'total_entries': total_entries,
|
|
'level_counts': dict(level_counts),
|
|
'last_entry': last_entry,
|
|
'file_size': log_file.stat().st_size
|
|
}
|
|
|
|
except Exception as e:
|
|
stats[svc] = {'error': str(e)}
|
|
|
|
return stats
|
|
|
|
def rotate_logs(self, service: Optional[str] = None):
|
|
"""Manually rotate logs"""
|
|
try:
|
|
if service:
|
|
services = [service]
|
|
else:
|
|
services = list(self.service_loggers.keys())
|
|
|
|
for svc in services:
|
|
if svc in self.handlers and 'file' in self.handlers[svc]:
|
|
handler = self.handlers[svc]['file']
|
|
handler.doRollover()
|
|
logger.info(f"Rotated logs for service: {svc}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error rotating logs: {e}")
|
|
|
|
def cleanup_old_logs(self, days: int = 30):
|
|
"""Clean up log files older than specified days"""
|
|
try:
|
|
cutoff_date = datetime.now() - timedelta(days=days)
|
|
deleted_count = 0
|
|
|
|
for log_file in self.log_dir.glob('*.log.*'):
|
|
try:
|
|
file_time = datetime.fromtimestamp(log_file.stat().st_mtime)
|
|
if file_time < cutoff_date:
|
|
log_file.unlink()
|
|
deleted_count += 1
|
|
except Exception as e:
|
|
logger.warning(f"Error checking file {log_file}: {e}")
|
|
|
|
logger.info(f"Cleaned up {deleted_count} old log files")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error cleaning up old logs: {e}")
|
|
|
|
def _start_rotation_monitor(self):
|
|
"""Start automatic log rotation monitoring"""
|
|
self.running = True
|
|
self.rotation_thread = threading.Thread(target=self._rotation_monitor_loop, daemon=True)
|
|
self.rotation_thread.start()
|
|
|
|
def _rotation_monitor_loop(self):
|
|
"""Monitor and rotate logs automatically"""
|
|
while self.running:
|
|
try:
|
|
# Check each service's log file size
|
|
for service in self.service_loggers.keys():
|
|
log_file = self.log_dir / f'{service}.log'
|
|
if log_file.exists() and log_file.stat().st_size > self.max_file_size:
|
|
self.rotate_logs(service)
|
|
|
|
# Sleep for 1 hour before next check
|
|
time.sleep(3600)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in rotation monitor: {e}")
|
|
time.sleep(60) # Sleep for 1 minute on error
|
|
|
|
def stop(self):
|
|
"""Stop the log manager"""
|
|
self.running = False
|
|
if self.rotation_thread:
|
|
self.rotation_thread.join(timeout=5)
|
|
|
|
# Close all handlers
|
|
for service_handlers in self.handlers.values():
|
|
for handler in service_handlers.values():
|
|
handler.close()
|
|
|
|
logger.info("Log manager stopped")
|
|
|
|
def get_log_file_info(self, service: str) -> Dict[str, Any]:
|
|
"""Get information about a service's log file"""
|
|
try:
|
|
log_file = self.log_dir / f'{service}.log'
|
|
if not log_file.exists():
|
|
return {'error': 'Log file not found'}
|
|
|
|
stat = log_file.stat()
|
|
return {
|
|
'file_path': str(log_file),
|
|
'file_size': stat.st_size,
|
|
'created': datetime.fromtimestamp(stat.st_ctime).isoformat(),
|
|
'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
|
|
'exists': True
|
|
}
|
|
|
|
except Exception as e:
|
|
return {'error': str(e)}
|
|
|
|
def compress_old_logs(self):
|
|
"""Compress old log files to save space"""
|
|
try:
|
|
compressed_count = 0
|
|
|
|
for log_file in self.log_dir.glob('*.log.*'):
|
|
if not log_file.name.endswith('.gz'):
|
|
try:
|
|
with open(log_file, 'rb') as f_in:
|
|
gz_file = log_file.with_suffix(log_file.suffix + '.gz')
|
|
with gzip.open(gz_file, 'wb') as f_out:
|
|
shutil.copyfileobj(f_in, f_out)
|
|
|
|
# Remove original file
|
|
log_file.unlink()
|
|
compressed_count += 1
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error compressing {log_file}: {e}")
|
|
|
|
logger.info(f"Compressed {compressed_count} log files")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error compressing logs: {e}") |