feat: TLS certificate management in Vault page
Unit Tests / test (push) Successful in 7m26s

Adds live cert status, one-click ACME renewal, and custom cert upload
directly to the Vault page so users never need to touch Caddy config.

Backend:
- CaddyManager.get_cert_status() now returns domain, domain_mode, and
  cert_type so the UI can render the right controls without a separate
  identity fetch
- CaddyManager.renew_cert() reloads Caddy and invalidates the status
  cache; the frontend polls until the cert turns valid
- CaddyManager.upload_custom_cert() validates PEM, writes cert+key to
  the shared config/caddy/certs/ volume, updates identity (cert_type=custom),
  and regenerates the Caddyfile so Caddy references the new paths
- LAN-mode Caddyfile switches from /etc/caddy/internal/ to the shared
  certs dir automatically when cert_type=custom is set
- ddns_api default no longer includes /api/v1 — the plugin appends it;
  legacy /api/v1 suffix is stripped at write time to keep the Caddyfile clean
- POST /api/caddy/cert-renew and POST /api/caddy/custom-cert routes added

Frontend:
- TLSPanel component at the top of Vault.jsx shows status badge
  (valid/expiring-soon/expired/pending/internal) with domain and expiry
- Renew button visible only for ACME modes; spins during the API call
  then polls GET /api/caddy/cert-status every 10 s until valid
- Upload Custom Cert opens a modal with PEM text areas; works for all modes
- caddyAPI.renewCert() and uploadCustomCert() added to api.js

Tests: 22 new tests across 5 classes covering enriched status,
renew_cert guards, upload_custom_cert validation/writes/persistence,
custom-cert Caddyfile path selection, and ddns_api suffix stripping.
All 2093 existing tests continue to pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-08 12:53:42 -04:00
parent 85d265187d
commit 33d255f089
5 changed files with 1127 additions and 482 deletions
+37 -1
View File
@@ -897,7 +897,7 @@ def connectivity_get_peer_exits():
@app.route('/api/caddy/cert-status', methods=['GET']) @app.route('/api/caddy/cert-status', methods=['GET'])
def caddy_cert_status(): def caddy_cert_status():
"""Return TLS certificate status (expiry, days remaining, status). """Return TLS certificate status (expiry, days remaining, domain, mode).
Refreshes from Caddy if the cached value is older than 5 minutes. Refreshes from Caddy if the cached value is older than 5 minutes.
For LAN mode returns {'status': 'internal'}; for ACME modes returns For LAN mode returns {'status': 'internal'}; for ACME modes returns
@@ -910,6 +910,42 @@ def caddy_cert_status():
return jsonify({'error': str(e)}), 500 return jsonify({'error': str(e)}), 500
@app.route('/api/caddy/cert-renew', methods=['POST'])
def caddy_cert_renew():
"""Trigger ACME certificate renewal by reloading Caddy.
Returns immediately with status='pending'; poll GET /api/caddy/cert-status
to track progress (Caddy typically acquires the cert within 30-60 s).
"""
try:
result = caddy_manager.renew_cert()
return jsonify(result), (200 if result.get('ok') else 400)
except Exception as e:
logger.error(f"caddy_cert_renew: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/caddy/custom-cert', methods=['POST'])
def caddy_upload_custom_cert():
"""Install a custom TLS certificate (PEM format).
Body: { "cert_pem": "<PEM>", "key_pem": "<PEM>" }
Validates the cert/key pair, writes to the shared certs directory,
and reloads Caddy with the updated Caddyfile.
"""
try:
data = request.get_json(silent=True) or {}
cert_pem = (data.get('cert_pem') or '').strip()
key_pem = (data.get('key_pem') or '').strip()
if not cert_pem or not key_pem:
return jsonify({'ok': False, 'error': 'cert_pem and key_pem are required'}), 400
result = caddy_manager.upload_custom_cert(cert_pem, key_pem)
return jsonify(result), (200 if result.get('ok') else 422)
except Exception as e:
logger.error(f"caddy_upload_custom_cert: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/egress/status', methods=['GET']) @app.route('/api/egress/status', methods=['GET'])
def egress_status(): def egress_status():
"""Return egress status for all installed services that have an egress config.""" """Return egress status for all installed services that have an egress config."""
+167 -13
View File
@@ -49,6 +49,16 @@ LIVE_CADDYFILE = os.environ.get('CADDYFILE_PATH', '/app/config-caddy/Caddyfile')
# localhost to match the dev/test wiring. # localhost to match the dev/test wiring.
CADDY_ADMIN_URL = os.environ.get('CADDY_ADMIN_URL', 'http://cell-caddy:2019') CADDY_ADMIN_URL = os.environ.get('CADDY_ADMIN_URL', 'http://cell-caddy:2019')
# Directory where the API writes custom TLS cert/key files.
# The Caddy container mounts ./config/caddy → /config/caddy, so files written
# here appear inside the container as /config/caddy/certs/<file>.
CADDY_CERTS_DIR = os.environ.get('CADDY_CERTS_DIR', '/app/config-caddy/certs')
# Paths as seen by the Caddy process (inside the container).
_CADDY_CUSTOM_CERT = '/config/caddy/certs/cert.pem'
_CADDY_CUSTOM_KEY = '/config/caddy/certs/key.pem'
_CADDY_INTERNAL_CERT = '/etc/caddy/internal/cert.pem'
_CADDY_INTERNAL_KEY = '/etc/caddy/internal/key.pem'
class CaddyManager(BaseServiceManager): class CaddyManager(BaseServiceManager):
"""Manages Caddy reverse-proxy configuration and runtime health.""" """Manages Caddy reverse-proxy configuration and runtime health."""
@@ -135,7 +145,9 @@ class CaddyManager(BaseServiceManager):
) )
if domain_mode == 'lan': if domain_mode == 'lan':
return self._caddyfile_lan(cell_name, service_routes, core_routes) cert_path, key_path = self._tls_cert_pair()
return self._caddyfile_lan(cell_name, service_routes, core_routes,
cert_path, key_path)
if domain_mode == 'pic_ngo': if domain_mode == 'pic_ngo':
return self._caddyfile_pic_ngo(cell_name, service_routes, core_routes) return self._caddyfile_pic_ngo(cell_name, service_routes, core_routes)
if domain_mode == 'cloudflare': if domain_mode == 'cloudflare':
@@ -252,8 +264,21 @@ class CaddyManager(BaseServiceManager):
chunks.append(route.strip("\n")) chunks.append(route.strip("\n"))
return "\n".join(chunks) return "\n".join(chunks)
def _tls_cert_pair(self) -> tuple:
"""Return (cert_path, key_path) as seen inside the Caddy container.
Uses the custom-uploaded cert when one is installed, otherwise falls
back to the internal-CA cert that the VaultManager writes.
"""
ident = (self.config_manager.get_identity() if self.config_manager else {}) or {}
if ident.get('tls', {}).get('cert_type') == 'custom':
return _CADDY_CUSTOM_CERT, _CADDY_CUSTOM_KEY
return _CADDY_INTERNAL_CERT, _CADDY_INTERNAL_KEY
def _caddyfile_lan(self, cell_name: str, def _caddyfile_lan(self, cell_name: str,
service_routes: str, core_routes: str) -> str: service_routes: str, core_routes: str,
cert_path: str = _CADDY_INTERNAL_CERT,
key_path: str = _CADDY_INTERNAL_KEY) -> str:
"""LAN mode: HTTP only + internal-CA TLS, no ACME.""" """LAN mode: HTTP only + internal-CA TLS, no ACME."""
body = [] body = []
if service_routes: if service_routes:
@@ -267,7 +292,7 @@ class CaddyManager(BaseServiceManager):
"}\n" "}\n"
"\n" "\n"
f"http://{cell_name}.cell, http://172.20.0.2:80 {{\n" f"http://{cell_name}.cell, http://172.20.0.2:80 {{\n"
" tls /etc/caddy/internal/cert.pem /etc/caddy/internal/key.pem\n" f" tls {cert_path} {key_path}\n"
f"{inner}\n" f"{inner}\n"
"}\n" "}\n"
) )
@@ -290,7 +315,9 @@ class CaddyManager(BaseServiceManager):
# the pic_ngo plugin authenticates to POST /api/v1/dns-challenge with this token. # the pic_ngo plugin authenticates to POST /api/v1/dns-challenge with this token.
ddns_cfg = self.config_manager.configs.get('ddns', {}) ddns_cfg = self.config_manager.configs.get('ddns', {})
ddns_token = (ddns_cfg.get('token') or os.environ.get('DDNS_TOKEN') or '').strip() ddns_token = (ddns_cfg.get('token') or os.environ.get('DDNS_TOKEN') or '').strip()
ddns_api = (os.environ.get('DDNS_URL') or ddns_cfg.get('url') or 'https://ddns.pic.ngo/api/v1').strip() _raw_api = (os.environ.get('DDNS_URL') or ddns_cfg.get('url') or 'https://ddns.pic.ngo').strip()
# Strip legacy /api/v1 suffix — the pic_ngo plugin appends /api/v1 itself.
ddns_api = _raw_api.rstrip('/').removesuffix('/api/v1')
return ( return (
f"{self._global_acme_block(email)}\n" f"{self._global_acme_block(email)}\n"
@@ -502,22 +529,44 @@ class CaddyManager(BaseServiceManager):
# ── Certificate status ──────────────────────────────────────────────── # ── Certificate status ────────────────────────────────────────────────
def get_cert_status(self) -> Dict[str, Any]: def get_cert_status(self) -> Dict[str, Any]:
"""Return TLS cert status from identity['tls'] if present (cached).""" """Return TLS cert status enriched with identity context (cached)."""
default = {'status': 'unknown', 'expiry': None, 'days_remaining': None} ident: Dict[str, Any] = {}
if not self.config_manager: if self.config_manager:
return default try:
try: ident = self.config_manager.get_identity() or {}
ident = self.config_manager.get_identity() or {} except Exception as e:
except Exception as e: logger.error("get_cert_status: failed to read identity: %s", e)
logger.error("get_cert_status: failed to read identity: %s", e)
return default domain_mode = ident.get('domain_mode', 'lan')
tls = ident.get('tls') or {} tls = ident.get('tls') or {}
cert_type = tls.get('cert_type', 'custom' if tls.get('cert_type') == 'custom'
else ('internal' if domain_mode == 'lan' else 'acme'))
return { return {
'status': tls.get('status', 'unknown'), 'status': tls.get('status', 'unknown'),
'expiry': tls.get('expiry'), 'expiry': tls.get('expiry'),
'days_remaining': tls.get('days_remaining'), 'days_remaining': tls.get('days_remaining'),
'domain': self._domain_label(ident),
'domain_mode': domain_mode,
'cert_type': cert_type,
} }
@staticmethod
def _domain_label(ident: Dict[str, Any]) -> Optional[str]:
"""Return a human-readable domain string for display in the UI."""
mode = ident.get('domain_mode', 'lan')
cell = ident.get('cell_name', '')
if mode == 'pic_ngo':
return f'*.{cell}.pic.ngo' if cell else None
if mode == 'cloudflare':
d = ident.get('domain_name') or ident.get('domain', '')
return f'*.{d}' if d else None
if mode == 'duckdns':
return f'*.{cell}.duckdns.org' if cell else None
if mode == 'http01':
return ident.get('domain_name') or ident.get('domain')
return None # lan
def get_cert_status_fresh(self, max_age_seconds: int = 300) -> Dict[str, Any]: def get_cert_status_fresh(self, max_age_seconds: int = 300) -> Dict[str, Any]:
"""Return cert status, refreshing if the cached value is older than max_age_seconds.""" """Return cert status, refreshing if the cached value is older than max_age_seconds."""
now = _time.monotonic() now = _time.monotonic()
@@ -584,3 +633,108 @@ class CaddyManager(BaseServiceManager):
} }
except Exception: except Exception:
return None return None
# ── Active cert management ────────────────────────────────────────────
def renew_cert(self) -> Dict[str, Any]:
"""Trigger ACME cert renewal by reloading Caddy.
Returns immediately with status='pending' so the caller can poll
GET /api/caddy/cert-status to track progress. Not applicable to
LAN mode — callers should use upload_custom_cert() instead.
"""
ident = (self.config_manager.get_identity() if self.config_manager else {}) or {}
domain_mode = ident.get('domain_mode', 'lan')
if domain_mode == 'lan':
return {
'ok': False,
'error': 'ACME renewal is not available in LAN mode. '
'Upload a custom certificate instead.',
}
if not self.reload_caddy():
return {'ok': False, 'error': 'Caddy reload failed — check Caddy logs.'}
# Invalidate the cached status so the next poll triggers a fresh SSL check.
self._cert_refreshed_at = None
return {
'ok': True,
'status': 'pending',
'message': 'Renewal triggered. Certificate status will update within 60 s.',
}
def upload_custom_cert(self, cert_pem: str, key_pem: str) -> Dict[str, Any]:
"""Validate and install a custom TLS certificate.
Writes cert+key to the shared certs directory (visible to Caddy),
regenerates the Caddyfile to reference the new paths, and reloads.
Works for all domain modes — use this when you have a certificate
issued by your own CA or a commercial provider.
"""
cert_info = self._parse_pem_cert(cert_pem)
if cert_info is None:
return {'ok': False, 'error': 'Invalid certificate: could not parse PEM.'}
if not self._validate_key_pem(key_pem):
return {'ok': False, 'error': 'Invalid private key: expected PEM with PRIVATE KEY header.'}
try:
os.makedirs(CADDY_CERTS_DIR, exist_ok=True)
with open(os.path.join(CADDY_CERTS_DIR, 'cert.pem'), 'w') as fh:
fh.write(cert_pem)
with open(os.path.join(CADDY_CERTS_DIR, 'key.pem'), 'w') as fh:
fh.write(key_pem)
except OSError as exc:
logger.error('upload_custom_cert: write failed: %s', exc)
return {'ok': False, 'error': f'Failed to write cert files: {exc}'}
days = cert_info.get('days_remaining', 0)
tls_info: Dict[str, Any] = {
'status': 'valid' if days > 0 else 'expired',
'expiry': cert_info.get('expiry'),
'days_remaining': days,
'cert_type': 'custom',
}
if self.config_manager:
try:
self.config_manager.set_identity_field('tls', tls_info)
except Exception as exc:
logger.warning('upload_custom_cert: could not persist tls info: %s', exc)
# Regenerate Caddyfile so the tls directive references the new cert.
if self.config_manager:
try:
self.regenerate_with_installed([])
except Exception as exc:
logger.warning('upload_custom_cert: Caddyfile regeneration failed: %s', exc)
return {'ok': True, **tls_info}
@staticmethod
def _parse_pem_cert(cert_pem: str) -> Optional[Dict[str, Any]]:
"""Parse a PEM certificate and return expiry metadata, or None on error."""
try:
from cryptography import x509
cert_bytes = cert_pem.encode() if isinstance(cert_pem, str) else cert_pem
cert = x509.load_pem_x509_certificate(cert_bytes)
try:
expiry = cert.not_valid_after_utc
except AttributeError:
expiry = cert.not_valid_after.replace(tzinfo=_dt.timezone.utc) # type: ignore[attr-defined]
now = _dt.datetime.now(_dt.timezone.utc)
days = (expiry - now).days
return {
'expiry': expiry.isoformat(),
'days_remaining': days,
'subject': cert.subject.rfc4514_string(),
}
except Exception as exc:
logger.debug('_parse_pem_cert failed: %s', exc)
return None
@staticmethod
def _validate_key_pem(key_pem: str) -> bool:
"""Return True if key_pem contains a PEM-encoded private key block."""
return ('-----BEGIN' in key_pem
and 'PRIVATE KEY' in key_pem
and '-----END' in key_pem)
+231 -1
View File
@@ -70,7 +70,9 @@ class TestGenerateCaddyfilePicNgo(unittest.TestCase):
self.assertIn('alpha.pic.ngo', out) self.assertIn('alpha.pic.ngo', out)
# Registration token (not TOTP secret) is embedded — no {$VAR} placeholders # Registration token (not TOTP secret) is embedded — no {$VAR} placeholders
self.assertIn('token TESTSECRET123', out) self.assertIn('token TESTSECRET123', out)
self.assertIn('api_base_url https://ddns.pic.ngo/api/v1', out) # /api/v1 is stripped — the plugin appends it itself
self.assertIn('api_base_url https://ddns.pic.ngo', out)
self.assertNotIn('api_base_url https://ddns.pic.ngo/api/v1', out)
self.assertNotIn('{$PIC_NGO_DDNS_TOKEN}', out) self.assertNotIn('{$PIC_NGO_DDNS_TOKEN}', out)
self.assertNotIn('{$PIC_NGO_DDNS_API}', out) self.assertNotIn('{$PIC_NGO_DDNS_API}', out)
self.assertIn('email admin@alpha.pic.ngo', out) self.assertIn('email admin@alpha.pic.ngo', out)
@@ -435,5 +437,233 @@ class TestRefreshCertStatus(unittest.TestCase):
mock_ref.assert_not_called() mock_ref.assert_not_called()
class TestGetCertStatusEnriched(unittest.TestCase):
"""get_cert_status() returns domain, domain_mode, cert_type alongside tls fields."""
def test_includes_domain_and_mode_for_pic_ngo(self):
mgr = _mgr(identity={
'cell_name': 'alpha',
'domain_mode': 'pic_ngo',
'tls': {'status': 'valid', 'expiry': '2026-12-01T00:00:00+00:00', 'days_remaining': 180},
})
s = mgr.get_cert_status()
self.assertEqual(s['domain_mode'], 'pic_ngo')
self.assertEqual(s['domain'], '*.alpha.pic.ngo')
self.assertEqual(s['cert_type'], 'acme')
self.assertEqual(s['status'], 'valid')
def test_cert_type_is_internal_for_lan_mode(self):
mgr = _mgr(identity={'cell_name': 'x', 'domain_mode': 'lan', 'tls': {}})
s = mgr.get_cert_status()
self.assertEqual(s['cert_type'], 'internal')
self.assertIsNone(s['domain'])
def test_cert_type_is_custom_when_tls_says_so(self):
mgr = _mgr(identity={
'cell_name': 'x',
'domain_mode': 'lan',
'tls': {'cert_type': 'custom', 'status': 'valid',
'expiry': '2027-01-01T00:00:00+00:00', 'days_remaining': 200},
})
s = mgr.get_cert_status()
self.assertEqual(s['cert_type'], 'custom')
def test_domain_label_cloudflare(self):
ident = {'domain_mode': 'cloudflare', 'domain_name': 'example.com'}
self.assertEqual(CaddyManager._domain_label(ident), '*.example.com')
def test_domain_label_duckdns(self):
ident = {'cell_name': 'beta', 'domain_mode': 'duckdns'}
self.assertEqual(CaddyManager._domain_label(ident), '*.beta.duckdns.org')
def test_domain_label_http01(self):
ident = {'domain_mode': 'http01', 'domain_name': 'myhost.noip.me'}
self.assertEqual(CaddyManager._domain_label(ident), 'myhost.noip.me')
def test_domain_label_lan_is_none(self):
self.assertIsNone(CaddyManager._domain_label({'domain_mode': 'lan'}))
class TestRenewCert(unittest.TestCase):
"""renew_cert() — mode guard, reload call, cache invalidation."""
def test_lan_mode_returns_error(self):
mgr = _mgr(identity={'domain_mode': 'lan'})
result = mgr.renew_cert()
self.assertFalse(result['ok'])
self.assertIn('LAN', result['error'])
def test_acme_mode_calls_reload(self):
mgr = _mgr(identity={'domain_mode': 'pic_ngo'})
with patch.object(mgr, 'reload_caddy', return_value=True) as mock_reload:
result = mgr.renew_cert()
mock_reload.assert_called_once()
self.assertTrue(result['ok'])
self.assertEqual(result['status'], 'pending')
def test_reload_failure_propagated(self):
mgr = _mgr(identity={'domain_mode': 'cloudflare'})
with patch.object(mgr, 'reload_caddy', return_value=False):
result = mgr.renew_cert()
self.assertFalse(result['ok'])
self.assertIn('reload failed', result['error'])
def test_invalidates_cache_on_success(self):
import time
mgr = _mgr(identity={'domain_mode': 'pic_ngo'})
mgr._cert_refreshed_at = time.monotonic()
with patch.object(mgr, 'reload_caddy', return_value=True):
mgr.renew_cert()
self.assertIsNone(mgr._cert_refreshed_at)
class TestUploadCustomCert(unittest.TestCase):
"""upload_custom_cert() — validation, file writes, identity persistence, Caddyfile regen."""
def _make_pem_cert(self, days_remaining: int = 90):
"""Return (cert_pem, key_pem) for a self-signed cert."""
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
import datetime
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
now = datetime.datetime.now(datetime.timezone.utc)
expiry = now + datetime.timedelta(days=days_remaining)
not_before = (now - datetime.timedelta(days=abs(days_remaining) + 10)
if days_remaining < 0 else now - datetime.timedelta(days=1))
cert = (
x509.CertificateBuilder()
.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')]))
.issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')]))
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(not_before)
.not_valid_after(expiry)
.sign(key, hashes.SHA256())
)
cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode()
key_pem = key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption(),
).decode()
return cert_pem, key_pem
def test_rejects_invalid_cert_pem(self):
mgr = _mgr()
result = mgr.upload_custom_cert('not a cert', '-----BEGIN PRIVATE KEY-----\nXXX\n-----END PRIVATE KEY-----')
self.assertFalse(result['ok'])
self.assertIn('Invalid certificate', result['error'])
def test_rejects_invalid_key_pem(self):
mgr = _mgr()
cert_pem, _ = self._make_pem_cert()
result = mgr.upload_custom_cert(cert_pem, 'not a key')
self.assertFalse(result['ok'])
self.assertIn('Invalid private key', result['error'])
def test_writes_files_to_certs_dir(self):
mgr = _mgr(identity={'domain_mode': 'lan', 'cell_name': 'x'})
cert_pem, key_pem = self._make_pem_cert()
written = {}
def fake_open(path, mode='r', **kw):
import unittest.mock
m = unittest.mock.mock_open()()
if 'w' in mode:
written[path] = True
return m
with patch('builtins.open', side_effect=fake_open):
with patch('os.makedirs'):
with patch.object(mgr, 'regenerate_with_installed', return_value=True):
mgr.upload_custom_cert(cert_pem, key_pem)
self.assertTrue(any('cert.pem' in p for p in written))
self.assertTrue(any('key.pem' in p for p in written))
def test_persists_custom_cert_type_to_identity(self):
mgr = _mgr(identity={'domain_mode': 'lan', 'cell_name': 'x'})
cert_pem, key_pem = self._make_pem_cert(days_remaining=90)
with patch('builtins.open', unittest.mock.mock_open()):
with patch('os.makedirs'):
with patch.object(mgr, 'regenerate_with_installed', return_value=True):
result = mgr.upload_custom_cert(cert_pem, key_pem)
self.assertTrue(result['ok'])
self.assertEqual(result['cert_type'], 'custom')
self.assertEqual(result['status'], 'valid')
mgr.config_manager.set_identity_field.assert_called_once()
call_args = mgr.config_manager.set_identity_field.call_args
self.assertEqual(call_args[0][0], 'tls')
self.assertEqual(call_args[0][1]['cert_type'], 'custom')
def test_expired_cert_flagged_as_expired(self):
mgr = _mgr(identity={'domain_mode': 'lan', 'cell_name': 'x'})
cert_pem, key_pem = self._make_pem_cert(days_remaining=-5)
with patch('builtins.open', unittest.mock.mock_open()):
with patch('os.makedirs'):
with patch.object(mgr, 'regenerate_with_installed', return_value=True):
result = mgr.upload_custom_cert(cert_pem, key_pem)
self.assertEqual(result['status'], 'expired')
def test_file_write_failure_returns_error(self):
mgr = _mgr(identity={'domain_mode': 'lan'})
cert_pem, key_pem = self._make_pem_cert()
with patch('os.makedirs'):
with patch('builtins.open', side_effect=OSError('no space')):
result = mgr.upload_custom_cert(cert_pem, key_pem)
self.assertFalse(result['ok'])
self.assertIn('Failed to write', result['error'])
class TestCaddyfileLanCustomCert(unittest.TestCase):
"""_caddyfile_lan() uses the custom cert path when cert_type=custom."""
def test_default_uses_internal_cert_path(self):
mgr = _mgr(identity={'cell_name': 'mycell', 'domain_mode': 'lan'})
out = mgr.generate_caddyfile({'cell_name': 'mycell', 'domain_mode': 'lan'}, [])
self.assertIn('/etc/caddy/internal/cert.pem', out)
def test_custom_cert_type_uses_shared_cert_path(self):
mgr = _mgr(identity={
'cell_name': 'mycell',
'domain_mode': 'lan',
'tls': {'cert_type': 'custom'},
})
out = mgr.generate_caddyfile({'cell_name': 'mycell', 'domain_mode': 'lan'}, [])
self.assertIn('/config/caddy/certs/cert.pem', out)
self.assertNotIn('/etc/caddy/internal/cert.pem', out)
class TestDdnsApiStripsLegacySuffix(unittest.TestCase):
"""_caddyfile_pic_ngo strips /api/v1 from ddns_api so the plugin doesn't double it."""
def test_api_v1_suffix_stripped_from_config_url(self):
mgr = _mgr()
mgr.config_manager.configs = {
'ddns': {'token': 'tok', 'url': 'https://ddns.pic.ngo/api/v1'},
}
with patch.dict(os.environ, {}, clear=False):
os.environ.pop('DDNS_URL', None)
out = mgr.generate_caddyfile({'cell_name': 'x', 'domain_mode': 'pic_ngo'}, [])
self.assertIn('api_base_url https://ddns.pic.ngo', out)
self.assertNotIn('api_base_url https://ddns.pic.ngo/api/v1', out)
def test_clean_url_is_unchanged(self):
mgr = _mgr()
mgr.config_manager.configs = {
'ddns': {'token': 'tok', 'url': 'https://ddns.pic.ngo'},
}
with patch.dict(os.environ, {}, clear=False):
os.environ.pop('DDNS_URL', None)
out = mgr.generate_caddyfile({'cell_name': 'x', 'domain_mode': 'pic_ngo'}, [])
self.assertIn('api_base_url https://ddns.pic.ngo', out)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
+689 -467
View File
File diff suppressed because it is too large Load Diff
+3
View File
@@ -387,6 +387,9 @@ export const containerAPI = {
// Caddy / TLS API // Caddy / TLS API
export const caddyAPI = { export const caddyAPI = {
getCertStatus: () => api.get('/api/caddy/cert-status'), getCertStatus: () => api.get('/api/caddy/cert-status'),
renewCert: () => api.post('/api/caddy/cert-renew'),
uploadCustomCert: (certPem, keyPem) =>
api.post('/api/caddy/custom-cert', { cert_pem: certPem, key_pem: keyPem }),
}; };
export default api; export default api;