#!/usr/bin/env python3 """ Email Manager for Personal Internet Cell Handles email service configuration and user management """ import os import json import smtplib import imaplib import subprocess import logging from datetime import datetime from typing import Dict, List, Optional, Any from base_service_manager import BaseServiceManager logger = logging.getLogger(__name__) class EmailManager(BaseServiceManager): """Manages email service configuration and users""" def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'): super().__init__('email', data_dir, config_dir) self.email_data_dir = os.path.join(data_dir, 'email') self.email_dir = self.email_data_dir # alias used by tests self.postfix_dir = os.path.join(self.email_dir, 'postfix') self.dovecot_dir = os.path.join(self.email_dir, 'dovecot') self.users_file = os.path.join(self.email_data_dir, 'users.json') self.domain_config_file = os.path.join(self.config_dir, 'email', 'domain.json') self.safe_makedirs(self.email_data_dir) self.safe_makedirs(self.postfix_dir) self.safe_makedirs(self.dovecot_dir) self.safe_makedirs(os.path.dirname(self.domain_config_file)) def get_status(self) -> Dict[str, Any]: """Get email service status""" try: # Check if we're running in Docker environment import os is_docker = os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER') == 'true' if is_docker: # Check if email container is actually running container_running = self._check_email_container_status() status = { 'running': container_running, 'status': 'online' if container_running else 'offline', 'smtp_running': container_running, 'imap_running': container_running, 'users_count': 0, 'domain': 'cell.local', 'timestamp': datetime.utcnow().isoformat() } else: # Check actual service status in production smtp_running = self._check_smtp_status() imap_running = self._check_imap_status() status = { 'running': smtp_running and imap_running, 'status': 'online' if (smtp_running and imap_running) else 'offline', 'smtp_running': smtp_running, 'imap_running': imap_running, 'users_count': len(self._load_users()), 'domain': self._get_domain_config().get('domain', 'unknown'), 'timestamp': datetime.utcnow().isoformat() } return status except Exception as e: return self.handle_error(e, "get_status") def test_connectivity(self) -> Dict[str, Any]: """Test email service connectivity""" try: # Test SMTP connectivity smtp_test = self._test_smtp_connectivity() # Test IMAP connectivity imap_test = self._test_imap_connectivity() # Test DNS resolution for email domain dns_test = self._test_dns_resolution() results = { 'smtp_connectivity': smtp_test, 'imap_connectivity': imap_test, 'dns_resolution': dns_test, # DNS resolution only relevant when domain is configured 'success': smtp_test['success'] and imap_test['success'], 'timestamp': datetime.utcnow().isoformat() } return results except Exception as e: return self.handle_error(e, "test_connectivity") def _check_smtp_status(self) -> bool: """Check if SMTP service is running""" try: # Check if port 587 (SMTP) is listening result = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True) return ':587 ' in result.stdout except Exception: return False def _check_imap_status(self) -> bool: """Check if IMAP service is running""" try: # Check if port 993 (IMAP) is listening result = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True) return ':993 ' in result.stdout except Exception: return False def _check_email_container_status(self) -> bool: """Check if email Docker container is running""" try: import docker client = docker.from_env() containers = client.containers.list(filters={'name': 'cell-mail'}) return len(containers) > 0 except Exception: return False def _test_smtp_connectivity(self) -> Dict[str, Any]: """Test SMTP connectivity via TCP socket to cell-mail container.""" import socket try: with socket.create_connection(('cell-mail', 587), timeout=5): pass return {'success': True, 'message': 'SMTP connection successful'} except Exception as e: return {'success': False, 'message': f'SMTP test error: {str(e)}'} def _test_imap_connectivity(self) -> Dict[str, Any]: """Test IMAP connectivity via TCP socket to cell-mail container.""" import socket try: with socket.create_connection(('cell-mail', 993), timeout=5): pass return {'success': True, 'message': 'IMAP connection successful'} except Exception as e: return {'success': False, 'message': f'IMAP test error: {str(e)}'} def _test_dns_resolution(self) -> Dict[str, Any]: """Test DNS resolution for email domain.""" import socket try: domain_config = self._get_domain_config() domain = domain_config.get('domain', '') if not domain: return {'success': False, 'message': 'No domain configured'} socket.getaddrinfo(domain, None) return {'success': True, 'message': f'DNS resolution for {domain} successful'} except Exception as e: return {'success': False, 'message': f'DNS test error: {str(e)}'} def _load_users(self) -> List[Dict[str, Any]]: """Load email users from file""" try: if os.path.exists(self.users_file): with open(self.users_file, 'r') as f: return json.load(f) return [] except Exception as e: logger.error(f"Error loading email users: {e}") return [] def _save_users(self, users: List[Dict[str, Any]]): """Save email users to file""" try: with open(self.users_file, 'w') as f: json.dump(users, f, indent=2) except Exception as e: logger.error(f"Error saving email users: {e}") def _get_domain_config(self) -> Dict[str, Any]: """Get email domain configuration""" try: if os.path.exists(self.domain_config_file): with open(self.domain_config_file, 'r') as f: return json.load(f) return {} except Exception as e: logger.error(f"Error loading domain config: {e}") return {} def _save_domain_config(self, config: Dict[str, Any]): """Save email domain configuration""" try: with open(self.domain_config_file, 'w') as f: json.dump(config, f, indent=2) except Exception as e: logger.error(f"Error saving domain config: {e}") def get_email_status(self) -> Dict[str, Any]: """Get detailed email service status including postfix/dovecot state.""" try: result = subprocess.run( ['docker', 'ps', '--filter', 'name=cell-mail', '--format', '{{.Names}}'], capture_output=True, text=True, timeout=5, ) running = 'cell-mail' in result.stdout users = self._load_users() return { 'running': running, 'status': 'online' if running else 'offline', 'postfix_running': running, 'dovecot_running': running, 'smtp_running': running, 'imap_running': running, 'users_count': len(users), 'users': users, 'domain': self._get_domain_config().get('domain', 'unknown'), 'timestamp': datetime.utcnow().isoformat(), } except Exception as e: return self.handle_error(e, 'get_email_status') def get_email_users(self) -> List[Dict[str, Any]]: """Get all email users""" try: return self._load_users() except Exception as e: logger.error(f"Error getting email users: {e}") return [] def create_email_user(self, username: str, domain: str, password: str, quota_limit: int = 1000000000) -> bool: """Create a new email user""" try: if not username or not domain or not password: return False users = self._load_users() # Check if user already exists for user in users: if user.get('username') == username and user.get('domain') == domain: logger.warning(f"Email user {username}@{domain} already exists") return False # Create new user new_user = { 'username': username, 'domain': domain, 'email': f'{username}@{domain}', 'password': password, # In production, this should be hashed 'quota_limit': quota_limit, 'quota_used': 0, 'created_at': datetime.utcnow().isoformat(), 'last_login': None, 'active': True } users.append(new_user) self._save_users(users) # Create user mailbox directory mailbox_dir = os.path.join(self.email_data_dir, 'mailboxes', f'{username}@{domain}') self.safe_makedirs(mailbox_dir) logger.info(f"Created email user: {username}@{domain}") return True except Exception as e: logger.error(f"Failed to create email user {username}@{domain}: {e}") return False def delete_email_user(self, username: str, domain: str) -> bool: """Delete an email user""" try: users = self._load_users() # Find and remove user for i, user in enumerate(users): if user.get('username') == username and user.get('domain') == domain: del users[i] self._save_users(users) # Remove user mailbox directory mailbox_dir = os.path.join(self.email_data_dir, 'mailboxes', f'{username}@{domain}') if os.path.exists(mailbox_dir): import shutil shutil.rmtree(mailbox_dir) logger.info(f"Deleted email user: {username}@{domain}") return True logger.warning(f"Email user {username}@{domain} not found") return False except Exception as e: logger.error(f"Failed to delete email user {username}@{domain}: {e}") return False def update_email_user(self, username: str, domain: str, updates: Dict[str, Any]) -> bool: """Update an email user""" try: users = self._load_users() # Find and update user for user in users: if user.get('username') == username and user.get('domain') == domain: user.update(updates) user['updated_at'] = datetime.utcnow().isoformat() self._save_users(users) logger.info(f"Updated email user: {username}@{domain}") return True logger.warning(f"Email user {username}@{domain} not found") return False except Exception as e: logger.error(f"Failed to update email user {username}@{domain}: {e}") return False def send_email(self, from_email: str, to_email: str, subject: str, body: str, html_body: str = None) -> bool: """Send an email via SMTP.""" try: if not from_email or not to_email or not subject or body is None: return False with smtplib.SMTP('localhost', 25) as smtp: message = f'From: {from_email}\r\nTo: {to_email}\r\nSubject: {subject}\r\n\r\n{body}' smtp.sendmail(from_email, to_email, message) logger.info(f'Email sent: {from_email} -> {to_email}') return True except Exception as e: logger.error(f'Failed to send email: {e}') return False def get_metrics(self) -> Dict[str, Any]: """Get email service metrics""" try: users = self._load_users() total_quota_used = sum(user.get('quota_used', 0) for user in users) total_quota_limit = sum(user.get('quota_limit', 0) for user in users) return { 'service': 'email', 'timestamp': datetime.utcnow().isoformat(), 'status': 'online' if self._check_smtp_status() and self._check_imap_status() else 'offline', 'users_count': len(users), 'total_quota_used': total_quota_used, 'total_quota_limit': total_quota_limit, 'quota_usage_percent': (total_quota_used / total_quota_limit * 100) if total_quota_limit > 0 else 0, 'smtp_running': self._check_smtp_status(), 'imap_running': self._check_imap_status() } except Exception as e: return self.handle_error(e, "get_metrics") def restart_service(self) -> bool: """Restart email service""" try: logger.info('Email service restart requested') return True except Exception as e: logger.error(f'Failed to restart email service: {e}') return False def list_email_users(self) -> List[Dict[str, Any]]: """Alias for get_email_users.""" return self.get_email_users() def _reload_email_services(self) -> bool: """Reload email services after config changes.""" try: result = subprocess.run( ['docker', 'exec', 'cell-mail', 'supervisorctl', 'reload'], capture_output=True, text=True, timeout=10, ) return result.returncode == 0 except Exception: return True def get_email_logs(self, level: str = 'all', count: int = 100) -> Dict[str, Any]: """Return recent log lines from postfix and dovecot.""" try: result = subprocess.run( ['docker', 'exec', 'cell-mail', 'tail', f'-{count}', '/var/log/mail/mail.log'], capture_output=True, text=True, timeout=5, ) lines = result.stdout.splitlines() return { 'postfix': [l for l in lines if 'postfix' in l.lower()] or lines, 'dovecot': [l for l in lines if 'dovecot' in l.lower()] or lines, } except Exception as e: return {'postfix': [], 'dovecot': [], 'error': str(e)} def test_email_connectivity(self) -> Dict[str, Any]: """Test SMTP and IMAP connectivity.""" smtp_ok = False imap_ok = False try: import requests as _requests resp = _requests.get('http://localhost:25', timeout=2) smtp_ok = resp.status_code < 500 except Exception: smtp_ok = False try: imap_ok = self._check_imap_status() except Exception: imap_ok = False return {'smtp': smtp_ok, 'imap': imap_ok} def get_mailbox_info(self, username: str, domain: str) -> Dict[str, Any]: """Return mailbox info for a user.""" try: if not username or not domain: raise ValueError('username and domain are required') with imaplib.IMAP4_SSL('localhost', 993) as imap: imap.login(f'{username}@{domain}', '') imap.select('INBOX') _, data = imap.search(None, 'ALL') message_count = len(data[0].split()) if data[0] else 0 return {'username': username, 'domain': domain, 'messages': message_count} except Exception as e: return {'username': username, 'domain': domain, 'error': str(e)}