Files
pic/api/caddy_manager.py
T
roof a906c26b5d
Unit Tests / test (push) Successful in 11m25s
fix: resolve Caddy env vars at write time to prevent parse errors
acme_ca and the pic_ngo DNS credentials ({$PIC_NGO_DDNS_TOKEN},
{$PIC_NGO_DDNS_API}) were written as Caddy env-var placeholders, but the
Caddy container does not inherit the API container's environment, so the
substitutions always failed — Caddy saw bare directive names with no
arguments and rejected the Caddyfile.

- _global_acme_block: only emit the acme_ca directive when ACME_CA_URL is
  actually set; omitting it makes Caddy default to Let's Encrypt production.
- _caddyfile_pic_ngo: embed the DDNS_TOTP_SECRET and DDNS_URL values directly
  into the Caddyfile at write time rather than relying on Caddy env expansion.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-30 15:01:15 -04:00

509 lines
21 KiB
Python

#!/usr/bin/env python3
"""
Caddy Manager for Personal Internet Cell.
Generates a Caddyfile based on the current identity (domain mode, cell name,
domain) and the list of installed services that contribute reverse-proxy
routes. Uses Caddy's admin API (``CADDY_ADMIN_URL``, default
http://cell-caddy:2019) to hot-reload the config without restarting the
container.
Domain modes supported:
lan — local-only, internal CA, HTTP + self-signed HTTPS via
/etc/caddy/internal/{cert,key}.pem
pic_ngo — DNS-01 ACME via the pic_ngo Caddy plugin (wildcard cert)
cloudflare — DNS-01 ACME via the cloudflare Caddy plugin (wildcard cert)
duckdns — DNS-01 ACME via the duckdns Caddy plugin
http01 — HTTP-01 ACME (no wildcard); each subdomain gets its own
server block (used by No-IP, FreeDNS, etc.)
For all ACME modes ``acme_ca`` is read from the ``ACME_CA_URL`` env var so
tests / staging can point at Pebble or LE-staging without a code change.
Routes for installed services are inserted before the catch-all ``handle``
in the main server block (or, for ``http01``, written as their own per-host
blocks).
"""
import logging
import os
from typing import Any, Dict, List, Optional
import requests
from base_service_manager import BaseServiceManager
# Module-level logger for this file.
logger = logging.getLogger(__name__)
# Live Caddyfile path inside the cell-api container (host path is
# ./config/caddy/Caddyfile, mounted at /app/config-caddy). May be overridden
# in tests via the CADDYFILE_PATH env var.
LIVE_CADDYFILE = os.environ.get('CADDYFILE_PATH', '/app/config-caddy/Caddyfile')
# Caddy admin API base URL. Defaults to the ``cell-caddy`` container hostname
# on port 2019; the Caddyfiles generated below bind the admin endpoint on
# 0.0.0.0:2019 so that it can be reached from another container. Override
# with the CADDY_ADMIN_URL env var when Caddy is reachable elsewhere
# (e.g. http://127.0.0.1:2019).
CADDY_ADMIN_URL = os.environ.get('CADDY_ADMIN_URL', 'http://cell-caddy:2019')
class CaddyManager(BaseServiceManager):
    """Manages Caddy reverse-proxy configuration and runtime health."""

    def __init__(self, config_manager=None,
                 data_dir: str = '/app/data',
                 config_dir: str = '/app/config',
                 service_bus=None,
                 service_registry=None):
        """Set up the manager and optionally hook into identity changes.

        Args:
            config_manager: object exposing ``get_identity()``; may be None.
            data_dir: forwarded to ``BaseServiceManager``.
            config_dir: forwarded to ``BaseServiceManager``.
            service_bus: when given, ``_on_identity_changed`` is subscribed
                to ``EventType.IDENTITY_CHANGED`` on it.
            service_registry: optional object exposing ``get_caddy_routes()``.
        """
        super().__init__('caddy', data_dir, config_dir)
        self._service_registry = service_registry
        self.config_manager = config_manager
        self.caddyfile_path = LIVE_CADDYFILE
        self.container_name = 'cell-caddy'
        # Consecutive health-check failures; callers bump and reset it via
        # the bookkeeping methods further down.
        self._health_failures = 0
        if service_bus is None:
            return
        # Imported lazily so the module loads without the service bus.
        from service_bus import EventType
        service_bus.subscribe_to_event(EventType.IDENTITY_CHANGED, self._on_identity_changed)
# ── BaseServiceManager required ───────────────────────────────────────
def get_status(self) -> Dict[str, Any]:
    """Summarise Caddy's state as observed through the admin API.

    Returns:
        Dict with the service name, a ``running`` flag (the result of
        ``check_caddy_health()``), the admin URL, the Caddyfile path and
        the current consecutive-failure counter.
    """
    is_up = self.check_caddy_health()
    return dict(
        service=self.service_name,
        running=is_up,
        admin_url=CADDY_ADMIN_URL,
        caddyfile_path=self.caddyfile_path,
        consecutive_failures=self._health_failures,
    )
def test_connectivity(self) -> Dict[str, Any]:
    """Probe the Caddy admin API once and report the outcome.

    Returns:
        Dict with ``success`` (result of ``check_caddy_health()``) and the
        ``admin_url`` that was probed.
    """
    reachable = self.check_caddy_health()
    return dict(success=reachable, admin_url=CADDY_ADMIN_URL)
# ── Caddyfile generation ──────────────────────────────────────────────
def generate_caddyfile(self, identity: Dict[str, Any],
                       installed_services: List[Dict[str, Any]]) -> str:
    """Generate a complete Caddyfile based on identity and services.

    Args:
        identity: identity dict (``None`` is treated as empty). Keys read
            here: ``cell_name`` (default ``'cell'``), ``domain_mode``
            (default ``'lan'``) and, for the ``cloudflare`` and ``http01``
            modes, ``domain_name`` with ``domain`` as a fallback. A key
            that is present but ``None``/empty is treated as missing.
        installed_services: list of service dicts; each may carry a
            ``caddy_route`` string holding one or more Caddyfile
            directives.

    Returns:
        Caddyfile text. An unrecognised ``domain_mode`` logs a warning and
        produces the ``lan`` layout so a Caddyfile is always emitted.
    """
    identity = identity or {}
    # ``or`` instead of a .get() default: a stored None/'' must fall back
    # too, otherwise it is interpolated into host names (e.g. "*.None").
    cell_name = identity.get('cell_name') or 'cell'
    domain_mode = identity.get('domain_mode') or 'lan'
    configured_domain = identity.get('domain_name') or identity.get('domain')
    # Per-service route snippets for the main server block (every mode
    # except http01, which builds per-host blocks from the raw list).
    service_routes = self._collect_service_routes(installed_services)
    # Core routes always present in the main server block; every per-mode
    # generator appends them after the installed-service routes.
    core_routes = (
        " handle /api/* {\n"
        " reverse_proxy cell-api:3000\n"
        " }\n"
        " handle {\n"
        " reverse_proxy cell-webui:80\n"
        " }"
    )
    if domain_mode == 'lan':
        return self._caddyfile_lan(cell_name, service_routes, core_routes)
    if domain_mode == 'pic_ngo':
        return self._caddyfile_pic_ngo(cell_name, service_routes, core_routes)
    if domain_mode == 'cloudflare':
        custom_domain = configured_domain or f'{cell_name}.local'
        return self._caddyfile_cloudflare(
            custom_domain, service_routes, core_routes
        )
    if domain_mode == 'duckdns':
        return self._caddyfile_duckdns(cell_name, service_routes, core_routes)
    if domain_mode == 'http01':
        host = configured_domain or f'{cell_name}.noip.me'
        return self._caddyfile_http01(host, installed_services, core_routes)
    # Fallback to lan so we always emit a valid Caddyfile.
    logger.warning("Unknown domain_mode %r; falling back to 'lan'", domain_mode)
    return self._caddyfile_lan(cell_name, service_routes, core_routes)
# ── per-mode generators ───────────────────────────────────────────────
@staticmethod
def _global_acme_block(email: Optional[str]) -> str:
    """Build the global ``{ ... }`` options block for an ACME-enabled mode.

    Args:
        email: value for the ``email`` option; omitted when falsy.

    Returns:
        The block text without a trailing newline.
    """
    acme_ca_url = os.environ.get('ACME_CA_URL', '').strip()
    # The admin API is bound on all interfaces so cell-api can reach
    # cell-caddy across the Docker bridge (the 127.0.0.1 default is not
    # reachable from another container).
    options = [" admin 0.0.0.0:2019"]
    if email:
        options.append(f" email {email}")
    # acme_ca is written only when a URL is configured: an empty value makes
    # Caddy reject the Caddyfile ("wrong argument count"), whereas leaving
    # the option out lets Caddy use Let's Encrypt production.
    if acme_ca_url:
        options.append(f" acme_ca {acme_ca_url}")
    return "\n".join(["{", *options, "}"])
def _build_registry_service_routes(self, domain: str) -> str:
    """Build named-matcher + handle blocks from the service registry.

    When no registry is wired, the registry raises, or it returns nothing,
    only the api block is emitted (api is always infrastructure, not
    delegated to the registry).

    Every subdomain is claimed at most once: the primary subdomain, the
    extra subdomains sharing its backend, and the extra subdomains with
    their own backend are all checked against the same ``seen_matchers``
    set, which is pre-seeded with the reserved names.

    Args:
        domain: base domain appended to every subdomain.

    Returns:
        The matcher/handle blocks joined by newlines.
    """
    routes: List[Dict] = []
    if self._service_registry is not None:
        try:
            # ``or []`` tolerates a registry that returns None.
            routes = self._service_registry.get_caddy_routes() or []
        except Exception as exc:
            logger.warning('_build_registry_service_routes: registry error: %s', exc)
    # Pre-seed with reserved names so no registry entry can squat them.
    seen_matchers: set = {'api', 'webui'}
    blocks: List[str] = []
    for route in routes:
        primary_sub = route['subdomain']
        backend = route['backend']
        extra_subs: List[str] = route.get('extra_subdomains') or []
        extra_backends: Dict[str, str] = route.get('extra_backends') or {}
        if primary_sub in seen_matchers:
            logger.warning('Caddy: skipping duplicate/reserved matcher %r', primary_sub)
            continue
        seen_matchers.add(primary_sub)
        # Subdomains that share the primary backend go in one matcher block.
        # They are vetted like any other name; otherwise an extra subdomain
        # could claim a reserved host (e.g. "api") or collide with a
        # matcher emitted for another route.
        shared = [primary_sub]
        for sub in extra_subs:
            if sub in extra_backends:
                continue  # gets its own block below
            if sub in seen_matchers:
                logger.warning('Caddy: skipping duplicate/reserved matcher %r', sub)
                continue
            seen_matchers.add(sub)
            shared.append(sub)
        host_list = ' '.join(f'{s}.{domain}' for s in shared)
        blocks.append(
            f' @{primary_sub} host {host_list}\n'
            f' handle @{primary_sub} {{\n'
            f' reverse_proxy {backend}\n'
            f' }}'
        )
        # Extra subdomains with their own backends each get their own block.
        for sub, sub_backend in extra_backends.items():
            if sub in seen_matchers:
                logger.warning('Caddy: skipping duplicate/reserved matcher %r', sub)
                continue
            seen_matchers.add(sub)
            blocks.append(
                f' @{sub} host {sub}.{domain}\n'
                f' handle @{sub} {{\n'
                f' reverse_proxy {sub_backend}\n'
                f' }}'
            )
    # The api subdomain is always infrastructure — not delegated to the registry.
    blocks.append(
        f' @api host api.{domain}\n'
        f' handle @api {{\n'
        f' reverse_proxy cell-api:3000\n'
        f' }}'
    )
    return '\n'.join(blocks)
@staticmethod
def _indent_routes(routes: str, spaces: int = 4) -> str:
    """Prefix every non-blank line of ``routes`` with ``spaces`` spaces.

    Blank (whitespace-only) lines are passed through untouched, and an
    empty ``routes`` yields an empty string.
    """
    if not routes:
        return ""
    pad = " " * spaces
    indented = []
    for raw in routes.splitlines():
        indented.append(pad + raw if raw.strip() else raw)
    return "\n".join(indented)
def _collect_service_routes(self,
                            installed_services: List[Dict[str, Any]]) -> str:
    """Join the ``caddy_route`` snippets of all installed services.

    Falsy service entries and services without a route are skipped; each
    snippet has its leading/trailing newlines trimmed before joining.
    """
    candidates = (
        (svc or {}).get('caddy_route') for svc in installed_services or []
    )
    return "\n".join(route.strip("\n") for route in candidates if route)
def _caddyfile_lan(self, cell_name: str,
                   service_routes: str, core_routes: str) -> str:
    """LAN mode: no ACME; serves ``<cell_name>.cell`` and 172.20.0.2.

    The site block references the internal certificate pair under
    /etc/caddy/internal and contains the (indented) service routes
    followed by the core routes.
    """
    sections = [self._indent_routes(service_routes)] if service_routes else []
    sections.append(core_routes)
    lines = [
        "{",
        " admin 0.0.0.0:2019",
        " auto_https off",
        "}",
        "",
        f"http://{cell_name}.cell, http://172.20.0.2:80 {{",
        " tls /etc/caddy/internal/cert.pem /etc/caddy/internal/key.pem",
        "\n".join(sections),
        "}",
    ]
    return "\n".join(lines) + "\n"
def _caddyfile_pic_ngo(self, cell_name: str,
                       service_routes: str, core_routes: str) -> str:
    """pic_ngo mode: wildcard DNS-01 via the pic_ngo plugin.

    Credentials are resolved at write time from this process's environment
    (``DDNS_TOTP_SECRET`` and ``DDNS_URL``) and embedded in the Caddyfile,
    because Caddy runs in its own container and does not inherit them.

    Args:
        cell_name: cell name; the site domain is ``<cell_name>.pic.ngo``.
        service_routes: installed-service routes (indented before use).
        core_routes: core api/webui routes, appended last.

    Returns:
        Caddyfile text. The ``token`` line is left out (and a warning is
        logged) when no secret is configured, so that no argument-less
        directive is ever written.
    """
    domain = f"{cell_name}.pic.ngo"
    body = [self._build_registry_service_routes(domain)]
    if service_routes:
        body.append(self._indent_routes(service_routes))
    body.append(core_routes)
    inner = "\n".join(body)
    email = f"admin@{domain}"
    ddns_token = (os.environ.get('DDNS_TOTP_SECRET') or '').strip()
    # Strip before applying the default so a whitespace-only DDNS_URL cannot
    # produce a bare ``api_base_url`` directive.
    ddns_api = ((os.environ.get('DDNS_URL') or '').strip()
                or 'https://ddns.pic.ngo/api/v1')
    dns_lines = [" tls {", " dns pic_ngo {"]
    if ddns_token:
        dns_lines.append(f" token {ddns_token}")
    else:
        # Same rule as acme_ca in the global block: a directive with no
        # argument makes Caddy reject the whole file, so skip it and say why.
        logger.warning(
            "DDNS_TOTP_SECRET is not set; writing pic_ngo DNS block without a token"
        )
    dns_lines += [f" api_base_url {ddns_api}", " }", " }"]
    lines = [
        self._global_acme_block(email),
        "",
        f"*.{domain}, {domain} {{",
        *dns_lines,
        inner,
        "}",
    ]
    return "\n".join(lines) + "\n"
def _caddyfile_cloudflare(self, custom_domain: str,
                          service_routes: str, core_routes: str) -> str:
    """cloudflare mode: wildcard DNS-01 via the cloudflare plugin.

    The site block covers ``*.<custom_domain>`` and the apex, and contains
    the registry routes, the indented service routes (if any) and the core
    routes, in that order.
    """
    sections = [self._build_registry_service_routes(custom_domain)]
    if service_routes:
        sections.append(self._indent_routes(service_routes))
    sections.append(core_routes)
    # NOTE(review): {$ACME_EMAIL} and {$CF_API_TOKEN} are written verbatim
    # and left for Caddy to expand — confirm the Caddy container actually
    # has both variables set.
    lines = [
        self._global_acme_block('{$ACME_EMAIL}'),
        "",
        f"*.{custom_domain}, {custom_domain} {{",
        " tls {",
        " dns cloudflare {$CF_API_TOKEN}",
        " }",
        "\n".join(sections),
        "}",
    ]
    return "\n".join(lines) + "\n"
def _caddyfile_duckdns(self, cell_name: str,
                       service_routes: str, core_routes: str) -> str:
    """duckdns mode: DNS-01 via the duckdns plugin.

    The site block covers ``*.<cell_name>.duckdns.org`` and contains the
    registry routes, the indented service routes (if any) and the core
    routes, in that order. No ``email`` option is written in this mode.
    """
    domain = f"{cell_name}.duckdns.org"
    sections = [self._build_registry_service_routes(domain)]
    if service_routes:
        sections.append(self._indent_routes(service_routes))
    sections.append(core_routes)
    # NOTE(review): {$DUCKDNS_TOKEN} is written verbatim and left for Caddy
    # to expand — confirm the Caddy container has it set.
    lines = [
        self._global_acme_block(None),
        "",
        f"*.{domain} {{",
        " tls {",
        " dns duckdns {$DUCKDNS_TOKEN}",
        " }",
        "\n".join(sections),
        "}",
    ]
    return "\n".join(lines) + "\n"
def _caddyfile_http01(self, host: str,
                      installed_services: List[Dict[str, Any]],
                      core_routes: str) -> str:
    """http01 mode: no wildcard, so every subdomain gets its own site block.

    Layout: global ACME block, the main ``host`` block holding only the
    core routes, one block per pair from ``_http01_service_pairs()``, then
    one block per installed service that has both a ``caddy_route`` and a
    ``name`` (or ``subdomain``) not already used by one of those pairs.
    """
    # Main host block — only the core routes (api + webui).
    lines = [
        self._global_acme_block('{$ACME_EMAIL}'),
        "",
        f"{host} {{",
        core_routes,
        "}",
    ]
    core_pairs = self._http01_service_pairs()
    for sub, backend in core_pairs:
        lines += ["", f"{sub}.{host} {{", f" reverse_proxy {backend}", "}"]
    # Installed (store plugin) services must not reuse a core subdomain.
    reserved = {sub for sub, _backend in core_pairs}
    for svc in installed_services or []:
        if not svc:
            continue
        route = svc.get('caddy_route')
        name = svc.get('name') or svc.get('subdomain')
        if route and name and name not in reserved:
            lines += ["", f"{name}.{host} {{", self._indent_routes(route), "}"]
    return "\n".join(lines) + "\n"
def _http01_service_pairs(self) -> List[tuple]:
    """Return unique (subdomain, backend) pairs for http01 per-host blocks.

    Registry routes come first (primary subdomain, then each extra
    subdomain with its own backend or, failing that, the primary backend).
    The infrastructure pair ``('api', 'cell-api:3000')`` is always appended
    last. ``api`` is reserved and every subdomain is returned at most
    once, because each pair becomes its own site block and two blocks for
    one host make the Caddyfile ambiguous.

    Returns:
        List of ``(subdomain, backend)`` tuples. If the registry raises,
        all registry pairs are discarded and only the api pair is returned.
    """
    pairs: List[tuple] = []
    claimed = {'api'}
    if self._service_registry is not None:
        try:
            for route in self._service_registry.get_caddy_routes() or []:
                extra_backends: Dict[str, str] = route.get('extra_backends') or {}
                candidates = [(route['subdomain'], route['backend'])]
                for sub in route.get('extra_subdomains') or []:
                    candidates.append((sub, extra_backends.get(sub, route['backend'])))
                for sub, backend in candidates:
                    if sub in claimed:
                        logger.warning(
                            'Caddy: skipping duplicate/reserved http01 host %r', sub)
                        continue
                    claimed.add(sub)
                    pairs.append((sub, backend))
        except Exception as exc:
            logger.warning('_http01_service_pairs: registry error: %s', exc)
            pairs = []
    pairs.append(('api', 'cell-api:3000'))
    return pairs
# ── filesystem + admin-API operations ─────────────────────────────────
def write_caddyfile(self, caddyfile_content: str) -> bool:
    """Persist the Caddyfile, then ask Caddy to reload it.

    The existing file is opened for writing rather than replaced, so the
    path keeps pointing at the same file for Docker bind-mounts.

    Returns:
        False if the write fails; otherwise whatever ``reload_caddy()``
        returns. Failure to create the parent directory is only logged.
    """
    target = self.caddyfile_path
    try:
        os.makedirs(os.path.dirname(os.path.abspath(target)), exist_ok=True)
    except OSError as e:
        # Best effort: the directory may already exist or be read-only;
        # the open() below decides whether the write can proceed.
        logger.warning("Could not create Caddyfile dir: %s", e)
    try:
        with open(target, 'w') as handle:
            handle.write(caddyfile_content)
            handle.flush()
            try:
                os.fsync(handle.fileno())
            except OSError:
                # fsync is not supported everywhere; the data is flushed.
                pass
        logger.info("Wrote Caddyfile to %s (%d bytes)",
                    target, len(caddyfile_content))
    except Exception as e:
        logger.error("Failed to write Caddyfile: %s", e)
        return False
    return self.reload_caddy()
def reload_caddy(self) -> bool:
    """POST the current Caddyfile to the Caddy admin API for a hot reload.

    The file is read and sent as raw bytes, so exactly what is on disk
    reaches Caddy regardless of the process locale or of how the HTTP
    stack would encode a ``str`` body.

    Returns:
        True on HTTP 200; False if the file cannot be read, the request
        fails, or Caddy answers with any other status (the first 500
        characters of the response body are logged in that case).
    """
    try:
        with open(self.caddyfile_path, 'rb') as f:
            caddyfile = f.read()
    except Exception as e:
        logger.error("Cannot read Caddyfile for reload: %s", e)
        return False
    url = f"{CADDY_ADMIN_URL}/load"
    try:
        resp = requests.post(
            url,
            data=caddyfile,
            # Tells Caddy to run the body through its Caddyfile adapter.
            headers={'Content-Type': 'text/caddyfile'},
            timeout=10,
        )
    except requests.RequestException as e:
        logger.error("Caddy admin reload failed: %s", e)
        return False
    if resp.status_code == 200:
        logger.info("Caddy reload succeeded (status=200)")
        return True
    logger.error(
        "Caddy reload failed: status=%s body=%s",
        resp.status_code, resp.text[:500],
    )
    return False
def check_caddy_health(self) -> bool:
    """Probe the Caddy admin API root; True only on an HTTP 200 answer.

    Request errors are logged at debug level and reported as False. The
    consecutive-failure counter is not touched here.
    """
    # NOTE(review): confirm the admin root path really answers 200 on the
    # deployed Caddy version; /config/ may be the safer probe target.
    endpoint = CADDY_ADMIN_URL + "/"
    try:
        return requests.get(endpoint, timeout=5).status_code == 200
    except requests.RequestException as e:
        logger.debug("Caddy health check error: %s", e)
        return False
# ── consecutive-failure bookkeeping ───────────────────────────────────
def get_health_failure_count(self) -> int:
    """Report how many consecutive health-check failures are recorded."""
    failures = self._health_failures
    return failures
def increment_health_failure(self) -> int:
    """Record one more consecutive failure and return the new total."""
    updated = self._health_failures + 1
    self._health_failures = updated
    return updated
def reset_health_failures(self) -> None:
    """Clear the consecutive-failure counter (set it back to 0)."""
    self._health_failures = 0
    return None
# ── regeneration + certificate status ─────────────────────────────────
def regenerate_with_installed(self, installed_services: list) -> bool:
    """Regenerate the Caddyfile for the given services and reload Caddy.

    Args:
        installed_services: service dicts passed to ``generate_caddyfile``.

    Returns:
        The result of ``write_caddyfile()``; False, leaving the existing
        Caddyfile untouched, when no config manager is wired (the
        constructor allows ``config_manager=None``).
    """
    if not self.config_manager:
        logger.error(
            "regenerate_with_installed: no config_manager; Caddyfile left unchanged"
        )
        return False
    identity = self.config_manager.get_identity()
    content = self.generate_caddyfile(identity, installed_services)
    return self.write_caddyfile(content)
def _on_identity_changed(self, event) -> None:
    """Event handler: rebuild and reload the Caddyfile after an identity change.

    ``event`` is not inspected. Any exception is logged as a warning and
    swallowed so a failure here never propagates to the event publisher.
    """
    # NOTE(review): regeneration is requested with an empty service list,
    # so installed-service routes are not part of the rebuilt file —
    # confirm that is intended.
    try:
        self.regenerate_with_installed([])
    except Exception as err:
        # NOTE(review): this is the only use of ``self.logger`` in the
        # file; presumably provided by BaseServiceManager — verify.
        self.logger.warning('caddy_manager identity_changed handler failed: %s', err)
def get_cert_status(self) -> Dict[str, Any]:
    """Report TLS certificate status taken from ``identity['tls']``.

    Returns:
        Dict with ``status``, ``expiry`` and ``days_remaining``. Missing
        values fall back to ``'unknown'`` / ``None`` / ``None``, which is
        also what is returned when there is no config manager or reading
        the identity fails.
    """
    fallback = {'status': 'unknown', 'expiry': None, 'days_remaining': None}
    manager = self.config_manager
    if not manager:
        return fallback
    try:
        identity = manager.get_identity() or {}
    except Exception as e:
        logger.error("get_cert_status: failed to read identity: %s", e)
        return fallback
    tls_info = identity.get('tls') or {}
    # The per-key defaults are exactly the fallback values above.
    return {key: tls_info.get(key, default) for key, default in fallback.items()}