85d265187d
Unit Tests / test (push) Successful in 7m32s
1. caddy_manager: embed ddns.token (registration bearer token) in Caddyfile, not DDNS_TOTP_SECRET. The pic_ngo plugin sends the token to POST /api/v1/dns-challenge; using the TOTP secret caused 401 on every attempt. 2. firewall_manager: add _acme-challenge.<zone> forwarding block before each split-horizon zone in the Corefile. Without this, CoreDNS was authoritative for the challenge name and returned NODATA for TXT queries (wildcard A record matches but wrong type), blocking Caddy's internal DNS pre-verification step. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
587 lines
25 KiB
Python
587 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Caddy Manager for Personal Internet Cell.
|
|
|
|
Generates a Caddyfile based on the current identity (domain mode, cell name,
|
|
domain) and the list of installed services that contribute reverse-proxy
|
|
routes. Uses Caddy's admin API on http://127.0.0.1:2019 to hot-reload the
|
|
config without restarting the container.
|
|
|
|
Domain modes supported:
|
|
lan — local-only, internal CA, HTTP + self-signed HTTPS via
|
|
/etc/caddy/internal/{cert,key}.pem
|
|
pic_ngo — DNS-01 ACME via the pic_ngo Caddy plugin (wildcard cert)
|
|
cloudflare — DNS-01 ACME via the cloudflare Caddy plugin (wildcard cert)
|
|
duckdns — DNS-01 ACME via the duckdns Caddy plugin
|
|
http01 — HTTP-01 ACME (no wildcard); each subdomain gets its own
|
|
server block (used by No-IP, FreeDNS, etc.)
|
|
|
|
For all ACME modes ``acme_ca`` is read from the ``ACME_CA_URL`` env var so
|
|
tests / staging can point at Pebble or LE-staging without a code change.
|
|
Routes for installed services are inserted before the catch-all ``handle``
|
|
in the main server block (or, for ``http01``, written as their own per-host
|
|
blocks).
|
|
"""
|
|
|
|
import datetime as _dt
|
|
import logging
|
|
import os
|
|
import socket as _socket
|
|
import ssl as _ssl
|
|
import time as _time
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import requests
|
|
|
|
from base_service_manager import BaseServiceManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Live Caddyfile path inside the cell-api container (host path is
# ./config/caddy/Caddyfile, mounted at /app/config-caddy). May be overridden
# in tests via the CADDYFILE_PATH env var.
LIVE_CADDYFILE = os.environ.get('CADDYFILE_PATH', '/app/config-caddy/Caddyfile')

# Caddy admin API base URL. The Caddyfiles generated by this module bind the
# admin endpoint on 0.0.0.0:2019 so that cell-api can reach Caddy from another
# container, and the default below is the Caddy container hostname
# (cell-caddy). Override with the CADDY_ADMIN_URL env var when the admin API
# lives elsewhere (presumably http://127.0.0.1:2019 in dev/test wiring —
# confirm against the test setup).
CADDY_ADMIN_URL = os.environ.get('CADDY_ADMIN_URL', 'http://cell-caddy:2019')
|
|
|
|
|
|
class CaddyManager(BaseServiceManager):
|
|
"""Manages Caddy reverse-proxy configuration and runtime health."""
|
|
|
|
def __init__(self, config_manager=None,
             data_dir: str = '/app/data',
             config_dir: str = '/app/config',
             service_bus=None,
             service_registry=None):
    """Initialise the manager and optionally subscribe to identity changes.

    Args:
        config_manager: Stored as ``self.config_manager``; other methods
            call ``get_identity()`` / ``set_identity_field()`` on it and
            read its ``configs`` mapping. May be ``None``.
        data_dir: Forwarded to ``BaseServiceManager.__init__``.
        config_dir: Forwarded to ``BaseServiceManager.__init__``.
        service_bus: When given, ``_on_identity_changed`` is registered
            for ``EventType.IDENTITY_CHANGED``.
        service_registry: Stored as ``self._service_registry``; queried
            through ``get_caddy_routes()`` when building routes.
    """
    super().__init__('caddy', data_dir, config_dir)

    self.config_manager = config_manager
    self._service_registry = service_registry
    self.container_name = 'cell-caddy'
    self.caddyfile_path = LIVE_CADDYFILE

    # Count of health checks that failed in a row; callers bump and reset
    # it through the bookkeeping helpers further down.
    self._health_failures = 0
    # time.monotonic() value stored by the most recent
    # refresh_cert_status() call; None until the first refresh.
    self._cert_refreshed_at: Optional[float] = None

    if service_bus is None:
        return

    # Imported lazily so the module is only needed when a bus is wired in.
    from service_bus import EventType
    service_bus.subscribe_to_event(EventType.IDENTITY_CHANGED, self._on_identity_changed)
|
|
|
|
# ── BaseServiceManager required ───────────────────────────────────────
|
|
|
|
def get_status(self) -> Dict[str, Any]:
    """Summarise Caddy's state: admin-API health plus static wiring details.

    Returns:
        Dict with ``service``, ``running`` (result of
        ``check_caddy_health()``), ``admin_url``, ``caddyfile_path`` and
        ``consecutive_failures``.
    """
    admin_reachable = self.check_caddy_health()
    return dict(
        service=self.service_name,
        running=admin_reachable,
        admin_url=CADDY_ADMIN_URL,
        caddyfile_path=self.caddyfile_path,
        consecutive_failures=self._health_failures,
    )
|
|
|
|
def test_connectivity(self) -> Dict[str, Any]:
    """Probe the Caddy admin API once and report whether it answered.

    Returns:
        Dict with ``success`` (result of ``check_caddy_health()``) and the
        ``admin_url`` that was probed.
    """
    reachable = self.check_caddy_health()
    return dict(success=reachable, admin_url=CADDY_ADMIN_URL)
|
|
|
|
# ── Caddyfile generation ──────────────────────────────────────────────
|
|
|
|
def generate_caddyfile(self, identity: Dict[str, Any],
                       installed_services: List[Dict[str, Any]]) -> str:
    """Render the full Caddyfile text for the given identity and services.

    Args:
        identity: identity dict (``None`` is treated as empty). Keys read
            here: ``cell_name`` (default ``'cell'``), ``domain_mode``
            (default ``'lan'``) and, for the ``cloudflare`` and ``http01``
            modes only, ``domain_name`` falling back to ``domain``.
        installed_services: list of service dicts; each may carry a
            ``caddy_route`` string holding Caddyfile directives.

    Returns:
        Caddyfile text produced by the generator for ``domain_mode``. An
        unrecognised mode is logged and rendered as ``lan``.
    """
    ident = identity or {}
    name = ident.get('cell_name', 'cell')
    mode = ident.get('domain_mode', 'lan')

    # Route snippets contributed by installed services; used by every
    # generator except http01, which receives the raw service list.
    plugin_routes = self._collect_service_routes(installed_services)

    # Routes that every main site block ends with: the API under /api/*
    # and the web UI as the catch-all.
    core = "\n".join([
        " handle /api/* {",
        " reverse_proxy cell-api:3000",
        " }",
        " handle {",
        " reverse_proxy cell-webui:80",
        " }",
    ])

    def configured_domain(fallback: str) -> str:
        # 'domain_name' wins over 'domain'; the fallback applies only when
        # neither key is present.
        return ident.get('domain_name', ident.get('domain', fallback))

    if mode == 'pic_ngo':
        return self._caddyfile_pic_ngo(name, plugin_routes, core)
    if mode == 'cloudflare':
        return self._caddyfile_cloudflare(
            configured_domain(f'{name}.local'), plugin_routes, core
        )
    if mode == 'duckdns':
        return self._caddyfile_duckdns(name, plugin_routes, core)
    if mode == 'http01':
        return self._caddyfile_http01(
            configured_domain(f'{name}.noip.me'), installed_services, core
        )

    # Anything else that is not 'lan' still gets a valid Caddyfile, but
    # the unexpected mode is reported.
    if mode != 'lan':
        logger.warning("Unknown domain_mode %r; falling back to 'lan'", mode)
    return self._caddyfile_lan(name, plugin_routes, core)
|
|
|
|
# ── per-mode generators ───────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
def _global_acme_block(email: Optional[str]) -> str:
|
|
"""Return the ``{ ... }`` global block for an ACME-enabled mode."""
|
|
lines = ["{"]
|
|
# Bind admin API on all interfaces so cell-api can reach cell-caddy
|
|
# across the Docker bridge (default 127.0.0.1 is unreachable cross-container).
|
|
lines.append(" admin 0.0.0.0:2019")
|
|
if email:
|
|
lines.append(f" email {email}")
|
|
# Only write acme_ca when a URL is configured — an empty ACME_CA_URL
|
|
# causes Caddy to reject the Caddyfile with "wrong argument count".
|
|
# When absent, Caddy defaults to Let's Encrypt production.
|
|
acme_ca_url = os.environ.get('ACME_CA_URL', '').strip()
|
|
if acme_ca_url:
|
|
lines.append(f" acme_ca {acme_ca_url}")
|
|
lines.append("}")
|
|
return "\n".join(lines)
|
|
|
|
def _build_registry_service_routes(self, domain: str) -> str:
|
|
"""Build named-matcher + handle blocks from the service registry.
|
|
|
|
When no registry is wired or the registry returns nothing, only the
|
|
api block is emitted (api is always infrastructure, not delegated to
|
|
the registry).
|
|
"""
|
|
routes: List[Dict] = []
|
|
if self._service_registry is not None:
|
|
try:
|
|
routes = self._service_registry.get_caddy_routes()
|
|
except Exception as exc:
|
|
logger.warning('_build_registry_service_routes: registry error: %s', exc)
|
|
|
|
# Pre-seed with reserved names so no registry entry can squat them.
|
|
seen_matchers: set = {'api', 'webui'}
|
|
|
|
blocks: List[str] = []
|
|
for route in routes:
|
|
primary_sub = route['subdomain']
|
|
backend = route['backend']
|
|
extra_subs: List[str] = route.get('extra_subdomains') or []
|
|
extra_backends: Dict[str, str] = route.get('extra_backends') or {}
|
|
|
|
if primary_sub in seen_matchers:
|
|
logger.warning('Caddy: skipping duplicate/reserved matcher %r', primary_sub)
|
|
continue
|
|
seen_matchers.add(primary_sub)
|
|
|
|
# Subdomains that share the primary backend go in one matcher block.
|
|
shared = [primary_sub] + [s for s in extra_subs if s not in extra_backends]
|
|
host_list = ' '.join(f'{s}.{domain}' for s in shared)
|
|
blocks.append(
|
|
f' @{primary_sub} host {host_list}\n'
|
|
f' handle @{primary_sub} {{\n'
|
|
f' reverse_proxy {backend}\n'
|
|
f' }}'
|
|
)
|
|
# Extra subdomains with their own backends each get their own block.
|
|
for sub, sub_backend in extra_backends.items():
|
|
if sub in seen_matchers:
|
|
logger.warning('Caddy: skipping duplicate/reserved matcher %r', sub)
|
|
continue
|
|
seen_matchers.add(sub)
|
|
blocks.append(
|
|
f' @{sub} host {sub}.{domain}\n'
|
|
f' handle @{sub} {{\n'
|
|
f' reverse_proxy {sub_backend}\n'
|
|
f' }}'
|
|
)
|
|
|
|
# The api subdomain is always infrastructure — not delegated to the registry.
|
|
blocks.append(
|
|
f' @api host api.{domain}\n'
|
|
f' handle @api {{\n'
|
|
f' reverse_proxy cell-api:3000\n'
|
|
f' }}'
|
|
)
|
|
return '\n'.join(blocks)
|
|
|
|
@staticmethod
|
|
def _indent_routes(routes: str, spaces: int = 4) -> str:
|
|
"""Indent a multi-line route block by ``spaces`` columns."""
|
|
if not routes:
|
|
return ""
|
|
prefix = " " * spaces
|
|
return "\n".join(prefix + line if line.strip() else line
|
|
for line in routes.splitlines())
|
|
|
|
def _collect_service_routes(self,
|
|
installed_services: List[Dict[str, Any]]) -> str:
|
|
"""Concatenate ``caddy_route`` strings from installed services."""
|
|
chunks: List[str] = []
|
|
for svc in installed_services or []:
|
|
route = (svc or {}).get('caddy_route')
|
|
if route:
|
|
chunks.append(route.strip("\n"))
|
|
return "\n".join(chunks)
|
|
|
|
def _caddyfile_lan(self, cell_name: str,
|
|
service_routes: str, core_routes: str) -> str:
|
|
"""LAN mode: HTTP only + internal-CA TLS, no ACME."""
|
|
body = []
|
|
if service_routes:
|
|
body.append(self._indent_routes(service_routes))
|
|
body.append(core_routes)
|
|
inner = "\n".join(body)
|
|
return (
|
|
"{\n"
|
|
" admin 0.0.0.0:2019\n"
|
|
" auto_https off\n"
|
|
"}\n"
|
|
"\n"
|
|
f"http://{cell_name}.cell, http://172.20.0.2:80 {{\n"
|
|
" tls /etc/caddy/internal/cert.pem /etc/caddy/internal/key.pem\n"
|
|
f"{inner}\n"
|
|
"}\n"
|
|
)
|
|
|
|
def _caddyfile_pic_ngo(self, cell_name: str,
|
|
service_routes: str, core_routes: str) -> str:
|
|
"""pic_ngo mode: wildcard DNS-01 via the pic_ngo plugin."""
|
|
domain = f"{cell_name}.pic.ngo"
|
|
body = [self._build_registry_service_routes(domain)]
|
|
if service_routes:
|
|
body.append(self._indent_routes(service_routes))
|
|
body.append(core_routes)
|
|
inner = "\n".join(body)
|
|
email = f"admin@{domain}"
|
|
|
|
# Resolve credentials at write time — Caddy runs in its own container
|
|
# and does not inherit the API's environment variables, so we embed the
|
|
# actual values instead of {$VAR} placeholders.
|
|
# Use the registration bearer token (ddns.token), NOT the TOTP secret —
|
|
# the pic_ngo plugin authenticates to POST /api/v1/dns-challenge with this token.
|
|
ddns_cfg = self.config_manager.configs.get('ddns', {})
|
|
ddns_token = (ddns_cfg.get('token') or os.environ.get('DDNS_TOKEN') or '').strip()
|
|
ddns_api = (os.environ.get('DDNS_URL') or ddns_cfg.get('url') or 'https://ddns.pic.ngo/api/v1').strip()
|
|
|
|
return (
|
|
f"{self._global_acme_block(email)}\n"
|
|
"\n"
|
|
f"*.{domain}, {domain} {{\n"
|
|
" tls {\n"
|
|
" dns pic_ngo {\n"
|
|
f" token {ddns_token}\n"
|
|
f" api_base_url {ddns_api}\n"
|
|
" }\n"
|
|
" }\n"
|
|
f"{inner}\n"
|
|
"}\n"
|
|
)
|
|
|
|
def _caddyfile_cloudflare(self, custom_domain: str,
|
|
service_routes: str, core_routes: str) -> str:
|
|
"""cloudflare mode: wildcard DNS-01 via the cloudflare plugin."""
|
|
body = [self._build_registry_service_routes(custom_domain)]
|
|
if service_routes:
|
|
body.append(self._indent_routes(service_routes))
|
|
body.append(core_routes)
|
|
inner = "\n".join(body)
|
|
return (
|
|
f"{self._global_acme_block('{$ACME_EMAIL}')}\n"
|
|
"\n"
|
|
f"*.{custom_domain}, {custom_domain} {{\n"
|
|
" tls {\n"
|
|
" dns cloudflare {$CF_API_TOKEN}\n"
|
|
" }\n"
|
|
f"{inner}\n"
|
|
"}\n"
|
|
)
|
|
|
|
def _caddyfile_duckdns(self, cell_name: str,
|
|
service_routes: str, core_routes: str) -> str:
|
|
"""duckdns mode: DNS-01 via the duckdns plugin."""
|
|
domain = f"{cell_name}.duckdns.org"
|
|
body = [self._build_registry_service_routes(domain)]
|
|
if service_routes:
|
|
body.append(self._indent_routes(service_routes))
|
|
body.append(core_routes)
|
|
inner = "\n".join(body)
|
|
return (
|
|
f"{self._global_acme_block(None)}\n"
|
|
"\n"
|
|
f"*.{domain} {{\n"
|
|
" tls {\n"
|
|
" dns duckdns {$DUCKDNS_TOKEN}\n"
|
|
" }\n"
|
|
f"{inner}\n"
|
|
"}\n"
|
|
)
|
|
|
|
def _caddyfile_http01(self, host: str,
|
|
installed_services: List[Dict[str, Any]],
|
|
core_routes: str) -> str:
|
|
"""http01 mode: no wildcard. Each service gets its own block."""
|
|
# Main host block — only the core routes (api + webui).
|
|
out = [self._global_acme_block('{$ACME_EMAIL}'), ""]
|
|
out.append(f"{host} {{")
|
|
out.append(core_routes)
|
|
out.append("}")
|
|
|
|
# Build (subdomain, backend) pairs from registry when available.
|
|
_core_services = self._http01_service_pairs()
|
|
for subdomain, backend in _core_services:
|
|
out.append("")
|
|
out.append(f"{subdomain}.{host} {{")
|
|
out.append(f" reverse_proxy {backend}")
|
|
out.append("}")
|
|
|
|
# One block per installed (store plugin) service that has a caddy_route,
|
|
# skipping any name that conflicts with a core service.
|
|
_core_names = {s for s, _ in _core_services}
|
|
for svc in installed_services or []:
|
|
if not svc:
|
|
continue
|
|
route = svc.get('caddy_route')
|
|
name = svc.get('name') or svc.get('subdomain')
|
|
if not route or not name or name in _core_names:
|
|
continue
|
|
out.append("")
|
|
out.append(f"{name}.{host} {{")
|
|
out.append(self._indent_routes(route))
|
|
out.append("}")
|
|
return "\n".join(out) + "\n"
|
|
|
|
def _http01_service_pairs(self) -> List[tuple]:
|
|
"""Return (subdomain, backend) pairs for http01 per-host blocks."""
|
|
pairs: List[tuple] = []
|
|
if self._service_registry is not None:
|
|
try:
|
|
for route in self._service_registry.get_caddy_routes():
|
|
pairs.append((route['subdomain'], route['backend']))
|
|
extra_subs: List[str] = route.get('extra_subdomains') or []
|
|
extra_backends: Dict[str, str] = route.get('extra_backends') or {}
|
|
for sub in extra_subs:
|
|
backend = extra_backends.get(sub, route['backend'])
|
|
pairs.append((sub, backend))
|
|
except Exception as exc:
|
|
logger.warning('_http01_service_pairs: registry error: %s', exc)
|
|
pairs = []
|
|
pairs.append(('api', 'cell-api:3000'))
|
|
return pairs
|
|
|
|
# ── filesystem + admin-API operations ─────────────────────────────────
|
|
|
|
def write_caddyfile(self, caddyfile_content: str) -> bool:
    """Persist ``caddyfile_content`` to ``self.caddyfile_path`` and reload Caddy.

    The file is opened with mode ``'w'`` at its existing path (no
    temp-file rename), so it keeps the same inode and Docker bind-mounts
    continue to see it.

    Returns:
        ``False`` if the write fails; otherwise the result of
        ``reload_caddy()``.
    """
    target = self.caddyfile_path

    # A failure to create the parent directory is only logged; the open()
    # below decides whether the write can proceed.
    try:
        os.makedirs(os.path.dirname(os.path.abspath(target)), exist_ok=True)
    except OSError as e:
        logger.warning("Could not create Caddyfile dir: %s", e)

    try:
        with open(target, 'w') as handle:
            handle.write(caddyfile_content)
            handle.flush()
            # Best-effort durability: an fsync error is ignored on purpose.
            try:
                os.fsync(handle.fileno())
            except OSError:
                pass
        logger.info("Wrote Caddyfile to %s (%d bytes)",
                    target, len(caddyfile_content))
    except Exception as e:
        logger.error("Failed to write Caddyfile: %s", e)
        return False

    return self.reload_caddy()
|
|
|
|
def reload_caddy(self) -> bool:
    """Hot-reload Caddy by POSTing the on-disk Caddyfile to ``<admin>/load``.

    The file at ``self.caddyfile_path`` is read and sent with content type
    ``text/caddyfile`` and a 10 second timeout.

    Returns:
        ``True`` on HTTP 200. ``False`` if the file cannot be read, the
        request raises ``requests.RequestException``, or any other status
        comes back (the first 500 characters of the response body are
        logged).
    """
    try:
        with open(self.caddyfile_path, 'r') as handle:
            payload = handle.read()
    except Exception as e:
        logger.error("Cannot read Caddyfile for reload: %s", e)
        return False

    endpoint = f"{CADDY_ADMIN_URL}/load"
    try:
        response = requests.post(
            endpoint,
            data=payload,
            headers={'Content-Type': 'text/caddyfile'},
            timeout=10,
        )
    except requests.RequestException as e:
        logger.error("Caddy admin reload failed: %s", e)
        return False

    if response.status_code != 200:
        logger.error(
            "Caddy reload failed: status=%s body=%s",
            response.status_code, response.text[:500],
        )
        return False

    logger.info("Caddy reload succeeded (status=200)")
    return True
|
|
|
|
def check_caddy_health(self) -> bool:
    """Return True when a GET of the Caddy admin API root answers HTTP 200.

    The request uses a 5 second timeout. A ``requests.RequestException``
    is logged at debug level and reported as ``False``.
    """
    endpoint = CADDY_ADMIN_URL + "/"
    try:
        status_code = requests.get(endpoint, timeout=5).status_code
    except requests.RequestException as e:
        logger.debug("Caddy health check error: %s", e)
        return False
    return status_code == 200
|
|
|
|
# ── consecutive-failure bookkeeping ───────────────────────────────────
|
|
|
|
def get_health_failure_count(self) -> int:
    """Report how many health-check failures are currently recorded in a row."""
    failures = self._health_failures
    return failures
|
|
|
|
def increment_health_failure(self) -> int:
    """Record one more consecutive failure and return the new total."""
    updated = self._health_failures + 1
    self._health_failures = updated
    return updated
|
|
|
|
def reset_health_failures(self) -> None:
    """Clear the consecutive-failure counter (set it back to 0)."""
    self._health_failures = 0
    return None
|
|
|
|
# ── regeneration + identity events ────────────────────────────────────
|
|
|
|
def regenerate_with_installed(self, installed_services: list) -> bool:
    """Rebuild the Caddyfile from the current identity and apply it.

    The identity comes from ``self.config_manager.get_identity()``; the
    rendered text is passed to ``write_caddyfile()``, whose result is
    returned.
    """
    current_identity = self.config_manager.get_identity()
    rendered = self.generate_caddyfile(current_identity, installed_services)
    return self.write_caddyfile(rendered)
|
|
|
|
def _on_identity_changed(self, event) -> None:
|
|
"""Regenerate and reload the Caddyfile when cell identity changes."""
|
|
try:
|
|
self.regenerate_with_installed([])
|
|
except Exception as exc:
|
|
self.logger.warning('caddy_manager identity_changed handler failed: %s', exc)
|
|
|
|
# ── Certificate status ────────────────────────────────────────────────
|
|
|
|
def get_cert_status(self) -> Dict[str, Any]:
    """Read the stored TLS certificate status from ``identity['tls']``.

    No network access happens here; the values are whatever was last
    saved under the identity's ``tls`` key.

    Returns:
        Dict with ``status``, ``expiry`` and ``days_remaining``. Without a
        config manager, or when reading the identity raises, the result is
        ``{'status': 'unknown', 'expiry': None, 'days_remaining': None}``;
        those same values fill in any key missing from the stored entry.
    """
    fallback = {'status': 'unknown', 'expiry': None, 'days_remaining': None}
    if not self.config_manager:
        return fallback

    try:
        identity = self.config_manager.get_identity() or {}
    except Exception as e:
        logger.error("get_cert_status: failed to read identity: %s", e)
        return fallback

    stored = identity.get('tls') or {}
    return {field: stored.get(field, missing) for field, missing in fallback.items()}
|
|
|
|
def get_cert_status_fresh(self, max_age_seconds: int = 300) -> Dict[str, Any]:
    """Return the cert status, refreshing it first when it is stale.

    The status counts as stale when no refresh has been recorded yet or
    when more than ``max_age_seconds`` (monotonic clock) have passed since
    the last one. The returned dict always comes from
    ``get_cert_status()``.
    """
    last_refresh = self._cert_refreshed_at
    stale = last_refresh is None or (_time.monotonic() - last_refresh) > max_age_seconds
    if stale:
        self.refresh_cert_status()
    return self.get_cert_status()
|
|
|
|
def refresh_cert_status(self) -> Dict[str, Any]:
    """Check TLS cert expiry via SSL and persist the result to identity['tls'].

    For LAN mode (no ACME) the status is ``{'status': 'internal', ...}``
    and no connection is made. For every other mode an SSL connection is
    opened to ``CADDY_CERT_HOST`` (default ``cell-caddy``) on
    ``CADDY_HTTPS_PORT`` (default 443) and the expiry is read from the
    peer certificate. If the probe yields nothing (cert not yet issued,
    network unreachable, ...) the status is ``{'status': 'unknown', ...}``.

    A ``CADDY_HTTPS_PORT`` value that is not an integer is logged and
    replaced by 443 instead of raising.

    The status is saved with ``config_manager.set_identity_field('tls',
    ...)`` when a config manager exists (a failure there is only logged),
    and the refresh time is recorded whatever the probe outcome, so an
    ``unknown`` result is also cached by ``get_cert_status_fresh``.

    Returns:
        Dict with ``status``, ``expiry`` and ``days_remaining``.
    """
    identity = self.config_manager.get_identity() if self.config_manager else {}
    domain_mode = (identity or {}).get('domain_mode', 'lan')

    if domain_mode == 'lan':
        status: Dict[str, Any] = {'status': 'internal', 'expiry': None, 'days_remaining': None}
    else:
        caddy_host = os.environ.get('CADDY_CERT_HOST', 'cell-caddy')
        raw_port = os.environ.get('CADDY_HTTPS_PORT', '443')
        try:
            caddy_port = int(raw_port)
        except (TypeError, ValueError):
            logger.warning(
                'refresh_cert_status: invalid CADDY_HTTPS_PORT %r; using 443', raw_port
            )
            caddy_port = 443
        # NOTE(review): the probe also sends caddy_host as the TLS server
        # name; confirm Caddy serves the cell certificate for that name.
        result = self._check_cert_via_ssl(caddy_host, caddy_port)
        status = result if result is not None else {
            'status': 'unknown', 'expiry': None, 'days_remaining': None
        }

    if self.config_manager:
        try:
            self.config_manager.set_identity_field('tls', status)
        except Exception as exc:
            logger.warning('refresh_cert_status: failed to persist tls status: %s', exc)

    self._cert_refreshed_at = _time.monotonic()
    return status
|
|
|
|
@staticmethod
def _check_cert_via_ssl(hostname: str, port: int = 443) -> Optional[Dict[str, Any]]:
    """Open an SSL connection and return cert expiry info, or None on failure.

    Certificate verification and hostname checking are disabled on
    purpose: the goal is only to read the expiry date of whatever
    certificate the peer presents. ``hostname`` is used both as the
    connect address and as the TLS server name.

    Args:
        hostname: host to connect to.
        port: TCP port (default 443).

    Returns:
        ``{'status', 'expiry', 'days_remaining'}`` where ``status`` is
        ``'valid'`` while the expiry instant lies in the future and
        ``'expired'`` otherwise, ``expiry`` is an ISO-8601 string and
        ``days_remaining`` is the whole-day part of the remaining time (0
        during the final 24 hours, negative once expired). ``None`` if
        the connection, handshake or parsing fails, or no certificate is
        presented.
    """
    ctx = _ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = _ssl.CERT_NONE
    try:
        with _socket.create_connection((hostname, port), timeout=5) as raw:
            with ctx.wrap_socket(raw, server_hostname=hostname) as tls:
                der = tls.getpeercert(binary_form=True)
        if not der:
            return None

        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        cert = x509.load_der_x509_certificate(der, default_backend())
        # Use not_valid_after_utc (cryptography ≥42) with fallback for older builds.
        try:
            expiry = cert.not_valid_after_utc
        except AttributeError:
            expiry = cert.not_valid_after.replace(tzinfo=_dt.timezone.utc)  # type: ignore[attr-defined]

        remaining = expiry - _dt.datetime.now(_dt.timezone.utc)
        # Validity is decided on the exact remaining time: timedelta.days
        # is 0 throughout the last 24 hours, so testing "days > 0" would
        # call a still-valid certificate expired.
        still_valid = remaining.total_seconds() > 0
        return {
            'status': 'valid' if still_valid else 'expired',
            'expiry': expiry.isoformat(),
            'days_remaining': remaining.days,
        }
    except Exception as exc:
        # Best-effort probe: the caller maps None to 'unknown'. Keep a
        # debug trace so failures can be diagnosed.
        logger.debug('TLS cert probe of %s:%s failed: %s', hostname, port, exc)
        return None
|