Files
pic/api/routes/email.py
T
roof 1f016de855
Unit Tests / test (push) Successful in 11m35s
feat: make DDNS domain_name the effective domain across all services
- ConfigManager.get_effective_domain(): returns domain_name when DDNS
  active (pic_ngo/cloudflare/duckdns), domain otherwise. Used by all
  public-facing services so they use the real registered FQDN.
- ConfigManager.get_internal_domain(): always returns _identity.domain
  (CoreDNS zone name, dnsmasq, cell-link invites — stays internal).
- Silent migration: if domain_mode != lan and domain is generic "cell",
  auto-set to {cell_name}.local for unique CoreDNS zone naming.
- caddy_manager: fix custom_domain bug — cloudflare/http01 modes were
  reading identity.get('custom_domain') which never exists; now reads
  domain_name correctly.
- routes/config, app: expose effective_domain in GET /api/config and
  /api/status responses.
- email_manager, routes/email: use get_effective_domain() for
  OVERRIDE_HOSTNAME, POSTMASTER_ADDRESS, and new-user email defaults.
- ServiceBus.IDENTITY_CHANGED event: emitted from PUT /api/config and
  POST /api/ddns/register after identity writes; caddy_manager and
  email_manager subscribe to regenerate config automatically.
- Settings.jsx: hide Local Domain input in non-LAN modes; show
  read-only effective_domain with "managed by DDNS" badge and an
  Advanced toggle for the internal CoreDNS zone name.
- 11 new test classes covering all new helpers, event subscriptions,
  caddy/email handlers, and the custom_domain fix.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 02:48:47 -04:00

93 lines
3.3 KiB
Python

import logging
from flask import Blueprint, request, jsonify
# Module-wide logger; bound to the fixed 'picell' logger name rather than
# to this module's __name__.
logger = logging.getLogger('picell')
# Blueprint carrying the /api/email/* routes declared below.
# NOTE(review): registration on the Flask app is not visible here — confirm
# where this blueprint gets registered.
bp = Blueprint('email', __name__)
@bp.route('/api/email/users', methods=['GET'])
def get_email_users():
    """Return the email manager's user listing as a JSON response.

    Any exception raised while importing or querying the manager is logged
    and answered with HTTP 500 and an ``error`` field holding its text.
    """
    try:
        # Resolved at request time, inside the handler, not at module import.
        from app import email_manager
        return jsonify(email_manager.get_users())
    except Exception as exc:
        logger.error(f"Error getting email users: {exc}")
        failure = {"error": str(exc)}
        return jsonify(failure), 500
@bp.route('/api/email/users', methods=['POST'])
def create_email_user():
    """Create an email user from the JSON request body.

    The body must be a JSON object with ``username`` and ``password``; an
    optional ``domain`` may be supplied. When ``domain`` is missing or falsy,
    ``config_manager.get_effective_domain()`` is used instead.

    Returns:
        ``{"created": <result>}`` where ``<result>`` is whatever
        ``email_manager.create_email_user`` returned.
        HTTP 400 with an ``error`` field when the body is absent or
        unparsable, is not a JSON object, or lacks ``username``/``password``.
        HTTP 500 with the exception text for any other failure.
    """
    try:
        from app import email_manager, config_manager
        # silent=True makes get_json return None on a missing or malformed
        # body instead of raising.
        data = request.get_json(silent=True)
        if data is None:
            return jsonify({"error": "No data provided"}), 400
        # Valid JSON is not necessarily an object: a list, string or number
        # would otherwise fail on .get() below and surface as a 500.
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        username = data.get('username')
        domain = data.get('domain') or config_manager.get_effective_domain()
        password = data.get('password')
        if not username or not password:
            return jsonify({"error": "Missing required fields: username, password"}), 400
        result = email_manager.create_email_user(username, domain, password)
        return jsonify({"created": result})
    except Exception as e:
        logger.error(f"Error creating email user: {e}")
        return jsonify({"error": str(e)}), 500
@bp.route('/api/email/users/<username>', methods=['DELETE'])
def delete_email_user(username):
    """Delete the email user named by the URL's ``username`` segment.

    The domain comes from the ``domain`` query parameter; when that is
    missing or empty, ``config_manager.get_effective_domain()`` supplies it.
    Responds with ``{"deleted": <result>}``, or with HTTP 500 and an
    ``error`` field if anything raises.
    """
    try:
        from app import email_manager, config_manager
        target_domain = request.args.get('domain')
        if not target_domain:
            # No usable query parameter: fall back to the configured domain.
            target_domain = config_manager.get_effective_domain()
        outcome = email_manager.delete_email_user(username, target_domain)
        return jsonify({"deleted": outcome})
    except Exception as exc:
        logger.error(f"Error deleting email user: {exc}")
        failure = {"error": str(exc)}
        return jsonify(failure), 500
@bp.route('/api/email/status', methods=['GET'])
def get_email_status():
    """Return ``email_manager.get_status()`` serialised as JSON.

    On any exception the error is logged and an HTTP 500 response with an
    ``error`` field is returned.
    """
    try:
        from app import email_manager
        return jsonify(email_manager.get_status())
    except Exception as exc:
        logger.error(f"Error getting email status: {exc}")
        failure = {"error": str(exc)}
        return jsonify(failure), 500
@bp.route('/api/email/connectivity', methods=['GET'])
def test_email_connectivity():
    """Run the email manager's connectivity test and return its result as JSON.

    On any exception the error is logged and an HTTP 500 response with an
    ``error`` field is returned.
    """
    try:
        from app import email_manager
        return jsonify(email_manager.test_connectivity())
    except Exception as exc:
        logger.error(f"Error testing email connectivity: {exc}")
        failure = {"error": str(exc)}
        return jsonify(failure), 500
@bp.route('/api/email/send', methods=['POST'])
def send_email():
    """Hand the JSON request body to ``email_manager.send_email``.

    Responds with the manager's result as JSON, with HTTP 400 when no JSON
    body could be read, or with HTTP 500 and an ``error`` field if anything
    raises.
    """
    try:
        from app import email_manager
        payload = request.get_json(silent=True)
        if payload is not None:
            return jsonify(email_manager.send_email(payload))
        return jsonify({"error": "No data provided"}), 400
    except Exception as exc:
        logger.error(f"Error sending email: {exc}")
        failure = {"error": str(exc)}
        return jsonify(failure), 500
@bp.route('/api/email/mailbox/<username>', methods=['GET'])
def get_mailbox_info(username):
    """Return ``email_manager.get_mailbox_info(username)`` as JSON.

    ``username`` is taken from the URL path. On any exception the error is
    logged and an HTTP 500 response with an ``error`` field is returned.
    """
    try:
        from app import email_manager
        return jsonify(email_manager.get_mailbox_info(username))
    except Exception as exc:
        logger.error(f"Error getting mailbox info: {exc}")
        failure = {"error": str(exc)}
        return jsonify(failure), 500