Files
pic/api/caddy_manager.py
T
roof e9077b2633
Unit Tests / test (push) Successful in 7m35s
fix: Caddy health check must hit /config/ not /
GET http://cell-caddy:2019/ returns 404 because Caddy's admin API has no
root handler.  The health monitor interpreted every response as a failure,
restarted Caddy every 3 minutes, and prevented ACME from ever completing.

/config/ returns 200 + the running config JSON whenever Caddy is up and
serving — that is the correct liveness indicator.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-08 15:57:32 -04:00

769 lines
33 KiB
Python

#!/usr/bin/env python3
"""
Caddy Manager for Personal Internet Cell.
Generates a Caddyfile based on the current identity (domain mode, cell name,
domain) and the list of installed services that contribute reverse-proxy
routes. Uses Caddy's admin API (``CADDY_ADMIN_URL``, default
http://cell-caddy:2019) to hot-reload the config without restarting the
container.
Domain modes supported:
lan — local-only, internal CA, HTTP + self-signed HTTPS via
/etc/caddy/internal/{cert,key}.pem
pic_ngo — DNS-01 ACME via the pic_ngo Caddy plugin (wildcard cert)
cloudflare — DNS-01 ACME via the cloudflare Caddy plugin (wildcard cert)
duckdns — DNS-01 ACME via the duckdns Caddy plugin
http01 — HTTP-01 ACME (no wildcard); each subdomain gets its own
server block (used by No-IP, FreeDNS, etc.)
For all ACME modes ``acme_ca`` is read from the ``ACME_CA_URL`` env var so
tests / staging can point at Pebble or LE-staging without a code change.
Routes for installed services are inserted before the catch-all ``handle``
in the main server block (or, for ``http01``, written as their own per-host
blocks).
"""
import datetime as _dt
import logging
import os
import socket as _socket
import ssl as _ssl
import time as _time
from typing import Any, Dict, List, Optional
import requests
from base_service_manager import BaseServiceManager
# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)
# Live Caddyfile path inside the cell-api container (host path is
# ./config/caddy/Caddyfile, mounted at /app/config-caddy). May be overridden
# in tests via the CADDYFILE_PATH env var.
LIVE_CADDYFILE = os.environ.get('CADDYFILE_PATH', '/app/config-caddy/Caddyfile')
# Caddy admin API base URL. The default targets the ``cell-caddy`` container
# hostname on port 2019, which is how cell-api reaches Caddy across the
# Docker bridge. Override with the CADDY_ADMIN_URL env var, e.g.
# http://127.0.0.1:2019 for a dev/test wiring where Caddy's admin endpoint
# is only bound to localhost.
CADDY_ADMIN_URL = os.environ.get('CADDY_ADMIN_URL', 'http://cell-caddy:2019')
# Directory where the API writes custom TLS cert/key files.
# The Caddy container mounts ./config/caddy → /config/caddy, so files written
# here appear inside the container as /config/caddy/certs/<file>.
CADDY_CERTS_DIR = os.environ.get('CADDY_CERTS_DIR', '/app/config-caddy/certs')
# Paths as seen by the Caddy process (inside the container).
# Custom (user-uploaded) certificate pair:
_CADDY_CUSTOM_CERT = '/config/caddy/certs/cert.pem'
_CADDY_CUSTOM_KEY = '/config/caddy/certs/key.pem'
# Internal-CA certificate pair used when no custom cert is installed:
_CADDY_INTERNAL_CERT = '/etc/caddy/internal/cert.pem'
_CADDY_INTERNAL_KEY = '/etc/caddy/internal/key.pem'
class CaddyManager(BaseServiceManager):
"""Manages Caddy reverse-proxy configuration and runtime health."""
def __init__(self, config_manager=None,
             data_dir: str = '/app/data',
             config_dir: str = '/app/config',
             service_bus=None,
             service_registry=None):
    """Initialise manager state and optionally hook into the service bus.

    Args:
        config_manager: object used to read/write the cell identity; may
            be None.
        data_dir: data directory forwarded to ``BaseServiceManager``.
        config_dir: config directory forwarded to ``BaseServiceManager``.
        service_bus: when given, ``_on_identity_changed`` is subscribed to
            ``EventType.IDENTITY_CHANGED``.
        service_registry: optional source of per-service Caddy routes.
    """
    super().__init__('caddy', data_dir, config_dir)

    # Collaborators.
    self._service_registry = service_registry
    self.config_manager = config_manager

    # Static wiring.
    self.caddyfile_path = LIVE_CADDYFILE
    self.container_name = 'cell-caddy'

    # Runtime bookkeeping: monotonic timestamp of the last cert status
    # refresh, and the consecutive health-check failure counter (reset on
    # success or when the caller restarts the container).
    self._cert_refreshed_at: Optional[float] = None
    self._health_failures = 0

    if service_bus is None:
        return
    # Imported lazily so the module loads without the bus when none is used.
    from service_bus import EventType
    service_bus.subscribe_to_event(
        EventType.IDENTITY_CHANGED, self._on_identity_changed)
# ── BaseServiceManager required ───────────────────────────────────────
def get_status(self) -> Dict[str, Any]:
    """Report whether Caddy's admin API answers, plus static wiring info.

    Returns:
        Dict with ``service``, ``running``, ``admin_url``,
        ``caddyfile_path`` and ``consecutive_failures``.
    """
    is_up = self.check_caddy_health()
    return dict(
        service=self.service_name,
        running=is_up,
        admin_url=CADDY_ADMIN_URL,
        caddyfile_path=self.caddyfile_path,
        consecutive_failures=self._health_failures,
    )
def test_connectivity(self) -> Dict[str, Any]:
    """Probe the Caddy admin API once and report the outcome.

    Returns:
        Dict with ``success`` (bool) and the ``admin_url`` that was probed.
    """
    reachable = self.check_caddy_health()
    return dict(success=reachable, admin_url=CADDY_ADMIN_URL)
# ── Caddyfile generation ──────────────────────────────────────────────
def generate_caddyfile(self, identity: Dict[str, Any],
                       installed_services: List[Dict[str, Any]]) -> str:
    """Generate a complete Caddyfile based on identity and services.

    Args:
        identity: identity dict from ``ConfigManager.get_identity()``.
            Keys read here: ``cell_name``, ``domain_mode`` and, for the
            ``cloudflare`` / ``http01`` modes, ``domain_name`` (falling
            back to ``domain``). Missing, ``None`` or empty values fall
            back to defaults so a host name never contains ``None``.
        installed_services: list of service dicts; each may have a
            ``caddy_route`` string with one or more Caddyfile directives
            (e.g. ``"handle /calendar* {\\n reverse_proxy ..."``).

    Returns:
        Caddyfile text. Unknown ``domain_mode`` values fall back to the
        LAN layout so a valid Caddyfile is always emitted.
    """
    identity = identity or {}
    # ``or`` (not a .get default) so stored None / '' also use the default.
    cell_name = identity.get('cell_name') or 'cell'
    domain_mode = identity.get('domain_mode') or 'lan'

    # Per-service route snippets for the main server block (every mode
    # except http01, which builds per-host blocks from the raw list).
    service_routes = self._collect_service_routes(installed_services)

    # Core routes are appended after the installed-service routes in every
    # generated server block.
    core_routes = (
        " handle /api/* {\n"
        " reverse_proxy cell-api:3000\n"
        " }\n"
        " handle {\n"
        " reverse_proxy cell-webui:80\n"
        " }"
    )

    if domain_mode == 'pic_ngo':
        return self._caddyfile_pic_ngo(cell_name, service_routes, core_routes)
    if domain_mode == 'cloudflare':
        custom_domain = (identity.get('domain_name')
                         or identity.get('domain')
                         or f'{cell_name}.local')
        return self._caddyfile_cloudflare(
            custom_domain, service_routes, core_routes
        )
    if domain_mode == 'duckdns':
        return self._caddyfile_duckdns(cell_name, service_routes, core_routes)
    if domain_mode == 'http01':
        host = (identity.get('domain_name')
                or identity.get('domain')
                or f'{cell_name}.noip.me')
        return self._caddyfile_http01(host, installed_services, core_routes)

    if domain_mode != 'lan':
        logger.warning("Unknown domain_mode %r; falling back to 'lan'", domain_mode)
    # Both the real LAN mode and the fallback honour an installed custom
    # certificate instead of silently reverting to the internal pair.
    cert_path, key_path = self._tls_cert_pair()
    return self._caddyfile_lan(cell_name, service_routes, core_routes,
                               cert_path, key_path)
# ── per-mode generators ───────────────────────────────────────────────
@staticmethod
def _global_acme_block(email: Optional[str]) -> str:
"""Return the ``{ ... }`` global block for an ACME-enabled mode."""
lines = ["{"]
# Bind admin API on all interfaces so cell-api can reach cell-caddy
# across the Docker bridge (default 127.0.0.1 is unreachable cross-container).
lines.append(" admin 0.0.0.0:2019")
if email:
lines.append(f" email {email}")
# Only write acme_ca when a URL is configured — an empty ACME_CA_URL
# causes Caddy to reject the Caddyfile with "wrong argument count".
# When absent, Caddy defaults to Let's Encrypt production.
acme_ca_url = os.environ.get('ACME_CA_URL', '').strip()
if acme_ca_url:
lines.append(f" acme_ca {acme_ca_url}")
lines.append("}")
return "\n".join(lines)
def _build_registry_service_routes(self, domain: str) -> str:
    """Build named-matcher + handle blocks from the service registry.

    When no registry is wired or the registry returns nothing, only the
    api block is emitted (api is always infrastructure, not delegated to
    the registry).

    Every host name a route wants — the primary subdomain, extra
    subdomains sharing its backend, and extra subdomains with their own
    backend — must pass the same reserved/duplicate check, so no registry
    entry can claim ``api``/``webui`` or a name an earlier route owns.

    Args:
        domain: base domain appended to each subdomain.

    Returns:
        The blocks joined by newlines, ending with the ``@api`` block.
    """
    routes: List[Dict] = []
    if self._service_registry is not None:
        try:
            # ``or []`` tolerates a registry that returns None.
            routes = self._service_registry.get_caddy_routes() or []
        except Exception as exc:
            logger.warning('_build_registry_service_routes: registry error: %s', exc)

    # Pre-seed with reserved names so no registry entry can squat them.
    seen_matchers: set = {'api', 'webui'}
    blocks: List[str] = []

    def _claim(name: str) -> bool:
        """Reserve ``name``; return False (and warn) if already taken."""
        if name in seen_matchers:
            logger.warning('Caddy: skipping duplicate/reserved matcher %r', name)
            return False
        seen_matchers.add(name)
        return True

    for route in routes:
        primary_sub = route.get('subdomain')
        backend = route.get('backend')
        if not primary_sub or not backend:
            # One malformed entry must not abort Caddyfile generation.
            logger.warning('Caddy: skipping malformed registry route %r', route)
            continue
        extra_subs: List[str] = route.get('extra_subdomains') or []
        extra_backends: Dict[str, str] = route.get('extra_backends') or {}

        if not _claim(primary_sub):
            continue

        # Subdomains that share the primary backend go in one matcher
        # block; each must be claimable like any other host name.
        shared = [primary_sub]
        for sub in extra_subs:
            if sub not in extra_backends and _claim(sub):
                shared.append(sub)
        host_list = ' '.join(f'{s}.{domain}' for s in shared)
        blocks.append(
            f' @{primary_sub} host {host_list}\n'
            f' handle @{primary_sub} {{\n'
            f' reverse_proxy {backend}\n'
            f' }}'
        )

        # Extra subdomains with their own backends each get their own block.
        for sub, sub_backend in extra_backends.items():
            if not _claim(sub):
                continue
            blocks.append(
                f' @{sub} host {sub}.{domain}\n'
                f' handle @{sub} {{\n'
                f' reverse_proxy {sub_backend}\n'
                f' }}'
            )

    # The api subdomain is always infrastructure — not delegated to the registry.
    blocks.append(
        f' @api host api.{domain}\n'
        f' handle @api {{\n'
        f' reverse_proxy cell-api:3000\n'
        f' }}'
    )
    return '\n'.join(blocks)
@staticmethod
def _indent_routes(routes: str, spaces: int = 4) -> str:
"""Indent a multi-line route block by ``spaces`` columns."""
if not routes:
return ""
prefix = " " * spaces
return "\n".join(prefix + line if line.strip() else line
for line in routes.splitlines())
def _collect_service_routes(self,
installed_services: List[Dict[str, Any]]) -> str:
"""Concatenate ``caddy_route`` strings from installed services."""
chunks: List[str] = []
for svc in installed_services or []:
route = (svc or {}).get('caddy_route')
if route:
chunks.append(route.strip("\n"))
return "\n".join(chunks)
def _tls_cert_pair(self) -> tuple:
    """Return (cert_path, key_path) as seen inside the Caddy container.

    Uses the custom-uploaded cert when the identity's ``tls.cert_type`` is
    ``'custom'``, otherwise falls back to the internal-CA pair. A missing
    config manager, missing identity, or a ``tls`` entry stored as ``None``
    all resolve to the internal pair.
    """
    ident = (self.config_manager.get_identity() if self.config_manager else {}) or {}
    # ``or {}`` rather than a .get default: the key may exist holding None.
    tls = ident.get('tls') or {}
    if tls.get('cert_type') == 'custom':
        return _CADDY_CUSTOM_CERT, _CADDY_CUSTOM_KEY
    return _CADDY_INTERNAL_CERT, _CADDY_INTERNAL_KEY
def _caddyfile_lan(self, cell_name: str,
                   service_routes: str, core_routes: str,
                   cert_path: str = _CADDY_INTERNAL_CERT,
                   key_path: str = _CADDY_INTERNAL_KEY) -> str:
    """Render the LAN-mode Caddyfile (``auto_https off``, no ACME).

    Args:
        cell_name: used for the ``http://<cell_name>.cell`` site address.
        service_routes: pre-joined installed-service routes; indented and
            placed before ``core_routes`` when non-empty.
        core_routes: the always-present api/webui routes.
        cert_path: certificate path written into the ``tls`` directive.
        key_path: key path written into the ``tls`` directive.

    Returns:
        Caddyfile text ending with a newline.
    """
    sections = [self._indent_routes(service_routes)] if service_routes else []
    sections.append(core_routes)
    site_body = "\n".join(sections)

    lines = [
        "{",
        " admin 0.0.0.0:2019",
        " auto_https off",
        "}",
        "",
        f"http://{cell_name}.cell, http://172.20.0.2:80 {{",
        f" tls {cert_path} {key_path}",
        site_body,
        "}",
    ]
    return "\n".join(lines) + "\n"
def _caddyfile_pic_ngo(self, cell_name: str,
                       service_routes: str, core_routes: str) -> str:
    """pic_ngo mode: wildcard DNS-01 via the pic_ngo plugin.

    Credentials are resolved at write time — Caddy runs in its own
    container and does not inherit the API's environment variables, so the
    actual values are embedded instead of ``{$VAR}`` placeholders.

    Args:
        cell_name: cell label; the domain is ``<cell_name>.pic.ngo``.
        service_routes: pre-joined installed-service routes.
        core_routes: the always-present api/webui routes.

    Returns:
        Caddyfile text. When no DDNS token is available yet, the LAN
        Caddyfile is returned instead so Caddy can start cleanly.
    """
    domain = f"{cell_name}.pic.ngo"

    # config_manager is optional (defaults to None) and its ``ddns`` entry
    # may be absent or None; treat all of those as "no stored settings".
    configs = getattr(self.config_manager, 'configs', None) or {}
    ddns_cfg = configs.get('ddns') or {}

    # Use the registration bearer token (ddns.token), NOT the TOTP secret —
    # the pic_ngo plugin authenticates to POST /api/v1/dns-challenge with this token.
    ddns_token = (ddns_cfg.get('token') or os.environ.get('DDNS_TOKEN') or '').strip()
    raw_api = (os.environ.get('DDNS_URL') or ddns_cfg.get('url') or 'https://ddns.pic.ngo').strip()
    # Strip legacy /api/v1 suffix — the pic_ngo plugin appends /api/v1 itself.
    ddns_api = raw_api.rstrip('/').removesuffix('/api/v1')

    # No token yet (fresh install, pre-registration) — Caddy would reject a
    # bare `token` keyword with no value. Fall back to LAN mode so Caddy
    # starts cleanly; the Caddyfile is regenerated once registration completes.
    # Checked before touching the registry, whose routes would be unused.
    if not ddns_token:
        logger.warning(
            'pic_ngo mode configured but no DDNS token available; '
            'falling back to lan mode until registration completes'
        )
        cert_path, key_path = self._tls_cert_pair()
        return self._caddyfile_lan(cell_name, service_routes, core_routes,
                                   cert_path, key_path)

    body = [self._build_registry_service_routes(domain)]
    if service_routes:
        body.append(self._indent_routes(service_routes))
    body.append(core_routes)
    inner = "\n".join(body)
    email = f"admin@{domain}"

    return (
        f"{self._global_acme_block(email)}\n"
        "\n"
        f"*.{domain}, {domain} {{\n"
        " tls {\n"
        " dns pic_ngo {\n"
        f" token {ddns_token}\n"
        f" api_base_url {ddns_api}\n"
        " }\n"
        " }\n"
        f"{inner}\n"
        "}\n"
    )
def _caddyfile_cloudflare(self, custom_domain: str,
service_routes: str, core_routes: str) -> str:
"""cloudflare mode: wildcard DNS-01 via the cloudflare plugin."""
body = [self._build_registry_service_routes(custom_domain)]
if service_routes:
body.append(self._indent_routes(service_routes))
body.append(core_routes)
inner = "\n".join(body)
return (
f"{self._global_acme_block('{$ACME_EMAIL}')}\n"
"\n"
f"*.{custom_domain}, {custom_domain} {{\n"
" tls {\n"
" dns cloudflare {$CF_API_TOKEN}\n"
" }\n"
f"{inner}\n"
"}\n"
)
def _caddyfile_duckdns(self, cell_name: str,
service_routes: str, core_routes: str) -> str:
"""duckdns mode: DNS-01 via the duckdns plugin."""
domain = f"{cell_name}.duckdns.org"
body = [self._build_registry_service_routes(domain)]
if service_routes:
body.append(self._indent_routes(service_routes))
body.append(core_routes)
inner = "\n".join(body)
return (
f"{self._global_acme_block(None)}\n"
"\n"
f"*.{domain} {{\n"
" tls {\n"
" dns duckdns {$DUCKDNS_TOKEN}\n"
" }\n"
f"{inner}\n"
"}\n"
)
def _caddyfile_http01(self, host: str,
installed_services: List[Dict[str, Any]],
core_routes: str) -> str:
"""http01 mode: no wildcard. Each service gets its own block."""
# Main host block — only the core routes (api + webui).
out = [self._global_acme_block('{$ACME_EMAIL}'), ""]
out.append(f"{host} {{")
out.append(core_routes)
out.append("}")
# Build (subdomain, backend) pairs from registry when available.
_core_services = self._http01_service_pairs()
for subdomain, backend in _core_services:
out.append("")
out.append(f"{subdomain}.{host} {{")
out.append(f" reverse_proxy {backend}")
out.append("}")
# One block per installed (store plugin) service that has a caddy_route,
# skipping any name that conflicts with a core service.
_core_names = {s for s, _ in _core_services}
for svc in installed_services or []:
if not svc:
continue
route = svc.get('caddy_route')
name = svc.get('name') or svc.get('subdomain')
if not route or not name or name in _core_names:
continue
out.append("")
out.append(f"{name}.{host} {{")
out.append(self._indent_routes(route))
out.append("}")
return "\n".join(out) + "\n"
def _http01_service_pairs(self) -> List[tuple]:
"""Return (subdomain, backend) pairs for http01 per-host blocks."""
pairs: List[tuple] = []
if self._service_registry is not None:
try:
for route in self._service_registry.get_caddy_routes():
pairs.append((route['subdomain'], route['backend']))
extra_subs: List[str] = route.get('extra_subdomains') or []
extra_backends: Dict[str, str] = route.get('extra_backends') or {}
for sub in extra_subs:
backend = extra_backends.get(sub, route['backend'])
pairs.append((sub, backend))
except Exception as exc:
logger.warning('_http01_service_pairs: registry error: %s', exc)
pairs = []
pairs.append(('api', 'cell-api:3000'))
return pairs
# ── filesystem + admin-API operations ─────────────────────────────────
def write_caddyfile(self, caddyfile_content: str) -> bool:
    """Write the Caddyfile and reload Caddy via the admin API.

    The file is opened for writing at its existing path (truncate and
    rewrite, no rename) so Docker bind-mounts continue to see it, and is
    written as UTF-8 regardless of the process locale.

    Args:
        caddyfile_content: full Caddyfile text.

    Returns:
        True if both the write and the subsequent reload succeed.
    """
    try:
        os.makedirs(os.path.dirname(os.path.abspath(self.caddyfile_path)),
                    exist_ok=True)
    except OSError as e:  # PermissionError is a subclass of OSError
        # Best effort: the open() below reports the real failure if the
        # directory is genuinely unusable.
        logger.warning("Could not create Caddyfile dir: %s", e)

    try:
        with open(self.caddyfile_path, 'w', encoding='utf-8') as f:
            f.write(caddyfile_content)
            f.flush()
            try:
                os.fsync(f.fileno())
            except OSError:
                # fsync is a durability nicety; some filesystems reject it.
                pass
        logger.info("Wrote Caddyfile to %s (%d bytes)",
                    self.caddyfile_path, len(caddyfile_content))
    except Exception as e:
        logger.error("Failed to write Caddyfile: %s", e)
        return False

    return self.reload_caddy()
def reload_caddy(self) -> bool:
    """POST the current Caddyfile to the Caddy admin API for a hot reload.

    The file is read as UTF-8 and sent as UTF-8 bytes. Passing a ``str``
    body would let the HTTP layer encode it as ISO-8859-1, which corrupts
    or rejects non-Latin-1 text with an error this method does not catch.

    Returns:
        True on HTTP 200, False on any read, transport or status failure.
    """
    try:
        with open(self.caddyfile_path, 'r', encoding='utf-8') as f:
            caddyfile = f.read()
    except Exception as e:
        logger.error("Cannot read Caddyfile for reload: %s", e)
        return False

    url = f"{CADDY_ADMIN_URL}/load"
    try:
        resp = requests.post(
            url,
            data=caddyfile.encode('utf-8'),
            headers={'Content-Type': 'text/caddyfile'},
            timeout=10,
        )
    except requests.RequestException as e:
        logger.error("Caddy admin reload failed: %s", e)
        return False

    if resp.status_code == 200:
        logger.info("Caddy reload succeeded (status=200)")
        return True
    logger.error(
        "Caddy reload failed: status=%s body=%s",
        resp.status_code, resp.text[:500],
    )
    return False
def check_caddy_health(self) -> bool:
    """Probe Caddy's admin ``/config/`` endpoint; True only on HTTP 200.

    The admin API has no root handler, so ``GET /`` answers 404 even when
    Caddy is fully healthy. ``GET /config/`` answers 200 with the running
    config JSON whenever Caddy is up and serving. Transport errors are
    logged at debug level and reported as unhealthy.
    """
    endpoint = f"{CADDY_ADMIN_URL}/config/"
    try:
        response = requests.get(endpoint, timeout=5)
    except requests.RequestException as err:
        logger.debug("Caddy health check error: %s", err)
        return False
    return response.status_code == 200
# ── consecutive-failure bookkeeping ───────────────────────────────────
def get_health_failure_count(self) -> int:
"""Return the current consecutive failure count."""
return self._health_failures
def increment_health_failure(self) -> int:
"""Increment and return the consecutive failure count."""
self._health_failures += 1
return self._health_failures
def reset_health_failures(self) -> None:
"""Reset the consecutive failure counter to zero."""
self._health_failures = 0
# ── Caddyfile regeneration ────────────────────────────────────────────
def regenerate_with_installed(self, installed_services: list) -> bool:
    """Rebuild the Caddyfile from the current identity and reload Caddy.

    Args:
        installed_services: service dicts forwarded to
            ``generate_caddyfile``.

    Returns:
        The result of ``write_caddyfile`` (True when write and reload
        both succeed).
    """
    current_identity = self.config_manager.get_identity()
    return self.write_caddyfile(
        self.generate_caddyfile(current_identity, installed_services)
    )
def _on_identity_changed(self, event) -> None:
    """Regenerate and reload the Caddyfile when cell identity changes.

    Failures are logged and swallowed so a broken regeneration never
    propagates back into the event bus. The module-level ``logger`` is
    used, as everywhere else in this file, so the handler does not depend
    on an instance attribute this file never sets.

    Args:
        event: the identity-changed event; its payload is not inspected.
    """
    try:
        # NOTE(review): an empty installed-services list is passed, so any
        # installed-service ``caddy_route`` entries are not part of the
        # regenerated file — confirm the caller re-adds them elsewhere.
        self.regenerate_with_installed([])
    except Exception as exc:
        logger.warning('caddy_manager identity_changed handler failed: %s', exc)
# ── Certificate status ────────────────────────────────────────────────
def get_cert_status(self) -> Dict[str, Any]:
"""Return TLS cert status enriched with identity context (cached)."""
ident: Dict[str, Any] = {}
if self.config_manager:
try:
ident = self.config_manager.get_identity() or {}
except Exception as e:
logger.error("get_cert_status: failed to read identity: %s", e)
domain_mode = ident.get('domain_mode', 'lan')
tls = ident.get('tls') or {}
cert_type = tls.get('cert_type', 'custom' if tls.get('cert_type') == 'custom'
else ('internal' if domain_mode == 'lan' else 'acme'))
return {
'status': tls.get('status', 'unknown'),
'expiry': tls.get('expiry'),
'days_remaining': tls.get('days_remaining'),
'domain': self._domain_label(ident),
'domain_mode': domain_mode,
'cert_type': cert_type,
}
@staticmethod
def _domain_label(ident: Dict[str, Any]) -> Optional[str]:
"""Return a human-readable domain string for display in the UI."""
mode = ident.get('domain_mode', 'lan')
cell = ident.get('cell_name', '')
if mode == 'pic_ngo':
return f'*.{cell}.pic.ngo' if cell else None
if mode == 'cloudflare':
d = ident.get('domain_name') or ident.get('domain', '')
return f'*.{d}' if d else None
if mode == 'duckdns':
return f'*.{cell}.duckdns.org' if cell else None
if mode == 'http01':
return ident.get('domain_name') or ident.get('domain')
return None # lan
def get_cert_status_fresh(self, max_age_seconds: int = 300) -> Dict[str, Any]:
"""Return cert status, refreshing if the cached value is older than max_age_seconds."""
now = _time.monotonic()
if self._cert_refreshed_at is None or (now - self._cert_refreshed_at) > max_age_seconds:
self.refresh_cert_status()
return self.get_cert_status()
def refresh_cert_status(self) -> Dict[str, Any]:
"""Check TLS cert expiry via SSL and persist to identity['tls'].
For LAN mode (no ACME): immediately returns {'status': 'internal'}.
For ACME modes: opens an SSL connection to Caddy on port 443 and
reads the cert expiry from the TLS handshake. On any error (cert
not yet issued, network unreachable): returns {'status': 'unknown'}.
"""
identity = self.config_manager.get_identity() if self.config_manager else {}
domain_mode = (identity or {}).get('domain_mode', 'lan')
if domain_mode == 'lan':
status: Dict[str, Any] = {'status': 'internal', 'expiry': None, 'days_remaining': None}
else:
caddy_host = os.environ.get('CADDY_CERT_HOST', 'cell-caddy')
caddy_port = int(os.environ.get('CADDY_HTTPS_PORT', '443'))
result = self._check_cert_via_ssl(caddy_host, caddy_port)
status = result if result is not None else {
'status': 'unknown', 'expiry': None, 'days_remaining': None
}
if self.config_manager:
try:
self.config_manager.set_identity_field('tls', status)
except Exception as exc:
logger.warning('refresh_cert_status: failed to persist tls status: %s', exc)
self._cert_refreshed_at = _time.monotonic()
return status
@staticmethod
def _check_cert_via_ssl(hostname: str, port: int = 443) -> Optional[Dict[str, Any]]:
    """Open an SSL connection and return cert expiry info, or None on failure.

    Certificate and hostname verification are disabled on purpose: the
    goal is to read whatever certificate is served (including self-signed
    or not-yet-trusted ones), not to authenticate the peer.

    Args:
        hostname: host to connect to; also sent as the TLS server name.
        port: TCP port, 443 by default.

    Returns:
        Dict with ``status`` (``'valid'`` while the expiry is still in the
        future, else ``'expired'``), ``expiry`` (ISO 8601) and
        ``days_remaining`` (whole days, floor), or None when the
        connection or parsing fails.
    """
    ctx = _ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = _ssl.CERT_NONE
    try:
        with _socket.create_connection((hostname, port), timeout=5) as raw:
            with ctx.wrap_socket(raw, server_hostname=hostname) as tls:
                der = tls.getpeercert(binary_form=True)
        if not der:
            return None

        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        cert = x509.load_der_x509_certificate(der, default_backend())
        # Use not_valid_after_utc (cryptography ≥42) with fallback for older builds.
        try:
            expiry = cert.not_valid_after_utc
        except AttributeError:
            expiry = cert.not_valid_after.replace(tzinfo=_dt.timezone.utc)  # type: ignore[attr-defined]

        now = _dt.datetime.now(_dt.timezone.utc)
        days = (expiry - now).days
        return {
            # Compare timestamps, not whole days: a certificate with less
            # than 24 hours left has days == 0 but is still valid.
            'status': 'valid' if expiry > now else 'expired',
            'expiry': expiry.isoformat(),
            'days_remaining': days,
        }
    except Exception as exc:
        # Expected while a cert is not yet issued or Caddy is unreachable.
        logger.debug('_check_cert_via_ssl(%s:%s) failed: %s', hostname, port, exc)
        return None
# ── Active cert management ────────────────────────────────────────────
def renew_cert(self) -> Dict[str, Any]:
"""Regenerate the Caddyfile, reload Caddy, and trigger ACME cert renewal.
Regenerates first so a stale or broken on-disk Caddyfile never blocks
the reload. Returns immediately with status='pending'; the caller
polls GET /api/caddy/cert-status to track progress. Not applicable
to LAN mode — callers should use upload_custom_cert() instead.
"""
ident = (self.config_manager.get_identity() if self.config_manager else {}) or {}
domain_mode = ident.get('domain_mode', 'lan')
if domain_mode == 'lan':
return {
'ok': False,
'error': 'ACME renewal is not available in LAN mode. '
'Upload a custom certificate instead.',
}
# Regenerate → write → reload in one shot so the Caddyfile is always fresh.
if self.config_manager:
try:
ok = self.regenerate_with_installed([])
except Exception as exc:
logger.error('renew_cert: regenerate_with_installed failed: %s', exc)
ok = False
else:
ok = self.reload_caddy()
if not ok:
return {'ok': False, 'error': 'Caddy reload failed — check Caddy logs.'}
# Invalidate the cached status so the next poll triggers a fresh SSL check.
self._cert_refreshed_at = None
return {
'ok': True,
'status': 'pending',
'message': 'Renewal triggered. Certificate status will update within 60 s.',
}
def upload_custom_cert(self, cert_pem: str, key_pem: str) -> Dict[str, Any]:
    """Validate and install a custom TLS certificate.

    Writes cert+key to the shared certs directory (visible to Caddy),
    records ``cert_type='custom'`` in ``identity['tls']``, then
    regenerates the Caddyfile and reloads Caddy.

    NOTE(review): within this file only the LAN Caddyfile (and the
    pic_ngo pre-registration fallback) references the custom cert paths.
    The ACME mode generators keep their ``tls { dns ... }`` blocks —
    confirm whether custom certs are meant to apply there too.

    Args:
        cert_pem: PEM-encoded certificate.
        key_pem: PEM-encoded private key.

    Returns:
        ``{'ok': False, 'error': ...}`` when validation or the file write
        fails; otherwise ``{'ok': True, 'status', 'expiry',
        'days_remaining', 'cert_type'}``. A failed persist or reload is
        logged but does not turn the result into an error.
    """
    cert_info = self._parse_pem_cert(cert_pem)
    if cert_info is None:
        return {'ok': False, 'error': 'Invalid certificate: could not parse PEM.'}
    if not self._validate_key_pem(key_pem):
        return {'ok': False, 'error': 'Invalid private key: expected PEM with PRIVATE KEY header.'}

    try:
        os.makedirs(CADDY_CERTS_DIR, exist_ok=True)
        with open(os.path.join(CADDY_CERTS_DIR, 'cert.pem'), 'w') as fh:
            fh.write(cert_pem)
        # NOTE(review): the key is created with the process's default file
        # mode; confirm that directory permissions keep it private.
        with open(os.path.join(CADDY_CERTS_DIR, 'key.pem'), 'w') as fh:
            fh.write(key_pem)
    except OSError as exc:
        logger.error('upload_custom_cert: write failed: %s', exc)
        return {'ok': False, 'error': f'Failed to write cert files: {exc}'}

    days = cert_info.get('days_remaining', 0)
    tls_info: Dict[str, Any] = {
        # days_remaining is a floor of whole days: 0 means "expires within
        # 24 hours" (still valid); only negative values are past expiry.
        'status': 'valid' if days >= 0 else 'expired',
        'expiry': cert_info.get('expiry'),
        'days_remaining': days,
        'cert_type': 'custom',
    }

    if self.config_manager:
        try:
            self.config_manager.set_identity_field('tls', tls_info)
        except Exception as exc:
            logger.warning('upload_custom_cert: could not persist tls info: %s', exc)

        # Regenerate Caddyfile so the tls directive references the new cert.
        try:
            if not self.regenerate_with_installed([]):
                logger.warning('upload_custom_cert: Caddyfile write/reload did not succeed')
        except Exception as exc:
            logger.warning('upload_custom_cert: Caddyfile regeneration failed: %s', exc)

    return {'ok': True, **tls_info}
@staticmethod
def _parse_pem_cert(cert_pem: str) -> Optional[Dict[str, Any]]:
    """Extract expiry metadata from a PEM certificate.

    Args:
        cert_pem: PEM text (``str``) or already-encoded bytes.

    Returns:
        Dict with ``expiry`` (ISO 8601), ``days_remaining`` (whole days)
        and ``subject`` (RFC 4514 string), or None when the certificate
        cannot be loaded. Failures are logged at debug level.
    """
    try:
        from cryptography import x509

        pem_bytes = cert_pem if not isinstance(cert_pem, str) else cert_pem.encode()
        certificate = x509.load_pem_x509_certificate(pem_bytes)

        # Prefer the timezone-aware attribute; older builds only expose
        # the naive one, which is tagged as UTC here.
        try:
            not_after = certificate.not_valid_after_utc
        except AttributeError:
            not_after = certificate.not_valid_after.replace(tzinfo=_dt.timezone.utc)  # type: ignore[attr-defined]

        remaining = not_after - _dt.datetime.now(_dt.timezone.utc)
        return dict(
            expiry=not_after.isoformat(),
            days_remaining=remaining.days,
            subject=certificate.subject.rfc4514_string(),
        )
    except Exception as exc:
        logger.debug('_parse_pem_cert failed: %s', exc)
        return None
@staticmethod
def _validate_key_pem(key_pem: str) -> bool:
"""Return True if key_pem contains a PEM-encoded private key block."""
return ('-----BEGIN' in key_pem
and 'PRIVATE KEY' in key_pem
and '-----END' in key_pem)