Files
pic/tests/test_caddy_manager.py
T
roof e9077b2633
Unit Tests / test (push) Successful in 7m35s
fix: Caddy health check must hit /config/ not /
GET http://cell-caddy:2019/ returns 404 because Caddy's admin API has no
root handler.  The health monitor interpreted every response as a failure,
restarted Caddy every 3 minutes, and prevented ACME from ever completing.

/config/ returns 200 + the running config JSON whenever Caddy is up and
serving — that is the correct liveness indicator.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-08 15:57:32 -04:00

695 lines
30 KiB
Python

"""Tests for CaddyManager — Caddyfile generation per domain mode plus
admin-API reload, health check, and consecutive-failure bookkeeping.
"""
import os
import sys
import unittest
from unittest.mock import MagicMock, patch
import requests
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
from caddy_manager import CaddyManager # noqa: E402
def _mgr(tmpdir=None, identity=None):
    """Return a CaddyManager wired to a MagicMock config_manager.

    The mocked ``get_identity()`` returns *identity*, or an empty dict when
    *identity* is falsy.  A truthy *tmpdir* is used for both ``data_dir`` and
    ``config_dir``; otherwise fixed paths under /tmp are passed.
    """
    config_manager = MagicMock()
    config_manager.get_identity.return_value = identity if identity else {}
    data_dir = tmpdir if tmpdir else '/tmp/pic-test-data'
    config_dir = tmpdir if tmpdir else '/tmp/pic-test-config'
    return CaddyManager(
        config_manager=config_manager,
        data_dir=data_dir,
        config_dir=config_dir,
    )
# Path-based route snippets used as ``caddy_route`` values by the tests below.
CALENDAR_ROUTE = "\n".join([
    "handle /calendar* {",
    " reverse_proxy cell-radicale:5232",
    "}",
])
FILES_ROUTE = "\n".join([
    "handle /files* {",
    " reverse_proxy cell-filegator:8080",
    "}",
])
class TestGenerateCaddyfileLan(unittest.TestCase):
    """Caddyfile generation for ``domain_mode='lan'``."""

    def test_lan_mode_has_auto_https_off_and_no_acme(self):
        """LAN output turns auto-HTTPS off and carries no ACME/DNS directives."""
        caddyfile = _mgr().generate_caddyfile(
            {'cell_name': 'mycell', 'domain_mode': 'lan'}, [])

        self.assertIn('auto_https off', caddyfile)
        # None of the ACME / DNS-challenge directives may show up.
        for directive in ('acme_ca', 'acme_email',
                          'dns pic_ngo', 'dns cloudflare'):
            self.assertNotIn(directive, caddyfile)
        # TLS is served from the internal certificate/key pair.
        internal_tls = ('tls /etc/caddy/internal/cert.pem '
                        '/etc/caddy/internal/key.pem')
        self.assertIn(internal_tls, caddyfile)
        # Both the cell hostname and the virtual-IP listener are present.
        for listener in ('http://mycell.cell', 'http://172.20.0.2:80'):
            self.assertIn(listener, caddyfile)
class TestGenerateCaddyfilePicNgo(unittest.TestCase):
    """Caddyfile generation for ``domain_mode='pic_ngo'``."""

    def test_pic_ngo_has_dns_plugin_and_wildcard(self):
        """A configured DDNS token yields the pic_ngo DNS block and wildcard host."""
        mgr = _mgr()
        mgr.config_manager.configs = {
            'ddns': {'token': 'TESTSECRET123', 'url': 'https://ddns.pic.ngo/api/v1'},
        }
        identity = {'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}
        with patch.dict(os.environ, {'DDNS_URL': 'https://ddns.pic.ngo/api/v1'}):
            # The "acme_ca omitted" assertion below is only meaningful when
            # ACME_CA_URL is unset; remove any value inherited from the
            # developer/CI environment.  patch.dict restores it on exit.
            os.environ.pop('ACME_CA_URL', None)
            out = mgr.generate_caddyfile(identity, [])
        self.assertIn('dns pic_ngo', out)
        self.assertIn('*.alpha.pic.ngo', out)
        self.assertIn('alpha.pic.ngo', out)
        # Registration token (not TOTP secret) is embedded — no {$VAR} placeholders
        self.assertIn('token TESTSECRET123', out)
        # /api/v1 is stripped — the plugin appends it itself
        self.assertIn('api_base_url https://ddns.pic.ngo', out)
        self.assertNotIn('api_base_url https://ddns.pic.ngo/api/v1', out)
        self.assertNotIn('{$PIC_NGO_DDNS_TOKEN}', out)
        self.assertNotIn('{$PIC_NGO_DDNS_API}', out)
        self.assertIn('email admin@alpha.pic.ngo', out)
        # acme_ca is omitted when ACME_CA_URL is not set
        self.assertNotIn('acme_ca', out)

    def test_pic_ngo_acme_ca_included_when_env_set(self):
        """Setting ACME_CA_URL in the environment emits a matching acme_ca line."""
        mgr = _mgr()
        mgr.config_manager.configs = {'ddns': {'token': 'TESTSECRET123'}}
        identity = {'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}
        # Uses the imported ``patch`` like the rest of this module.
        with patch.dict(os.environ, {
            'DDNS_URL': 'https://ddns.pic.ngo/api/v1',
            'ACME_CA_URL': 'https://acme-staging-v02.api.letsencrypt.org/directory',
        }):
            out = mgr.generate_caddyfile(identity, [])
        self.assertIn('acme_ca https://acme-staging-v02.api.letsencrypt.org/directory', out)

    def test_pic_ngo_has_api_route_without_registry(self):
        """With no service registry only the api host matcher is generated."""
        mgr = _mgr()
        identity = {'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}
        out = mgr.generate_caddyfile(identity, [])
        # Without a registry only the api block is present
        self.assertIn('@api host api.alpha.pic.ngo', out)
        self.assertIn('reverse_proxy cell-api:3000', out)
        self.assertNotIn('@calendar', out)
        self.assertNotIn('@mail', out)
        self.assertNotIn('@files', out)
class TestGenerateCaddyfileCloudflare(unittest.TestCase):
    """Caddyfile generation for ``domain_mode='cloudflare'``."""

    def test_cloudflare_has_dns_cloudflare(self):
        """Cloudflare mode emits the cloudflare DNS directive and a wildcard host."""
        mgr = _mgr()
        identity = {
            'cell_name': 'beta',
            'domain_mode': 'cloudflare',
            'domain_name': 'example.com',
        }
        with patch.dict(os.environ, {}, clear=False):
            # Make the "acme_ca omitted" assertion independent of whatever
            # ACME_CA_URL the surrounding shell/CI exports; patch.dict puts
            # the original environment back on exit.
            os.environ.pop('ACME_CA_URL', None)
            out = mgr.generate_caddyfile(identity, [])
        self.assertIn('dns cloudflare {$CF_API_TOKEN}', out)
        self.assertIn('*.example.com', out)
        self.assertIn('email {$ACME_EMAIL}', out)
        # acme_ca is omitted when ACME_CA_URL is not set in the environment
        self.assertNotIn('acme_ca', out)

    def test_caddyfile_cloudflare_uses_domain_name(self):
        """Caddyfile must use domain_name for TLS host, not any 'custom_domain' key."""
        mgr = _mgr()
        identity = {
            'cell_name': 'beta',
            'domain_mode': 'cloudflare',
            'domain_name': 'home.example.com',
            'domain': 'home.local',
        }
        out = mgr.generate_caddyfile(identity, [])
        self.assertIn('*.home.example.com', out)
        self.assertIn('home.example.com', out)
        # Must not use the internal domain for TLS
        self.assertNotIn('*.home.local', out)
        # 'custom_domain' must not appear literally as a key in the output
        self.assertNotIn('custom_domain', out)
        # Without a registry only the api block is emitted for subdomain routing
        self.assertIn('@api host api.home.example.com', out)
        self.assertNotIn('@calendar', out)
        self.assertNotIn('@files', out)
class TestGenerateCaddyfileDuckDns(unittest.TestCase):
    """Caddyfile generation for ``domain_mode='duckdns'``."""

    def test_duckdns_has_dns_duckdns(self):
        """DuckDNS mode emits the duckdns DNS directive, wildcard host and api matcher."""
        caddyfile = _mgr().generate_caddyfile(
            {'cell_name': 'gamma', 'domain_mode': 'duckdns'}, [])

        for expected in ('dns duckdns {$DUCKDNS_TOKEN}',
                         '*.gamma.duckdns.org',
                         '@api host api.gamma.duckdns.org'):
            self.assertIn(expected, caddyfile)
        for matcher in ('@calendar', '@files'):
            self.assertNotIn(matcher, caddyfile)
class TestGenerateCaddyfileHttp01(unittest.TestCase):
    """Caddyfile generation for ``domain_mode='http01'`` (one site block per host)."""

    def test_http01_no_tls_block_and_per_service_blocks(self):
        """No wildcard/DNS/tls directives; api and plugin hosts get their own blocks."""
        # A store-plugin service, i.e. not one of the core service names.
        plugin_services = [
            {'name': 'chat', 'caddy_route': 'reverse_proxy cell-chat:8090'},
        ]
        caddyfile = _mgr().generate_caddyfile(
            {
                'cell_name': 'delta',
                'domain_mode': 'http01',
                'domain_name': 'delta.noip.me',
            },
            plugin_services,
        )

        # No wildcard host, no DNS-01 plugin, and no explicit tls block
        # (Caddy falls back to HTTP-01 by default).
        for unwanted in ('*.delta', 'dns ', 'tls {'):
            self.assertNotIn(unwanted, caddyfile)
        # Without a registry the api host is the only core block generated.
        self.assertIn('api.delta.noip.me {', caddyfile)
        for core_host in ('calendar.delta.noip.me {',
                          'files.delta.noip.me {',
                          'mail.delta.noip.me {'):
            self.assertNotIn(core_host, caddyfile)
        # The installed plugin service still gets its own block.
        self.assertIn('chat.delta.noip.me {', caddyfile)
        self.assertIn('reverse_proxy cell-chat:8090', caddyfile)

    def test_http01_installed_service_with_caddy_route_appears(self):
        """An installed service with a caddy_route produces its own per-host block."""
        installed = [{'name': 'notes', 'caddy_route': 'reverse_proxy cell-other:9000'}]
        caddyfile = _mgr().generate_caddyfile(
            {
                'cell_name': 'delta',
                'domain_mode': 'http01',
                'domain_name': 'delta.noip.me',
            },
            installed,
        )
        self.assertIn('notes.delta.noip.me {', caddyfile)
        self.assertIn('reverse_proxy cell-other:9000', caddyfile)
class TestServiceRoutesIncluded(unittest.TestCase):
    """Installed services' ``caddy_route`` snippets are copied into the Caddyfile."""

    def test_installed_service_route_appears_in_output(self):
        """Both service routes and the core api/webui proxies appear in LAN output."""
        installed = [
            {'name': 'calendar', 'caddy_route': CALENDAR_ROUTE},
            {'name': 'files', 'caddy_route': FILES_ROUTE},
        ]
        caddyfile = _mgr().generate_caddyfile(
            {'cell_name': 'eps', 'domain_mode': 'lan'}, installed)

        expected_fragments = (
            # Routes contributed by the installed services.
            'handle /calendar*',
            'reverse_proxy cell-radicale:5232',
            'handle /files*',
            'reverse_proxy cell-filegator:8080',
            # Core routes are still emitted alongside them.
            'reverse_proxy cell-api:3000',
            'reverse_proxy cell-webui:80',
        )
        for fragment in expected_fragments:
            self.assertIn(fragment, caddyfile)
class TestReloadCaddyAdminAPI(unittest.TestCase):
    """reload_caddy() posts the on-disk Caddyfile to the Caddy admin API."""

    def test_reload_calls_admin_api_load_endpoint(self):
        """A 200 from POST /load makes reload_caddy() return True."""
        import tempfile

        mgr = _mgr()
        # Point at a tmp Caddyfile so we can read it back during reload.
        with tempfile.NamedTemporaryFile(
                'w', delete=False, suffix='.caddyfile') as tmp:
            tmp.write(":80 { reverse_proxy cell-webui:80 }\n")
        # Registered before anything can fail so the file is removed even
        # when reload_caddy() raises or an assertion below does not hold.
        self.addCleanup(os.unlink, tmp.name)
        mgr.caddyfile_path = tmp.name

        with patch('caddy_manager.requests.post') as mock_post:
            mock_post.return_value = MagicMock(status_code=200, text='ok')
            ok = mgr.reload_caddy()

        self.assertTrue(ok)
        mock_post.assert_called_once()
        args, kwargs = mock_post.call_args
        # First positional arg is the URL
        self.assertEqual(args[0], 'http://cell-caddy:2019/load')
        self.assertEqual(kwargs['headers']['Content-Type'], 'text/caddyfile')
        self.assertIn('cell-webui:80', kwargs['data'])
class TestHealthCheck(unittest.TestCase):
    """check_caddy_health() maps the admin-API GET outcome to a boolean."""

    def test_returns_true_on_200(self):
        """A 200 response is healthy, and the probe URL contains /config/."""
        manager = _mgr()
        with patch('caddy_manager.requests.get') as fake_get:
            fake_get.return_value = MagicMock(status_code=200)
            healthy = manager.check_caddy_health()
            self.assertTrue(healthy)
            fake_get.assert_called_once()
            # Must hit /config/ — not the root which returns 404
            probed_url = fake_get.call_args[0][0]
            self.assertIn('/config/', probed_url)

    def test_returns_false_on_connection_error(self):
        """A ConnectionError from requests.get is reported as unhealthy."""
        manager = _mgr()
        refused = requests.ConnectionError('refused')
        with patch('caddy_manager.requests.get', side_effect=refused):
            self.assertFalse(manager.check_caddy_health())

    def test_returns_false_on_non_200(self):
        """A 500 response is reported as unhealthy."""
        manager = _mgr()
        with patch('caddy_manager.requests.get') as fake_get:
            fake_get.return_value = MagicMock(status_code=500)
            self.assertFalse(manager.check_caddy_health())
class TestFailureCounter(unittest.TestCase):
    """Consecutive health-failure counter bookkeeping."""

    def test_increments_and_resets(self):
        """The counter starts at 0, counts up by one per increment, and resets to 0."""
        manager = _mgr()
        self.assertEqual(manager.get_health_failure_count(), 0)
        # Each increment returns the new running total.
        for running_total in (1, 2, 3):
            self.assertEqual(manager.increment_health_failure(), running_total)
        self.assertEqual(manager.get_health_failure_count(), 3)

        manager.reset_health_failures()
        self.assertEqual(manager.get_health_failure_count(), 0)
class TestCertStatus(unittest.TestCase):
    """get_cert_status() with and without a ``tls`` entry in the identity."""

    def test_returns_default_when_no_tls_in_identity(self):
        """Without a tls entry the status is 'unknown' and the dates are None."""
        status = _mgr(
            identity={'cell_name': 'x', 'domain_mode': 'lan'},
        ).get_cert_status()
        self.assertEqual(status['status'], 'unknown')
        for empty_field in ('expiry', 'days_remaining'):
            self.assertIsNone(status[empty_field])

    def test_returns_tls_block_when_present(self):
        """Fields stored under identity['tls'] are returned as-is."""
        stored_tls = {
            'status': 'valid',
            'expiry': '2026-08-01T00:00:00Z',
            'days_remaining': 84,
        }
        status = _mgr(identity={
            'cell_name': 'x',
            'domain_mode': 'pic_ngo',
            'tls': stored_tls,
        }).get_cert_status()
        self.assertEqual(status['status'], 'valid')
        self.assertEqual(status['expiry'], '2026-08-01T00:00:00Z')
        self.assertEqual(status['days_remaining'], 84)
class TestCaddyManagerIdentityChangedSubscription(unittest.TestCase):
    """IDENTITY_CHANGED subscription wiring and the ``_on_identity_changed`` handler."""

    def test_subscribes_to_identity_changed_on_init(self):
        """When service_bus is provided, CaddyManager subscribes to IDENTITY_CHANGED."""
        from service_bus import EventType

        bus = MagicMock()
        manager = CaddyManager(config_manager=MagicMock(), service_bus=bus)
        bus.subscribe_to_event.assert_called_once_with(
            EventType.IDENTITY_CHANGED,
            manager._on_identity_changed,
        )

    def test_no_subscription_without_service_bus(self):
        """When service_bus is omitted, no subscription is attempted."""
        # NOTE(review): this mock is never handed to CaddyManager, so the
        # assertion below cannot fail; the test effectively only checks that
        # construction without a service_bus does not raise.
        bus = MagicMock()
        CaddyManager(config_manager=MagicMock())
        bus.subscribe_to_event.assert_not_called()

    def test_on_identity_changed_calls_regenerate_with_installed(self):
        """_on_identity_changed calls regenerate_with_installed([])."""
        manager = _mgr()
        with patch.object(manager, 'regenerate_with_installed',
                          return_value=True) as regenerate:
            manager._on_identity_changed(MagicMock())
            regenerate.assert_called_once_with([])

    def test_on_identity_changed_swallows_exceptions(self):
        """_on_identity_changed must not propagate exceptions."""
        manager = _mgr()
        with patch.object(manager, 'regenerate_with_installed',
                          side_effect=Exception('boom')):
            # Passing means the call returned instead of raising.
            manager._on_identity_changed(MagicMock())
class TestRefreshCertStatus(unittest.TestCase):
    """refresh_cert_status() + _check_cert_via_ssl()."""

    def _make_der_cert(self, days_remaining: int) -> bytes:
        """Return a minimal self-signed DER cert valid for *days_remaining* days."""
        import datetime
        from cryptography import x509
        from cryptography.hazmat.primitives import hashes, serialization
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID

        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        not_after = (datetime.datetime.now(datetime.timezone.utc)
                     + datetime.timedelta(days=days_remaining))
        # The validity window always opens 30 days before the expiry.
        not_before = not_after - datetime.timedelta(days=30)

        builder = x509.CertificateBuilder()
        builder = builder.subject_name(
            x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')]))
        builder = builder.issuer_name(
            x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')]))
        builder = builder.public_key(private_key.public_key())
        builder = builder.serial_number(x509.random_serial_number())
        builder = builder.not_valid_before(not_before)
        builder = builder.not_valid_after(not_after)
        certificate = builder.sign(private_key, hashes.SHA256())
        return certificate.public_bytes(serialization.Encoding.DER)

    def _check_with_peer_cert(self, der):
        """Call _check_cert_via_ssl('host', 443) with sockets mocked to present *der*."""
        tls_sock = MagicMock()
        tls_sock.__enter__ = MagicMock(return_value=tls_sock)
        tls_sock.__exit__ = MagicMock(return_value=False)
        tls_sock.getpeercert.return_value = der
        raw_sock = MagicMock()
        raw_sock.__enter__ = MagicMock(return_value=raw_sock)
        raw_sock.__exit__ = MagicMock(return_value=False)
        with patch('caddy_manager._socket.create_connection', return_value=raw_sock), \
                patch('caddy_manager._ssl.create_default_context') as ctx_factory:
            ctx_factory.return_value.wrap_socket.return_value = tls_sock
            return CaddyManager._check_cert_via_ssl('host', 443)

    def test_check_cert_via_ssl_returns_none_on_connection_error(self):
        """_check_cert_via_ssl returns None when connection fails."""
        with patch('caddy_manager._socket.create_connection',
                   side_effect=OSError('refused')):
            outcome = CaddyManager._check_cert_via_ssl('host', 443)
        self.assertIsNone(outcome)

    def test_check_cert_via_ssl_returns_valid_status(self):
        """_check_cert_via_ssl returns valid status for a future-dated cert."""
        outcome = self._check_with_peer_cert(self._make_der_cert(60))
        self.assertIsNotNone(outcome)
        self.assertEqual(outcome['status'], 'valid')
        self.assertGreater(outcome['days_remaining'], 50)

    def test_check_cert_via_ssl_returns_expired_for_past_cert(self):
        """_check_cert_via_ssl returns expired when cert is in the past."""
        outcome = self._check_with_peer_cert(self._make_der_cert(-5))
        self.assertIsNotNone(outcome)
        self.assertEqual(outcome['status'], 'expired')
        self.assertLess(outcome['days_remaining'], 0)

    def test_refresh_cert_status_lan_mode_returns_internal(self):
        """LAN mode always returns status='internal' without SSL check."""
        manager = _mgr(identity={'cell_name': 'x', 'domain_mode': 'lan'})
        with patch.object(CaddyManager, '_check_cert_via_ssl') as ssl_probe:
            outcome = manager.refresh_cert_status()
        ssl_probe.assert_not_called()
        self.assertEqual(outcome['status'], 'internal')

    def test_refresh_cert_status_acme_mode_calls_ssl_check(self):
        """ACME mode calls _check_cert_via_ssl and persists the result."""
        manager = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'})
        probed = {
            'status': 'valid',
            'expiry': '2026-12-01T00:00:00+00:00',
            'days_remaining': 179,
        }
        with patch.object(CaddyManager, '_check_cert_via_ssl', return_value=probed):
            outcome = manager.refresh_cert_status()
        self.assertEqual(outcome['status'], 'valid')
        # The probed result must have been written back to the identity.
        manager.config_manager.set_identity_field.assert_called_with('tls', probed)

    def test_refresh_cert_status_ssl_failure_returns_unknown(self):
        """When SSL check returns None, status is 'unknown'."""
        manager = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'})
        with patch.object(CaddyManager, '_check_cert_via_ssl', return_value=None):
            outcome = manager.refresh_cert_status()
        self.assertEqual(outcome['status'], 'unknown')

    def test_get_cert_status_fresh_refreshes_when_stale(self):
        """get_cert_status_fresh triggers a refresh when cache is None."""
        manager = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'})
        manager._cert_refreshed_at = None
        with patch.object(manager, 'refresh_cert_status',
                          return_value={'status': 'valid'}) as refresh, \
                patch.object(manager, 'get_cert_status',
                             return_value={'status': 'valid'}):
            manager.get_cert_status_fresh()
        refresh.assert_called_once()

    def test_get_cert_status_fresh_skips_refresh_when_recent(self):
        """get_cert_status_fresh skips refresh when cache is fresh."""
        import time

        manager = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'})
        manager._cert_refreshed_at = time.monotonic()  # just refreshed
        with patch.object(manager, 'refresh_cert_status') as refresh, \
                patch.object(manager, 'get_cert_status',
                             return_value={'status': 'valid'}):
            manager.get_cert_status_fresh(max_age_seconds=300)
        refresh.assert_not_called()
class TestGetCertStatusEnriched(unittest.TestCase):
    """get_cert_status() returns domain, domain_mode, cert_type alongside tls fields."""

    def test_includes_domain_and_mode_for_pic_ngo(self):
        """pic_ngo identity yields the wildcard domain label and cert_type 'acme'."""
        status = _mgr(identity={
            'cell_name': 'alpha',
            'domain_mode': 'pic_ngo',
            'tls': {
                'status': 'valid',
                'expiry': '2026-12-01T00:00:00+00:00',
                'days_remaining': 180,
            },
        }).get_cert_status()
        expected_fields = {
            'domain_mode': 'pic_ngo',
            'domain': '*.alpha.pic.ngo',
            'cert_type': 'acme',
            'status': 'valid',
        }
        for field, value in expected_fields.items():
            self.assertEqual(status[field], value)

    def test_cert_type_is_internal_for_lan_mode(self):
        """LAN mode with an empty tls entry reports cert_type 'internal' and no domain."""
        status = _mgr(
            identity={'cell_name': 'x', 'domain_mode': 'lan', 'tls': {}},
        ).get_cert_status()
        self.assertEqual(status['cert_type'], 'internal')
        self.assertIsNone(status['domain'])

    def test_cert_type_is_custom_when_tls_says_so(self):
        """A stored cert_type of 'custom' is reported even in LAN mode."""
        status = _mgr(identity={
            'cell_name': 'x',
            'domain_mode': 'lan',
            'tls': {'cert_type': 'custom', 'status': 'valid',
                    'expiry': '2027-01-01T00:00:00+00:00', 'days_remaining': 200},
        }).get_cert_status()
        self.assertEqual(status['cert_type'], 'custom')

    def test_domain_label_cloudflare(self):
        """Cloudflare label is the wildcard of domain_name."""
        label = CaddyManager._domain_label(
            {'domain_mode': 'cloudflare', 'domain_name': 'example.com'})
        self.assertEqual(label, '*.example.com')

    def test_domain_label_duckdns(self):
        """DuckDNS label is the wildcard of <cell_name>.duckdns.org."""
        label = CaddyManager._domain_label(
            {'cell_name': 'beta', 'domain_mode': 'duckdns'})
        self.assertEqual(label, '*.beta.duckdns.org')

    def test_domain_label_http01(self):
        """HTTP-01 label is the bare domain_name (no wildcard)."""
        label = CaddyManager._domain_label(
            {'domain_mode': 'http01', 'domain_name': 'myhost.noip.me'})
        self.assertEqual(label, 'myhost.noip.me')

    def test_domain_label_lan_is_none(self):
        """LAN mode has no domain label."""
        label = CaddyManager._domain_label({'domain_mode': 'lan'})
        self.assertIsNone(label)
class TestRenewCert(unittest.TestCase):
    """renew_cert() — mode guard, reload call, cache invalidation."""

    def test_lan_mode_returns_error(self):
        """LAN mode is rejected with an error mentioning 'LAN'."""
        outcome = _mgr(identity={'domain_mode': 'lan'}).renew_cert()
        self.assertFalse(outcome['ok'])
        self.assertIn('LAN', outcome['error'])

    def test_acme_mode_calls_regenerate(self):
        """An ACME mode regenerates with an empty list and reports 'pending'."""
        manager = _mgr(identity={'domain_mode': 'pic_ngo'})
        with patch.object(manager, 'regenerate_with_installed',
                          return_value=True) as regenerate:
            outcome = manager.renew_cert()
        regenerate.assert_called_once_with([])
        self.assertTrue(outcome['ok'])
        self.assertEqual(outcome['status'], 'pending')

    def test_reload_failure_propagated(self):
        """A False from regenerate_with_installed surfaces as a 'reload failed' error."""
        manager = _mgr(identity={'domain_mode': 'cloudflare'})
        with patch.object(manager, 'regenerate_with_installed', return_value=False):
            outcome = manager.renew_cert()
        self.assertFalse(outcome['ok'])
        self.assertIn('reload failed', outcome['error'])

    def test_invalidates_cache_on_success(self):
        """A successful renewal resets _cert_refreshed_at to None."""
        import time

        manager = _mgr(identity={'domain_mode': 'pic_ngo'})
        manager._cert_refreshed_at = time.monotonic()
        with patch.object(manager, 'regenerate_with_installed', return_value=True):
            manager.renew_cert()
        self.assertIsNone(manager._cert_refreshed_at)
class TestUploadCustomCert(unittest.TestCase):
    """upload_custom_cert() — validation, file writes, identity persistence, Caddyfile regen."""

    def _make_pem_cert(self, days_remaining: int = 90):
        """Return (cert_pem, key_pem) for a self-signed cert."""
        import datetime
        from cryptography import x509
        from cryptography.hazmat.primitives import hashes, serialization
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID

        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        now = datetime.datetime.now(datetime.timezone.utc)
        not_after = now + datetime.timedelta(days=days_remaining)
        if days_remaining < 0:
            # Already-expired cert: start the window early enough that it
            # still opens before the (past) expiry.
            not_before = now - datetime.timedelta(days=abs(days_remaining) + 10)
        else:
            not_before = now - datetime.timedelta(days=1)

        builder = x509.CertificateBuilder()
        builder = builder.subject_name(
            x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')]))
        builder = builder.issuer_name(
            x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')]))
        builder = builder.public_key(private_key.public_key())
        builder = builder.serial_number(x509.random_serial_number())
        builder = builder.not_valid_before(not_before)
        builder = builder.not_valid_after(not_after)
        certificate = builder.sign(private_key, hashes.SHA256())

        cert_pem = certificate.public_bytes(serialization.Encoding.PEM).decode()
        key_bytes = private_key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.TraditionalOpenSSL,
            serialization.NoEncryption(),
        )
        return cert_pem, key_bytes.decode()

    def test_rejects_invalid_cert_pem(self):
        """Non-PEM certificate text is rejected with 'Invalid certificate'."""
        bogus_key = '-----BEGIN PRIVATE KEY-----\nXXX\n-----END PRIVATE KEY-----'
        outcome = _mgr().upload_custom_cert('not a cert', bogus_key)
        self.assertFalse(outcome['ok'])
        self.assertIn('Invalid certificate', outcome['error'])

    def test_rejects_invalid_key_pem(self):
        """A valid certificate with a non-PEM key is rejected with 'Invalid private key'."""
        manager = _mgr()
        cert_pem, _unused_key = self._make_pem_cert()
        outcome = manager.upload_custom_cert(cert_pem, 'not a key')
        self.assertFalse(outcome['ok'])
        self.assertIn('Invalid private key', outcome['error'])

    def test_writes_files_to_certs_dir(self):
        """Both cert.pem and key.pem are opened for writing."""
        manager = _mgr(identity={'domain_mode': 'lan', 'cell_name': 'x'})
        cert_pem, key_pem = self._make_pem_cert()
        written_paths = set()

        def recording_open(path, mode='r', **kw):
            # Remember every path opened in a write mode; hand back a mock file.
            if 'w' in mode:
                written_paths.add(path)
            return unittest.mock.mock_open()()

        with patch('builtins.open', side_effect=recording_open), \
                patch('os.makedirs'), \
                patch.object(manager, 'regenerate_with_installed', return_value=True):
            manager.upload_custom_cert(cert_pem, key_pem)

        self.assertTrue(any('cert.pem' in p for p in written_paths))
        self.assertTrue(any('key.pem' in p for p in written_paths))

    def test_persists_custom_cert_type_to_identity(self):
        """A valid upload stores a tls entry with cert_type 'custom' exactly once."""
        manager = _mgr(identity={'domain_mode': 'lan', 'cell_name': 'x'})
        cert_pem, key_pem = self._make_pem_cert(days_remaining=90)

        with patch('builtins.open', unittest.mock.mock_open()), \
                patch('os.makedirs'), \
                patch.object(manager, 'regenerate_with_installed', return_value=True):
            outcome = manager.upload_custom_cert(cert_pem, key_pem)

        self.assertTrue(outcome['ok'])
        self.assertEqual(outcome['cert_type'], 'custom')
        self.assertEqual(outcome['status'], 'valid')
        set_field = manager.config_manager.set_identity_field
        set_field.assert_called_once()
        positional = set_field.call_args[0]
        self.assertEqual(positional[0], 'tls')
        self.assertEqual(positional[1]['cert_type'], 'custom')

    def test_expired_cert_flagged_as_expired(self):
        """A certificate whose expiry is in the past is reported as 'expired'."""
        manager = _mgr(identity={'domain_mode': 'lan', 'cell_name': 'x'})
        cert_pem, key_pem = self._make_pem_cert(days_remaining=-5)

        with patch('builtins.open', unittest.mock.mock_open()), \
                patch('os.makedirs'), \
                patch.object(manager, 'regenerate_with_installed', return_value=True):
            outcome = manager.upload_custom_cert(cert_pem, key_pem)

        self.assertEqual(outcome['status'], 'expired')

    def test_file_write_failure_returns_error(self):
        """An OSError from open() is turned into a 'Failed to write' error result."""
        manager = _mgr(identity={'domain_mode': 'lan'})
        cert_pem, key_pem = self._make_pem_cert()

        with patch('os.makedirs'), \
                patch('builtins.open', side_effect=OSError('no space')):
            outcome = manager.upload_custom_cert(cert_pem, key_pem)

        self.assertFalse(outcome['ok'])
        self.assertIn('Failed to write', outcome['error'])
class TestCaddyfileLanCustomCert(unittest.TestCase):
    """_caddyfile_lan() uses the custom cert path when cert_type=custom."""

    # Identity passed to generate_caddyfile() in both tests; the stored
    # identity given to _mgr() is what differs between them.
    LAN_IDENTITY = {'cell_name': 'mycell', 'domain_mode': 'lan'}

    def test_default_uses_internal_cert_path(self):
        """Without a stored custom cert the internal cert path is emitted."""
        manager = _mgr(identity={'cell_name': 'mycell', 'domain_mode': 'lan'})
        caddyfile = manager.generate_caddyfile(dict(self.LAN_IDENTITY), [])
        self.assertIn('/etc/caddy/internal/cert.pem', caddyfile)

    def test_custom_cert_type_uses_shared_cert_path(self):
        """A stored tls.cert_type of 'custom' switches to the shared certs path."""
        stored_identity = {
            'cell_name': 'mycell',
            'domain_mode': 'lan',
            'tls': {'cert_type': 'custom'},
        }
        manager = _mgr(identity=stored_identity)
        caddyfile = manager.generate_caddyfile(dict(self.LAN_IDENTITY), [])
        self.assertIn('/config/caddy/certs/cert.pem', caddyfile)
        self.assertNotIn('/etc/caddy/internal/cert.pem', caddyfile)
class TestPicNgoNoTokenFallback(unittest.TestCase):
    """pic_ngo mode with no token falls back to lan so Caddy starts cleanly."""

    def _generate_without_ddns_env(self, manager):
        """Generate a pic_ngo Caddyfile with DDNS_TOKEN and DDNS_URL removed from the env."""
        # patch.dict restores os.environ afterwards, undoing the pops.
        with patch.dict(os.environ, {}, clear=False):
            for variable in ('DDNS_TOKEN', 'DDNS_URL'):
                os.environ.pop(variable, None)
            return manager.generate_caddyfile(
                {'cell_name': 'x', 'domain_mode': 'pic_ngo'}, [])

    def test_empty_token_generates_lan_caddyfile(self):
        """An empty configured token produces LAN-style output with no token line."""
        manager = _mgr()
        manager.config_manager.configs = {
            'ddns': {'token': '', 'url': 'https://ddns.pic.ngo'},
        }
        caddyfile = self._generate_without_ddns_env(manager)
        self.assertIn('auto_https off', caddyfile)
        self.assertNotIn('dns pic_ngo', caddyfile)
        self.assertNotIn('token', caddyfile)

    def test_missing_ddns_config_generates_lan_caddyfile(self):
        """No ddns config section at all also produces LAN-style output."""
        manager = _mgr()
        manager.config_manager.configs = {}
        caddyfile = self._generate_without_ddns_env(manager)
        self.assertIn('auto_https off', caddyfile)
        self.assertNotIn('dns pic_ngo', caddyfile)
class TestDdnsApiStripsLegacySuffix(unittest.TestCase):
    """_caddyfile_pic_ngo strips /api/v1 from ddns_api so the plugin doesn't double it."""

    def _generate_with_config_url(self, url):
        """Generate a pic_ngo Caddyfile using *url* from config, with DDNS_URL unset."""
        manager = _mgr()
        manager.config_manager.configs = {
            'ddns': {'token': 'tok', 'url': url},
        }
        # patch.dict restores os.environ afterwards, undoing the pop.
        with patch.dict(os.environ, {}, clear=False):
            os.environ.pop('DDNS_URL', None)
            return manager.generate_caddyfile(
                {'cell_name': 'x', 'domain_mode': 'pic_ngo'}, [])

    def test_api_v1_suffix_stripped_from_config_url(self):
        """A configured URL ending in /api/v1 is emitted without that suffix."""
        caddyfile = self._generate_with_config_url('https://ddns.pic.ngo/api/v1')
        self.assertIn('api_base_url https://ddns.pic.ngo', caddyfile)
        self.assertNotIn('api_base_url https://ddns.pic.ngo/api/v1', caddyfile)

    def test_clean_url_is_unchanged(self):
        """A configured URL without the suffix is emitted as given."""
        caddyfile = self._generate_with_config_url('https://ddns.pic.ngo')
        self.assertIn('api_base_url https://ddns.pic.ngo', caddyfile)
# Run the suite with unittest's CLI runner when executed as a script.
if __name__ == "__main__":
    unittest.main()