401 lines
16 KiB
Python
401 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Email Manager for Personal Internet Cell
|
|
Handles email service configuration and user management
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Any
|
|
from base_service_manager import BaseServiceManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class EmailManager(BaseServiceManager):
|
|
"""Manages email service configuration and users"""
|
|
|
|
def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
|
|
super().__init__('email', data_dir, config_dir)
|
|
self.email_data_dir = os.path.join(data_dir, 'email')
|
|
self.users_file = os.path.join(self.email_data_dir, 'users.json')
|
|
self.domain_config_file = os.path.join(self.config_dir, 'email', 'domain.json')
|
|
|
|
# Ensure directories exist
|
|
os.makedirs(self.email_data_dir, exist_ok=True)
|
|
os.makedirs(os.path.dirname(self.domain_config_file), exist_ok=True)
|
|
|
|
def get_status(self) -> Dict[str, Any]:
|
|
"""Get email service status"""
|
|
try:
|
|
# Check if we're running in Docker environment
|
|
import os
|
|
is_docker = os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER') == 'true'
|
|
|
|
if is_docker:
|
|
# Check if email container is actually running
|
|
container_running = self._check_email_container_status()
|
|
status = {
|
|
'running': container_running,
|
|
'status': 'online' if container_running else 'offline',
|
|
'smtp_running': container_running,
|
|
'imap_running': container_running,
|
|
'users_count': 0,
|
|
'domain': 'cell.local',
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
else:
|
|
# Check actual service status in production
|
|
smtp_running = self._check_smtp_status()
|
|
imap_running = self._check_imap_status()
|
|
|
|
status = {
|
|
'running': smtp_running and imap_running,
|
|
'status': 'online' if (smtp_running and imap_running) else 'offline',
|
|
'smtp_running': smtp_running,
|
|
'imap_running': imap_running,
|
|
'users_count': len(self._load_users()),
|
|
'domain': self._get_domain_config().get('domain', 'unknown'),
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
|
|
return status
|
|
except Exception as e:
|
|
return self.handle_error(e, "get_status")
|
|
|
|
def test_connectivity(self) -> Dict[str, Any]:
|
|
"""Test email service connectivity"""
|
|
try:
|
|
# Test SMTP connectivity
|
|
smtp_test = self._test_smtp_connectivity()
|
|
|
|
# Test IMAP connectivity
|
|
imap_test = self._test_imap_connectivity()
|
|
|
|
# Test DNS resolution for email domain
|
|
dns_test = self._test_dns_resolution()
|
|
|
|
results = {
|
|
'smtp_connectivity': smtp_test,
|
|
'imap_connectivity': imap_test,
|
|
'dns_resolution': dns_test,
|
|
'success': smtp_test['success'] and imap_test['success'] and dns_test['success'],
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
|
|
return results
|
|
except Exception as e:
|
|
return self.handle_error(e, "test_connectivity")
|
|
|
|
def _check_smtp_status(self) -> bool:
|
|
"""Check if SMTP service is running"""
|
|
try:
|
|
# Check if port 587 (SMTP) is listening
|
|
result = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True)
|
|
return ':587 ' in result.stdout
|
|
except Exception:
|
|
return False
|
|
|
|
def _check_imap_status(self) -> bool:
|
|
"""Check if IMAP service is running"""
|
|
try:
|
|
# Check if port 993 (IMAP) is listening
|
|
result = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True)
|
|
return ':993 ' in result.stdout
|
|
except Exception:
|
|
return False
|
|
|
|
def _check_email_container_status(self) -> bool:
|
|
"""Check if email Docker container is running"""
|
|
try:
|
|
import docker
|
|
client = docker.from_env()
|
|
containers = client.containers.list(filters={'name': 'cell-mail'})
|
|
return len(containers) > 0
|
|
except Exception:
|
|
return False
|
|
|
|
def _test_smtp_connectivity(self) -> Dict[str, Any]:
|
|
"""Test SMTP connectivity"""
|
|
try:
|
|
# Test SMTP connection to localhost
|
|
result = subprocess.run(['telnet', 'localhost', '587'],
|
|
capture_output=True, text=True, timeout=5)
|
|
|
|
success = result.returncode == 0 or 'Connected' in result.stdout
|
|
return {
|
|
'success': success,
|
|
'message': 'SMTP connection successful' if success else 'SMTP connection failed'
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'message': f'SMTP test error: {str(e)}'
|
|
}
|
|
|
|
def _test_imap_connectivity(self) -> Dict[str, Any]:
|
|
"""Test IMAP connectivity"""
|
|
try:
|
|
# Test IMAP connection to localhost
|
|
result = subprocess.run(['telnet', 'localhost', '993'],
|
|
capture_output=True, text=True, timeout=5)
|
|
|
|
success = result.returncode == 0 or 'Connected' in result.stdout
|
|
return {
|
|
'success': success,
|
|
'message': 'IMAP connection successful' if success else 'IMAP connection failed'
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'message': f'IMAP test error: {str(e)}'
|
|
}
|
|
|
|
def _test_dns_resolution(self) -> Dict[str, Any]:
|
|
"""Test DNS resolution for email domain"""
|
|
try:
|
|
domain_config = self._get_domain_config()
|
|
domain = domain_config.get('domain', '')
|
|
|
|
if not domain:
|
|
return {
|
|
'success': False,
|
|
'message': 'No domain configured'
|
|
}
|
|
|
|
# Test MX record resolution
|
|
result = subprocess.run(['nslookup', '-type=mx', domain],
|
|
capture_output=True, text=True, timeout=10)
|
|
|
|
success = result.returncode == 0 and 'mail exchanger' in result.stdout.lower()
|
|
return {
|
|
'success': success,
|
|
'message': f'DNS resolution for {domain} successful' if success else f'DNS resolution for {domain} failed'
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'message': f'DNS test error: {str(e)}'
|
|
}
|
|
|
|
def _load_users(self) -> List[Dict[str, Any]]:
|
|
"""Load email users from file"""
|
|
try:
|
|
if os.path.exists(self.users_file):
|
|
with open(self.users_file, 'r') as f:
|
|
return json.load(f)
|
|
return []
|
|
except Exception as e:
|
|
logger.error(f"Error loading email users: {e}")
|
|
return []
|
|
|
|
def _save_users(self, users: List[Dict[str, Any]]):
|
|
"""Save email users to file"""
|
|
try:
|
|
with open(self.users_file, 'w') as f:
|
|
json.dump(users, f, indent=2)
|
|
except Exception as e:
|
|
logger.error(f"Error saving email users: {e}")
|
|
|
|
def _get_domain_config(self) -> Dict[str, Any]:
|
|
"""Get email domain configuration"""
|
|
try:
|
|
if os.path.exists(self.domain_config_file):
|
|
with open(self.domain_config_file, 'r') as f:
|
|
return json.load(f)
|
|
return {}
|
|
except Exception as e:
|
|
logger.error(f"Error loading domain config: {e}")
|
|
return {}
|
|
|
|
def _save_domain_config(self, config: Dict[str, Any]):
|
|
"""Save email domain configuration"""
|
|
try:
|
|
with open(self.domain_config_file, 'w') as f:
|
|
json.dump(config, f, indent=2)
|
|
except Exception as e:
|
|
logger.error(f"Error saving domain config: {e}")
|
|
|
|
def get_email_status(self) -> Dict[str, Any]:
|
|
"""Get detailed email service status"""
|
|
try:
|
|
status = self.get_status()
|
|
|
|
# Add user details
|
|
users = self._load_users()
|
|
user_details = []
|
|
|
|
for user in users:
|
|
user_detail = {
|
|
'username': user.get('username', ''),
|
|
'domain': user.get('domain', ''),
|
|
'email': user.get('email', ''),
|
|
'created_at': user.get('created_at', ''),
|
|
'last_login': user.get('last_login', ''),
|
|
'quota_used': user.get('quota_used', 0),
|
|
'quota_limit': user.get('quota_limit', 0)
|
|
}
|
|
user_details.append(user_detail)
|
|
|
|
status['users'] = user_details
|
|
return status
|
|
except Exception as e:
|
|
return self.handle_error(e, "get_email_status")
|
|
|
|
def get_email_users(self) -> List[Dict[str, Any]]:
|
|
"""Get all email users"""
|
|
try:
|
|
return self._load_users()
|
|
except Exception as e:
|
|
logger.error(f"Error getting email users: {e}")
|
|
return []
|
|
|
|
def create_email_user(self, username: str, domain: str, password: str,
|
|
quota_limit: int = 1000000000) -> bool:
|
|
"""Create a new email user"""
|
|
try:
|
|
users = self._load_users()
|
|
|
|
# Check if user already exists
|
|
for user in users:
|
|
if user.get('username') == username and user.get('domain') == domain:
|
|
logger.warning(f"Email user {username}@{domain} already exists")
|
|
return False
|
|
|
|
# Create new user
|
|
new_user = {
|
|
'username': username,
|
|
'domain': domain,
|
|
'email': f'{username}@{domain}',
|
|
'password': password, # In production, this should be hashed
|
|
'quota_limit': quota_limit,
|
|
'quota_used': 0,
|
|
'created_at': datetime.utcnow().isoformat(),
|
|
'last_login': None,
|
|
'active': True
|
|
}
|
|
|
|
users.append(new_user)
|
|
self._save_users(users)
|
|
|
|
# Create user mailbox directory
|
|
mailbox_dir = os.path.join(self.email_data_dir, 'mailboxes', f'{username}@{domain}')
|
|
os.makedirs(mailbox_dir, exist_ok=True)
|
|
|
|
logger.info(f"Created email user: {username}@{domain}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to create email user {username}@{domain}: {e}")
|
|
return False
|
|
|
|
def delete_email_user(self, username: str, domain: str) -> bool:
|
|
"""Delete an email user"""
|
|
try:
|
|
users = self._load_users()
|
|
|
|
# Find and remove user
|
|
for i, user in enumerate(users):
|
|
if user.get('username') == username and user.get('domain') == domain:
|
|
del users[i]
|
|
self._save_users(users)
|
|
|
|
# Remove user mailbox directory
|
|
mailbox_dir = os.path.join(self.email_data_dir, 'mailboxes', f'{username}@{domain}')
|
|
if os.path.exists(mailbox_dir):
|
|
import shutil
|
|
shutil.rmtree(mailbox_dir)
|
|
|
|
logger.info(f"Deleted email user: {username}@{domain}")
|
|
return True
|
|
|
|
logger.warning(f"Email user {username}@{domain} not found")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Failed to delete email user {username}@{domain}: {e}")
|
|
return False
|
|
|
|
def update_email_user(self, username: str, domain: str,
|
|
updates: Dict[str, Any]) -> bool:
|
|
"""Update an email user"""
|
|
try:
|
|
users = self._load_users()
|
|
|
|
# Find and update user
|
|
for user in users:
|
|
if user.get('username') == username and user.get('domain') == domain:
|
|
user.update(updates)
|
|
user['updated_at'] = datetime.utcnow().isoformat()
|
|
self._save_users(users)
|
|
|
|
logger.info(f"Updated email user: {username}@{domain}")
|
|
return True
|
|
|
|
logger.warning(f"Email user {username}@{domain} not found")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Failed to update email user {username}@{domain}: {e}")
|
|
return False
|
|
|
|
def send_email(self, from_email: str, to_email: str, subject: str,
|
|
body: str, html_body: str = None) -> bool:
|
|
"""Send an email"""
|
|
try:
|
|
# In a real implementation, this would use a proper SMTP library
|
|
# For now, we'll just log the email details
|
|
|
|
email_data = {
|
|
'from': from_email,
|
|
'to': to_email,
|
|
'subject': subject,
|
|
'body': body,
|
|
'html_body': html_body,
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
|
|
# Save email to outbox
|
|
outbox_dir = os.path.join(self.email_data_dir, 'outbox')
|
|
os.makedirs(outbox_dir, exist_ok=True)
|
|
|
|
email_file = os.path.join(outbox_dir, f"{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}_{from_email.replace('@', '_at_')}.json")
|
|
with open(email_file, 'w') as f:
|
|
json.dump(email_data, f, indent=2)
|
|
|
|
logger.info(f"Email queued for sending: {from_email} -> {to_email}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to send email: {e}")
|
|
return False
|
|
|
|
def get_metrics(self) -> Dict[str, Any]:
|
|
"""Get email service metrics"""
|
|
try:
|
|
users = self._load_users()
|
|
total_quota_used = sum(user.get('quota_used', 0) for user in users)
|
|
total_quota_limit = sum(user.get('quota_limit', 0) for user in users)
|
|
|
|
return {
|
|
'service': 'email',
|
|
'timestamp': datetime.utcnow().isoformat(),
|
|
'status': 'online' if self._check_smtp_status() and self._check_imap_status() else 'offline',
|
|
'users_count': len(users),
|
|
'total_quota_used': total_quota_used,
|
|
'total_quota_limit': total_quota_limit,
|
|
'quota_usage_percent': (total_quota_used / total_quota_limit * 100) if total_quota_limit > 0 else 0,
|
|
'smtp_running': self._check_smtp_status(),
|
|
'imap_running': self._check_imap_status()
|
|
}
|
|
except Exception as e:
|
|
return self.handle_error(e, "get_metrics")
|
|
|
|
def restart_service(self) -> bool:
|
|
"""Restart email service"""
|
|
try:
|
|
# In a real implementation, this would restart the mail server
|
|
# For now, we'll just log the restart
|
|
logger.info("Email service restart requested")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to restart email service: {e}")
|
|
return False |