pic/api/log_manager.py
roof 8e1814c7d2 fix: spurious health alerts, show rotated logs, clear history button
app.py:
- Alert logic now checks status.running (container up/down) instead of healthy
  (which requires connectivity tests), so alerts fire only when a service is actually down
- Add POST /api/health/history/clear endpoint to reset history + alert counters

log_manager.py:
- get_all_log_file_infos: include rotated backup files (*.log.1, *.log.2 ...) in listing,
  marked with backup=true so UI can dim them and hide rotate button

api.js: add monitoringAPI.clearHealthHistory

Logs page:
- Health History: add Clear button with confirmation
- File panel: show full filename (including .log.1 backups), explain host path and naming,
  hide rotate button for backup files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 03:05:04 -04:00

571 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Log Manager for Personal Internet Cell
Comprehensive logging management for all services
"""
import os
import csv
import io
import json
import logging
import logging.handlers
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from pathlib import Path
import re
import gzip
import shutil
from collections import defaultdict
import threading
import time
from enum import Enum
logger = logging.getLogger(__name__)


class LogLevel(Enum):
    """Log levels"""
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class LogManager:
    """Comprehensive logging management for all services"""

    def __init__(self, log_dir: str = '/app/logs', max_file_size: int = 10 * 1024 * 1024,
                 backup_count: int = 5):
        self.log_dir = Path(log_dir)
        self.max_file_size = max_file_size
        self.backup_count = backup_count
        # Ensure log directory exists
        self.log_dir.mkdir(parents=True, exist_ok=True)
        # Service loggers
        self.service_loggers: Dict[str, logging.Logger] = {}
        # Log formatters
        self.formatters = {
            'json': self._create_json_formatter(),
            'text': self._create_text_formatter(),
            'detailed': self._create_detailed_formatter()
        }
        # Log handlers
        self.handlers: Dict[str, Dict[str, logging.Handler]] = defaultdict(dict)
        # Log statistics
        self.log_stats = defaultdict(lambda: {
            'total_entries': 0,
            'error_count': 0,
            'warning_count': 0,
            'last_entry': None
        })
        # Log rotation thread
        self.rotation_thread = None
        self.running = False
        # Start log rotation monitoring
        self._start_rotation_monitor()
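    # Illustrative construction; the argument values here are assumptions, apart
    # from the defaults shown in the signature above:
    #     manager = LogManager(log_dir='/app/logs',
    #                          max_file_size=5 * 1024 * 1024,  # rotate at ~5 MiB
    #                          backup_count=3)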

    def _create_json_formatter(self) -> logging.Formatter:
        """Create JSON formatter for structured logging"""
        class JsonFormatter(logging.Formatter):
            def format(self, record):
                log_entry = {
                    'timestamp': self.formatTime(record),
                    'level': record.levelname,
                    'logger': record.name,
                    'message': record.getMessage(),
                    'module': record.module,
                    'function': record.funcName,
                    'line': record.lineno
                }
                # Add extra fields if present
                for key, value in record.__dict__.items():
                    if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 'pathname',
                                   'filename', 'module', 'lineno', 'funcName', 'created',
                                   'msecs', 'relativeCreated', 'thread', 'threadName',
                                   'processName', 'process', 'getMessage', 'exc_info',
                                   'exc_text', 'stack_info']:
                        log_entry[key] = value
                # Add exception info if present
                if record.exc_info:
                    log_entry['exception'] = self.formatException(record.exc_info)
                return json.dumps(log_entry)
        return JsonFormatter()
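    # Each record serialized by this formatter lands in the log file as one JSON
    # line, e.g. (values illustrative):
    #     {"timestamp": "2026-04-21 03:05:04,123", "level": "INFO",
    #      "logger": "picell.api", "message": "service started",
    #      "module": "app", "function": "main", "line": 42}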

    def _create_text_formatter(self) -> logging.Formatter:
        """Create text formatter for human-readable logs"""
        return logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

    def _create_detailed_formatter(self) -> logging.Formatter:
        """Create detailed formatter with extra information"""
        return logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(funcName)s:%(lineno)d - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

    def add_service_logger(self, service: str, config: Dict[str, Any]):
        """Add a logger for a specific service"""
        try:
            # Create service logger
            service_logger = logging.getLogger(f'picell.{service}')
            service_logger.setLevel(getattr(logging, config.get('level', 'INFO')))
            # Create log file path
            log_file = self.log_dir / f'{service}.log'
            # Create rotating file handler
            handler = logging.handlers.RotatingFileHandler(
                log_file,
                maxBytes=self.max_file_size,
                backupCount=self.backup_count,
                encoding='utf-8'
            )
            # Set formatter
            formatter_name = config.get('formatter', 'json')
            handler.setFormatter(self.formatters[formatter_name])
            # Add handler to logger
            service_logger.addHandler(handler)
            # Store logger and handler
            self.service_loggers[service] = service_logger
            self.handlers[service]['file'] = handler
            # Add console handler if requested
            if config.get('console', False):
                console_handler = logging.StreamHandler()
                console_handler.setFormatter(self.formatters[formatter_name])
                service_logger.addHandler(console_handler)
                self.handlers[service]['console'] = console_handler
            logger.info(f"Added logger for service: {service}")
        except Exception as e:
            logger.error(f"Error adding logger for {service}: {e}")
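    # Example registration; the 'dns' service name is hypothetical, but the
    # config keys match the lookups above:
    #     manager.add_service_logger('dns', {'level': 'DEBUG',
    #                                        'formatter': 'json',
    #                                        'console': True})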

    def get_service_logs(self, service: str, level: str = 'INFO', lines: int = 50) -> List[str]:
        """Get logs for a specific service"""
        try:
            log_file = self.log_dir / f'{service}.log'
            if not log_file.exists():
                return [f"No log file found for service: {service}"]
            # Read log file
            with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                all_lines = f.readlines()
            # Filter by level if specified
            if level != 'ALL':
                filtered_lines = []
                for line in all_lines:
                    if self._is_log_level(line, level):
                        filtered_lines.append(line)
                all_lines = filtered_lines
            # Return last N lines
            return all_lines[-lines:] if lines > 0 else all_lines
        except Exception as e:
            logger.error(f"Error reading logs for {service}: {e}")
            return [f"Error reading logs: {str(e)}"]
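    # Hypothetical call: last 100 ERROR-level lines for a 'dns' service:
    #     manager.get_service_logs('dns', level='ERROR', lines=100)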

    def get_service_logs_parsed(self, service: str, level: str = 'INFO', lines: int = 50) -> List[Dict[str, Any]]:
        """Get parsed logs for a specific service"""
        try:
            log_file = self.log_dir / f'{service}.log'
            if not log_file.exists():
                return [{"error": f"No log file found for service: {service}"}]
            results = []
            with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                all_lines = f.readlines()
            # Process lines in reverse order to get most recent first
            for line in reversed(all_lines[-lines:] if lines > 0 else all_lines):
                line = line.strip()
                if not line:
                    continue
                # Try to parse as JSON
                try:
                    log_entry = json.loads(line)
                    # Apply level filter
                    if level != 'ALL' and log_entry.get('level', '').upper() != level.upper():
                        continue
                    results.append(log_entry)
                except json.JSONDecodeError:
                    # Handle non-JSON logs
                    if level == 'ALL' or self._is_log_level(line, level):
                        results.append({
                            'raw_line': line,
                            'timestamp': datetime.now().isoformat(),
                            'level': 'INFO'
                        })
            return results
        except Exception as e:
            logger.error(f"Error reading parsed logs for {service}: {e}")
            return [{"error": f"Error reading logs: {str(e)}"}]

    def search_logs(self, query: str, time_range: Optional[Tuple[datetime, datetime]] = None,
                    services: Optional[List[str]] = None, level: Optional[str] = None) -> List[Dict[str, Any]]:
        """Search logs across all services"""
        results = []
        # Determine which services to search
        if services is None:
            services = list(self.service_loggers.keys())
        for service in services:
            try:
                log_file = self.log_dir / f'{service}.log'
                if not log_file.exists():
                    continue
                with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                    for line_num, line in enumerate(f, 1):
                        # Parse JSON log entry
                        try:
                            log_entry = json.loads(line.strip())
                            # Apply filters
                            if not self._matches_search_criteria(log_entry, query, time_range, level):
                                continue
                            log_entry['service'] = service
                            log_entry['line_number'] = line_num
                            results.append(log_entry)
                        except json.JSONDecodeError:
                            # Handle non-JSON logs
                            if query.lower() in line.lower():
                                results.append({
                                    'service': service,
                                    'line_number': line_num,
                                    'raw_line': line.strip(),
                                    'timestamp': datetime.now().isoformat()
                                })
            except Exception as e:
                logger.error(f"Error searching logs for {service}: {e}")
        # Sort by timestamp
        results.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
        return results
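    # Example search; the query and service name are hypothetical, and
    # time_range is an inclusive (start, end) pair of datetimes:
    #     since = datetime.now() - timedelta(hours=1)
    #     hits = manager.search_logs('timeout',
    #                                time_range=(since, datetime.now()),
    #                                services=['dns'], level='ERROR')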

    def _matches_search_criteria(self, log_entry: Dict[str, Any], query: str,
                                 time_range: Optional[Tuple[datetime, datetime]],
                                 level: Optional[str]) -> bool:
        """Check if log entry matches search criteria"""
        # Check query
        if query:
            message = log_entry.get('message', '').lower()
            if query.lower() not in message:
                return False
        # Check time range
        if time_range:
            try:
                log_time = datetime.fromisoformat(log_entry.get('timestamp', ''))
                if not (time_range[0] <= log_time <= time_range[1]):
                    return False
            except (ValueError, TypeError):
                return False
        # Check level
        if level:
            if log_entry.get('level', '').upper() != level.upper():
                return False
        return True

    def _is_log_level(self, line: str, level: str) -> bool:
        """Check if log line matches specified level"""
        try:
            # Try to parse as JSON
            log_entry = json.loads(line.strip())
            return log_entry.get('level', '').upper() == level.upper()
        except json.JSONDecodeError:
            # Fallback to text parsing
            level_pattern = rf'\b{level.upper()}\b'
            return bool(re.search(level_pattern, line.upper()))

    def export_logs(self, format: str = 'json', filters: Optional[Dict[str, Any]] = None) -> str:
        """Export logs in specified format"""
        try:
            if filters is None:
                filters = {}
            # Get logs based on filters
            services = filters.get('services', list(self.service_loggers.keys()))
            level = filters.get('level')
            time_range = filters.get('time_range')
            query = filters.get('query', '')
            logs = self.search_logs(query, time_range, services, level)
            if format == 'json':
                return json.dumps(logs, indent=2)
            elif format == 'csv':
                return self._logs_to_csv(logs)
            elif format == 'text':
                return self._logs_to_text(logs)
            else:
                raise ValueError(f"Unsupported export format: {format}")
        except Exception as e:
            logger.error(f"Error exporting logs: {e}")
            raise
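    # Example export; the filter keys match the lookups above, values are
    # hypothetical:
    #     csv_text = manager.export_logs('csv', {'services': ['dns'],
    #                                            'level': 'WARNING',
    #                                            'query': 'refused'})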

    def _logs_to_csv(self, logs: List[Dict[str, Any]]) -> str:
        """Convert logs to CSV format"""
        if not logs:
            return ""
        # Get all possible fields
        fields = set()
        for log in logs:
            fields.update(log.keys())
        fields = sorted(fields)
        # Use csv.writer so commas and quotes inside messages are escaped
        # rather than corrupting the row layout
        buffer = io.StringIO()
        writer = csv.writer(buffer)
        writer.writerow(fields)
        for log in logs:
            writer.writerow([str(log.get(field, '')) for field in fields])
        return buffer.getvalue()

    def _logs_to_text(self, logs: List[Dict[str, Any]]) -> str:
        """Convert logs to text format"""
        text_lines = []
        for log in logs:
            timestamp = log.get('timestamp', '')
            level = log.get('level', '')
            service = log.get('service', '')
            message = log.get('message', '')
            text_lines.append(f"{timestamp} [{level}] {service}: {message}")
        return '\n'.join(text_lines)

    def get_log_statistics(self, service: Optional[str] = None) -> Dict[str, Any]:
        """Get log statistics"""
        stats = {}
        if service:
            services = [service]
        else:
            services = list(self.service_loggers.keys())
        for svc in services:
            try:
                log_file = self.log_dir / f'{svc}.log'
                if not log_file.exists():
                    stats[svc] = {'error': 'Log file not found'}
                    continue
                # Count log entries by level
                level_counts = defaultdict(int)
                total_entries = 0
                last_entry = None
                with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                    for line in f:
                        try:
                            log_entry = json.loads(line.strip())
                            level = log_entry.get('level', 'UNKNOWN')
                            level_counts[level] += 1
                            total_entries += 1
                            last_entry = log_entry.get('timestamp')
                        except json.JSONDecodeError:
                            total_entries += 1
                stats[svc] = {
                    'total_entries': total_entries,
                    'level_counts': dict(level_counts),
                    'last_entry': last_entry,
                    'file_size': log_file.stat().st_size
                }
            except Exception as e:
                stats[svc] = {'error': str(e)}
        return stats
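    # Per-service result shape (values illustrative):
    #     {'dns': {'total_entries': 1234,
    #              'level_counts': {'INFO': 1200, 'ERROR': 34},
    #              'last_entry': '2026-04-21 03:05:04,123',
    #              'file_size': 81234}}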

    def rotate_logs(self, service: Optional[str] = None):
        """Manually rotate logs"""
        try:
            if service:
                services = [service]
            else:
                services = list(self.service_loggers.keys())
            for svc in services:
                if svc in self.handlers and 'file' in self.handlers[svc]:
                    handler = self.handlers[svc]['file']
                    handler.doRollover()
                    logger.info(f"Rotated logs for service: {svc}")
        except Exception as e:
            logger.error(f"Error rotating logs: {e}")

    def cleanup_old_logs(self, days: int = 30):
        """Clean up rotated log files older than the specified number of days"""
        try:
            cutoff_date = datetime.now() - timedelta(days=days)
            deleted_count = 0
            # Only rotated backups (*.log.*) are deleted; active logs are kept
            for log_file in self.log_dir.glob('*.log.*'):
                try:
                    file_time = datetime.fromtimestamp(log_file.stat().st_mtime)
                    if file_time < cutoff_date:
                        log_file.unlink()
                        deleted_count += 1
                except Exception as e:
                    logger.warning(f"Error checking file {log_file}: {e}")
            logger.info(f"Cleaned up {deleted_count} old log files")
        except Exception as e:
            logger.error(f"Error cleaning up old logs: {e}")

    def _start_rotation_monitor(self):
        """Start automatic log rotation monitoring"""
        self.running = True
        self.rotation_thread = threading.Thread(target=self._rotation_monitor_loop, daemon=True)
        self.rotation_thread.start()

    def _rotation_monitor_loop(self):
        """Monitor and rotate logs automatically"""
        while self.running:
            try:
                # Check each service's log file size
                for service in self.service_loggers.keys():
                    log_file = self.log_dir / f'{service}.log'
                    if log_file.exists() and log_file.stat().st_size > self.max_file_size:
                        self.rotate_logs(service)
                # Sleep for 1 hour before next check
                time.sleep(3600)
            except Exception as e:
                logger.error(f"Error in rotation monitor: {e}")
                time.sleep(60)  # Sleep for 1 minute on error

    def stop(self):
        """Stop the log manager"""
        self.running = False
        if self.rotation_thread:
            self.rotation_thread.join(timeout=5)
        # Close all handlers
        for service_handlers in self.handlers.values():
            for handler in service_handlers.values():
                handler.close()
        logger.info("Log manager stopped")

    def get_log_file_info(self, service: str) -> Dict[str, Any]:
        """Get information about a service's log file"""
        try:
            log_file = self.log_dir / f'{service}.log'
            if not log_file.exists():
                return {'error': 'Log file not found'}
            stat = log_file.stat()
            return {
                'file_path': str(log_file),
                'file_size': stat.st_size,
                'created': datetime.fromtimestamp(stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                'exists': True
            }
        except Exception as e:
            return {'error': str(e)}

    def set_service_level(self, service: str, level: str):
        """Change log level for a service at runtime."""
        try:
            log_level = getattr(logging, level.upper(), logging.INFO)
            if service in self.service_loggers:
                self.service_loggers[service].setLevel(log_level)
                if service in self.handlers and 'file' in self.handlers[service]:
                    self.handlers[service]['file'].setLevel(log_level)
                logger.info(f"Set log level for {service} to {level}")
            else:
                logger.warning(f"Service logger not found: {service}")
        except Exception as e:
            logger.error(f"Error setting log level for {service}: {e}")
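    # Runtime adjustment example (service name hypothetical):
    #     manager.set_service_level('dns', 'DEBUG')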

    def get_service_levels(self) -> Dict[str, str]:
        """Return current log level for each service logger."""
        return {
            svc: logging.getLevelName(lgr.level)
            for svc, lgr in self.service_loggers.items()
        }

    def get_all_log_file_infos(self) -> List[Dict[str, Any]]:
        """Return size/mtime info for active and rotated service log files."""
        results = []
        # Active logs (*.log) then rotated backups (*.log.1, *.log.2, ...)
        patterns = ['*.log', '*.log.*']
        seen = set()
        for pattern in patterns:
            for log_file in sorted(self.log_dir.glob(pattern)):
                if log_file in seen or log_file.suffix == '.gz':
                    continue
                seen.add(log_file)
                try:
                    stat = log_file.stat()
                    name = log_file.name
                    is_backup = not name.endswith('.log')
                    results.append({
                        'name': log_file.stem.split('.')[0],  # service name
                        'file': name,
                        'size': stat.st_size,
                        'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                        'backup': is_backup,
                    })
                except Exception:
                    # Skip files that disappear between glob() and stat()
                    pass
        return results
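    # Sample entries, one active file and one rotated backup (values illustrative):
    #     {'name': 'dns', 'file': 'dns.log', 'size': 8123,
    #      'modified': '2026-04-21T03:05:04', 'backup': False}
    #     {'name': 'dns', 'file': 'dns.log.1', 'size': 10485760,
    #      'modified': '2026-04-20T22:11:00', 'backup': True}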

    def compress_old_logs(self):
        """Compress old log files to save space"""
        try:
            compressed_count = 0
            for log_file in self.log_dir.glob('*.log.*'):
                if not log_file.name.endswith('.gz'):
                    try:
                        gz_file = log_file.with_suffix(log_file.suffix + '.gz')
                        with open(log_file, 'rb') as f_in:
                            with gzip.open(gz_file, 'wb') as f_out:
                                shutil.copyfileobj(f_in, f_out)
                        # Remove original file
                        log_file.unlink()
                        compressed_count += 1
                    except Exception as e:
                        logger.warning(f"Error compressing {log_file}: {e}")
            logger.info(f"Compressed {compressed_count} log files")
        except Exception as e:
            logger.error(f"Error compressing logs: {e}")
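

if __name__ == '__main__':
    # Minimal smoke-test sketch, not part of the module's public surface; the
    # 'demo' service name, temp directory, and messages are illustrative
    # assumptions. It exercises only methods defined above.
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        manager = LogManager(log_dir=tmp, max_file_size=1024, backup_count=2)
        manager.add_service_logger('demo', {'level': 'INFO', 'formatter': 'json'})
        manager.service_loggers['demo'].info("hello from the demo service")
        print(manager.get_service_logs('demo', level='ALL', lines=10))
        print(manager.get_all_log_file_infos())
        manager.stop()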