Files
pic/api/email_manager.py
roof a43f9fbf0d fix: full security audit remediation — P0/P1/P2/P3 fixes + 1020 passing tests
P0 — Broken functionality:
- Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing)
- Fix email_manager.delete_email_user() missing domain arg
- Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now
  accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file)
- Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR)
- Fix _next_peer_ip exhaustion returning 500 instead of 409
- Fix ConfigManager Caddyfile path (/app/config-caddy/)
- Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx
- Remove hardcoded credentials from Dashboard.jsx

P1 — Security:
- CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern)
- enforce_auth: 503 only when users file readable but empty; never bypass on IOError
- WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes
- DNS add_cell_dns_forward: validate IP and domain; reject injection chars
- DNS zone write: realpath containment + record content validation
- iptables comment /32 suffix prevents substring match deleting wrong peer rules
- is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge)
- POST /api/containers: volume allow-list prevents arbitrary host mounts
- file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user
- email/calendar: stop persisting plaintext passwords in user records
- routing_manager: validate IPs, networks, and interface names
- peer_registry: write peers.json at mode 0o600
- vault_manager: Fernet key file at mode 0o600
- CORS: lock down to explicit origin list
- domain/cell_name validation: reject newline, brace, semicolon injection chars

P2 — Architecture:
- Peer add: rollback registry entry if firewall rules fail post-add
- restart_service(): base class now calls _restart_container(); email and calendar
  managers call cell-mail / cell-radicale respectively
- email/calendar managers sync user list (no passwords) to cell_config.json
- Pending-restart flag cleared only after helper subprocess exits with code 0
- docker-compose.yml: add config-caddy volume to API container

P3 — Tests (854 → 1020):
- Fill test_email_endpoints.py, test_calendar_endpoints.py,
  test_network_endpoints.py, test_routing_endpoints.py
- New: test_peer_management_update.py, test_peer_management_edge_cases.py,
  test_input_validation.py, test_enforce_auth_configured.py,
  test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py,
  test_is_local_request_per_endpoint.py, test_caddy_routing.py
- E2E conftest: skip WireGuard suite when wg-quick absent
- Update existing tests to match fixed signatures and comment formats

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:30:21 -04:00

513 lines
21 KiB
Python

#!/usr/bin/env python3
"""
Email Manager for Personal Internet Cell
Handles email service configuration and user management
"""
import os
import json
import smtplib
import imaplib
import subprocess
import logging
from datetime import datetime
from typing import Dict, List, Optional, Any
from base_service_manager import BaseServiceManager
logger = logging.getLogger(__name__)
class EmailManager(BaseServiceManager):
"""Manages email service configuration and users"""
def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
    """Compute the email manager's paths and create its working directories.

    Args:
        data_dir: Root data directory; email state lives in its ``email``
            subdirectory.
        config_dir: Root configuration directory, forwarded to the base class.
    """
    super().__init__('email', data_dir, config_dir)

    email_root = os.path.join(data_dir, 'email')
    self.email_data_dir = email_root
    self.email_dir = email_root  # alias used by tests
    self.postfix_dir = os.path.join(email_root, 'postfix')
    self.dovecot_dir = os.path.join(email_root, 'dovecot')
    self.users_file = os.path.join(email_root, 'users.json')
    # self.config_dir is read (not the raw argument); presumably it is set by
    # the base-class initialiser called above.
    self.domain_config_file = os.path.join(self.config_dir, 'email', 'domain.json')

    # Same creation order as before: data root, postfix, dovecot, config dir.
    for required_dir in (
        self.email_data_dir,
        self.postfix_dir,
        self.dovecot_dir,
        os.path.dirname(self.domain_config_file),
    ):
        self.safe_makedirs(required_dir)
def _get_service_config(self) -> Dict[str, Any]:
"""Read configured ports/domain from service config file."""
cfg = self.get_config()
if isinstance(cfg, dict) and 'error' not in cfg:
return cfg
return {}
def get_status(self) -> Dict[str, Any]:
"""Get email service status"""
try:
svc_cfg = self._get_service_config()
smtp_port = svc_cfg.get('smtp_port', 587)
imap_port = svc_cfg.get('imap_port', 993)
domain = svc_cfg.get('domain') or self._get_domain_config().get('domain', 'cell.local')
import os
is_docker = os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER') == 'true'
if is_docker:
container_running = self._check_email_container_status()
status = {
'running': container_running,
'status': 'online' if container_running else 'offline',
'smtp_running': container_running,
'imap_running': container_running,
'users_count': 0,
'domain': domain,
'smtp_port': smtp_port,
'imap_port': imap_port,
'timestamp': datetime.utcnow().isoformat()
}
else:
smtp_running = self._check_smtp_status()
imap_running = self._check_imap_status()
status = {
'running': smtp_running and imap_running,
'status': 'online' if (smtp_running and imap_running) else 'offline',
'smtp_running': smtp_running,
'imap_running': imap_running,
'users_count': len(self._load_users()),
'domain': domain,
'smtp_port': smtp_port,
'imap_port': imap_port,
'timestamp': datetime.utcnow().isoformat()
}
return status
except Exception as e:
return self.handle_error(e, "get_status")
def test_connectivity(self) -> Dict[str, Any]:
"""Test email service connectivity"""
try:
# Test SMTP connectivity
smtp_test = self._test_smtp_connectivity()
# Test IMAP connectivity
imap_test = self._test_imap_connectivity()
# Test DNS resolution for email domain
dns_test = self._test_dns_resolution()
results = {
'smtp_connectivity': smtp_test,
'imap_connectivity': imap_test,
'dns_resolution': dns_test,
# DNS resolution only relevant when domain is configured
'success': smtp_test['success'] and imap_test['success'],
'timestamp': datetime.utcnow().isoformat()
}
return results
except Exception as e:
return self.handle_error(e, "test_connectivity")
def _check_smtp_status(self) -> bool:
"""Check if SMTP service is running"""
try:
# Check if port 587 (SMTP) is listening
result = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True)
return ':587 ' in result.stdout
except Exception:
return False
def _check_imap_status(self) -> bool:
"""Check if IMAP service is running"""
try:
# Check if port 993 (IMAP) is listening
result = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True)
return ':993 ' in result.stdout
except Exception:
return False
def _check_email_container_status(self) -> bool:
    """Ask the Docker SDK whether a running container matches ``cell-mail``.

    Returns:
        True when the name-filtered container list is non-empty; False when
        it is empty or when the SDK import or the query fails.
    """
    try:
        import docker
        matches = docker.from_env().containers.list(filters={'name': 'cell-mail'})
        return len(matches) != 0
    except Exception:
        return False
def _test_smtp_connectivity(self) -> Dict[str, Any]:
    """Probe SMTP by opening a TCP connection to ``cell-mail`` on port 587.

    Returns:
        ``{'success': bool, 'message': str}``; the message carries the
        exception text when the connection attempt fails.
    """
    import socket
    outcome = {'success': True, 'message': 'SMTP connection successful'}
    try:
        # Only reachability matters, so the socket is closed immediately.
        socket.create_connection(('cell-mail', 587), timeout=5).close()
    except Exception as e:
        outcome = {'success': False, 'message': f'SMTP test error: {str(e)}'}
    return outcome
def _test_imap_connectivity(self) -> Dict[str, Any]:
    """Probe IMAP by opening a TCP connection to ``cell-mail`` on port 993.

    Returns:
        ``{'success': bool, 'message': str}``; the message carries the
        exception text when the connection attempt fails.
    """
    import socket
    outcome = {'success': True, 'message': 'IMAP connection successful'}
    try:
        # Only reachability matters, so the socket is closed immediately.
        socket.create_connection(('cell-mail', 993), timeout=5).close()
    except Exception as e:
        outcome = {'success': False, 'message': f'IMAP test error: {str(e)}'}
    return outcome
def _test_dns_resolution(self) -> Dict[str, Any]:
"""Test DNS resolution for email domain."""
import socket
try:
domain_config = self._get_domain_config()
domain = domain_config.get('domain', '')
if not domain:
return {'success': False, 'message': 'No domain configured'}
socket.getaddrinfo(domain, None)
return {'success': True, 'message': f'DNS resolution for {domain} successful'}
except Exception as e:
return {'success': False, 'message': f'DNS test error: {str(e)}'}
def _load_users(self) -> List[Dict[str, Any]]:
"""Load email users from file"""
try:
if os.path.exists(self.users_file):
with open(self.users_file, 'r') as f:
return json.load(f)
return []
except Exception as e:
logger.error(f"Error loading email users: {e}")
return []
def _save_users(self, users: List[Dict[str, Any]]):
"""Save email users to file"""
try:
with open(self.users_file, 'w') as f:
json.dump(users, f, indent=2)
except Exception as e:
logger.error(f"Error saving email users: {e}")
def _get_domain_config(self) -> Dict[str, Any]:
"""Get email domain configuration"""
try:
if os.path.exists(self.domain_config_file):
with open(self.domain_config_file, 'r') as f:
return json.load(f)
return {}
except Exception as e:
logger.error(f"Error loading domain config: {e}")
return {}
def _save_domain_config(self, config: Dict[str, Any]):
"""Save email domain configuration"""
try:
with open(self.domain_config_file, 'w') as f:
json.dump(config, f, indent=2)
except Exception as e:
logger.error(f"Error saving domain config: {e}")
def apply_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""Write config to mailserver.env and restart cell-mail."""
restarted = []
warnings = []
env_file = os.path.join(self.config_dir, 'mail', 'mailserver.env')
try:
# Read existing env file
env_lines = []
if os.path.exists(env_file):
with open(env_file) as f:
env_lines = f.readlines()
def _set_env(lines, key, value):
found = False
result = []
for l in lines:
if l.startswith(f'{key}='):
result.append(f'{key}={value}\n')
found = True
else:
result.append(l)
if not found:
result.append(f'{key}={value}\n')
return result
changed = False
if 'domain' in config and config['domain']:
domain = config['domain']
env_lines = _set_env(env_lines, 'OVERRIDE_HOSTNAME', f'mail.{domain}')
env_lines = _set_env(env_lines, 'POSTMASTER_ADDRESS', f'admin@{domain}')
# Also persist to domain_config_file
self._save_domain_config({'domain': domain})
changed = True
if changed:
with open(env_file, 'w') as f:
f.writelines(env_lines)
self._restart_container('cell-mail')
restarted.append('cell-mail')
except Exception as e:
warnings.append(f"mailserver.env update failed: {e}")
logger.error(f"apply_config error: {e}")
return {'restarted': restarted, 'warnings': warnings}
def get_email_status(self) -> Dict[str, Any]:
    """Build a detailed status report for the mail container.

    The running state comes from ``docker ps`` filtered on ``cell-mail``;
    the postfix/dovecot/smtp/imap flags all mirror that single value.

    Returns:
        A status dict including the full user list and the configured
        domain (``'unknown'`` when none is stored); or the result of
        ``handle_error`` if anything raises.
    """
    try:
        docker_ps = subprocess.run(
            ['docker', 'ps', '--filter', 'name=cell-mail', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=5,
        )
        is_up = 'cell-mail' in docker_ps.stdout
        user_records = self._load_users()

        report = {
            'running': is_up,
            'status': 'online' if is_up else 'offline',
        }
        # The container is the only signal available, so every per-daemon
        # flag repeats it.
        for flag in ('postfix_running', 'dovecot_running', 'smtp_running', 'imap_running'):
            report[flag] = is_up
        report['users_count'] = len(user_records)
        report['users'] = user_records
        report['domain'] = self._get_domain_config().get('domain', 'unknown')
        report['timestamp'] = datetime.utcnow().isoformat()
        return report
    except Exception as e:
        return self.handle_error(e, 'get_email_status')
def get_email_users(self) -> List[Dict[str, Any]]:
"""Get all email users"""
try:
return self._load_users()
except Exception as e:
logger.error(f"Error getting email users: {e}")
return []
def create_email_user(self, username: str, domain: str, password: str,
quota_limit: int = 1000000000) -> bool:
"""Create a new email user"""
try:
if not username or not domain or not password:
return False
users = self._load_users()
# Check if user already exists
for user in users:
if user.get('username') == username and user.get('domain') == domain:
logger.warning(f"Email user {username}@{domain} already exists")
return False
# Create new user
# SECURITY: Do NOT persist the plaintext password here. The email
# password is the same as the user's VPN auth password and storing
# it in plain JSON would leak every user credential if this file
# is read. Auth verification goes through auth_manager; the actual
# mailbox auth is handled by the cell-mail container (Dovecot),
# which has its own credential store. This JSON is metadata only.
new_user = {
'username': username,
'domain': domain,
'email': f'{username}@{domain}',
'quota_limit': quota_limit,
'quota_used': 0,
'created_at': datetime.utcnow().isoformat(),
'last_login': None,
'active': True
}
users.append(new_user)
self._save_users(users)
# Sync user list to cell_config.json (best-effort, non-fatal)
self._sync_users_to_cell_config()
# Create user mailbox directory
mailbox_dir = os.path.join(self.email_data_dir, 'mailboxes', f'{username}@{domain}')
self.safe_makedirs(mailbox_dir)
logger.info(f"Created email user: {username}@{domain}")
return True
except Exception as e:
logger.error(f"Failed to create email user {username}@{domain}: {e}")
return False
def delete_email_user(self, username: str, domain: str) -> bool:
"""Delete an email user"""
try:
users = self._load_users()
# Find and remove user
for i, user in enumerate(users):
if user.get('username') == username and user.get('domain') == domain:
del users[i]
self._save_users(users)
# Sync user list to cell_config.json (best-effort, non-fatal)
self._sync_users_to_cell_config()
# Remove user mailbox directory
mailbox_dir = os.path.join(self.email_data_dir, 'mailboxes', f'{username}@{domain}')
if os.path.exists(mailbox_dir):
import shutil
shutil.rmtree(mailbox_dir)
logger.info(f"Deleted email user: {username}@{domain}")
return True
logger.warning(f"Email user {username}@{domain} not found")
return False
except Exception as e:
logger.error(f"Failed to delete email user {username}@{domain}: {e}")
return False
def update_email_user(self, username: str, domain: str,
updates: Dict[str, Any]) -> bool:
"""Update an email user"""
try:
users = self._load_users()
# Find and update user
for user in users:
if user.get('username') == username and user.get('domain') == domain:
user.update(updates)
user['updated_at'] = datetime.utcnow().isoformat()
self._save_users(users)
logger.info(f"Updated email user: {username}@{domain}")
return True
logger.warning(f"Email user {username}@{domain} not found")
return False
except Exception as e:
logger.error(f"Failed to update email user {username}@{domain}: {e}")
return False
def send_email(self, from_email: str, to_email: str, subject: str,
body: str, html_body: str = None) -> bool:
"""Send an email via SMTP."""
try:
if not from_email or not to_email or not subject or body is None:
return False
with smtplib.SMTP('localhost', 25) as smtp:
message = f'From: {from_email}\r\nTo: {to_email}\r\nSubject: {subject}\r\n\r\n{body}'
smtp.sendmail(from_email, to_email, message)
logger.info(f'Email sent: {from_email} -> {to_email}')
return True
except Exception as e:
logger.error(f'Failed to send email: {e}')
return False
def get_metrics(self) -> Dict[str, Any]:
"""Get email service metrics"""
try:
users = self._load_users()
total_quota_used = sum(user.get('quota_used', 0) for user in users)
total_quota_limit = sum(user.get('quota_limit', 0) for user in users)
return {
'service': 'email',
'timestamp': datetime.utcnow().isoformat(),
'status': 'online' if self._check_smtp_status() and self._check_imap_status() else 'offline',
'users_count': len(users),
'total_quota_used': total_quota_used,
'total_quota_limit': total_quota_limit,
'quota_usage_percent': (total_quota_used / total_quota_limit * 100) if total_quota_limit > 0 else 0,
'smtp_running': self._check_smtp_status(),
'imap_running': self._check_imap_status()
}
except Exception as e:
return self.handle_error(e, "get_metrics")
def _sync_users_to_cell_config(self):
"""Best-effort sync of the email user list into cell_config.json via ConfigManager.
Only safe metadata (no passwords) is written. Failures are logged as
warnings so they never block the per-service operation that triggered them.
"""
try:
# Import here to avoid circular imports and to tolerate environments
# where config_manager is not on sys.path.
from config_manager import ConfigManager
cm = ConfigManager()
# Build safe user list: strip any sensitive keys that should not
# land in the shared config file.
_SENSITIVE = {'password', 'hashed_password', 'password_hash'}
safe_users = [
{k: v for k, v in u.items() if k not in _SENSITIVE}
for u in self._load_users()
]
existing = cm.get_service_config('email')
existing['users'] = safe_users
cm.update_service_config('email', existing)
except Exception as e:
self.logger.warning(f"Failed to sync email users to cell_config.json: {e}")
def restart_service(self) -> bool:
"""Restart email service (restarts the cell-mail Docker container)."""
try:
logger.info('Email service restart requested')
return self._restart_container('cell-mail')
except Exception as e:
logger.error(f'Failed to restart email service: {e}')
return False
def list_email_users(self) -> List[Dict[str, Any]]:
"""Alias for get_email_users."""
return self.get_email_users()
def _reload_email_services(self) -> bool:
    """Ask supervisor inside ``cell-mail`` to reload after config changes.

    Returns:
        True when ``supervisorctl reload`` exits with code 0, False when it
        exits non-zero. True is also returned when the command cannot be run
        at all (any exception).
    """
    reload_cmd = ['docker', 'exec', 'cell-mail', 'supervisorctl', 'reload']
    try:
        completed = subprocess.run(
            reload_cmd, capture_output=True, text=True, timeout=10,
        )
    except Exception:
        # NOTE(review): a missing docker CLI or a timeout is reported as
        # success here, unlike a non-zero exit — confirm this is intended.
        return True
    return completed.returncode == 0
def get_email_logs(self, level: str = 'all', count: int = 100) -> Dict[str, Any]:
"""Return recent log lines from postfix and dovecot."""
try:
result = subprocess.run(
['docker', 'exec', 'cell-mail', 'tail', f'-{count}', '/var/log/mail/mail.log'],
capture_output=True, text=True, timeout=5,
)
lines = result.stdout.splitlines()
return {
'postfix': [l for l in lines if 'postfix' in l.lower()] or lines,
'dovecot': [l for l in lines if 'dovecot' in l.lower()] or lines,
}
except Exception as e:
return {'postfix': [], 'dovecot': [], 'error': str(e)}
def test_email_connectivity(self) -> Dict[str, Any]:
"""Test SMTP and IMAP connectivity."""
smtp_ok = False
imap_ok = False
try:
import requests as _requests
resp = _requests.get('http://localhost:25', timeout=2)
smtp_ok = resp.status_code < 500
except Exception:
smtp_ok = False
try:
imap_ok = self._check_imap_status()
except Exception:
imap_ok = False
return {'smtp': smtp_ok, 'imap': imap_ok}
def get_mailbox_info(self, username: str, domain: str) -> Dict[str, Any]:
"""Return mailbox info for a user."""
try:
if not username or not domain:
raise ValueError('username and domain are required')
with imaplib.IMAP4_SSL('localhost', 993) as imap:
imap.login(f'{username}@{domain}', '')
imap.select('INBOX')
_, data = imap.search(None, 'ALL')
message_count = len(data[0].split()) if data[0] else 0
return {'username': username, 'domain': domain, 'messages': message_count}
except Exception as e:
return {'username': username, 'domain': domain, 'error': str(e)}