da302b5d54
Unit Tests / test (push) Successful in 7m32s
A stale or empty-token Caddyfile on disk caused Caddy to reject the /load request, so the Renew button appeared to do nothing. Now renew_cert() calls regenerate_with_installed([]) first, which writes a fresh Caddyfile from current identity/config before reloading Caddy. This ensures a broken on-disk file never blocks ACME renewal. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
695 lines
30 KiB
Python
695 lines
30 KiB
Python
"""Tests for CaddyManager — Caddyfile generation per domain mode plus
|
|
admin-API reload, health check, and consecutive-failure bookkeeping.
|
|
"""
|
|
import os
|
|
import sys
|
|
import unittest
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import requests
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
|
|
|
|
from caddy_manager import CaddyManager # noqa: E402
|
|
|
|
|
|
def _mgr(tmpdir=None, identity=None):
|
|
"""Build a CaddyManager backed by a mock config_manager."""
|
|
cm = MagicMock()
|
|
cm.get_identity.return_value = identity or {}
|
|
mgr = CaddyManager(
|
|
config_manager=cm,
|
|
data_dir=tmpdir or '/tmp/pic-test-data',
|
|
config_dir=tmpdir or '/tmp/pic-test-config',
|
|
)
|
|
return mgr
|
|
|
|
|
|
CALENDAR_ROUTE = (
|
|
"handle /calendar* {\n"
|
|
" reverse_proxy cell-radicale:5232\n"
|
|
"}"
|
|
)
|
|
FILES_ROUTE = (
|
|
"handle /files* {\n"
|
|
" reverse_proxy cell-filegator:8080\n"
|
|
"}"
|
|
)
|
|
|
|
|
|
class TestGenerateCaddyfileLan(unittest.TestCase):
|
|
def test_lan_mode_has_auto_https_off_and_no_acme(self):
|
|
mgr = _mgr()
|
|
identity = {'cell_name': 'mycell', 'domain_mode': 'lan'}
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
self.assertIn('auto_https off', out)
|
|
# No ACME anywhere
|
|
self.assertNotIn('acme_ca', out)
|
|
self.assertNotIn('acme_email', out)
|
|
self.assertNotIn('dns pic_ngo', out)
|
|
self.assertNotIn('dns cloudflare', out)
|
|
# Internal-CA TLS pair
|
|
self.assertIn('tls /etc/caddy/internal/cert.pem '
|
|
'/etc/caddy/internal/key.pem', out)
|
|
# Cell hostname plus virtual IP listener
|
|
self.assertIn('http://mycell.cell', out)
|
|
self.assertIn('http://172.20.0.2:80', out)
|
|
|
|
|
|
class TestGenerateCaddyfilePicNgo(unittest.TestCase):
|
|
def test_pic_ngo_has_dns_plugin_and_wildcard(self):
|
|
mgr = _mgr()
|
|
mgr.config_manager.configs = {
|
|
'ddns': {'token': 'TESTSECRET123', 'url': 'https://ddns.pic.ngo/api/v1'},
|
|
}
|
|
identity = {'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}
|
|
with unittest.mock.patch.dict(os.environ, {'DDNS_URL': 'https://ddns.pic.ngo/api/v1'}):
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
self.assertIn('dns pic_ngo', out)
|
|
self.assertIn('*.alpha.pic.ngo', out)
|
|
self.assertIn('alpha.pic.ngo', out)
|
|
# Registration token (not TOTP secret) is embedded — no {$VAR} placeholders
|
|
self.assertIn('token TESTSECRET123', out)
|
|
# /api/v1 is stripped — the plugin appends it itself
|
|
self.assertIn('api_base_url https://ddns.pic.ngo', out)
|
|
self.assertNotIn('api_base_url https://ddns.pic.ngo/api/v1', out)
|
|
self.assertNotIn('{$PIC_NGO_DDNS_TOKEN}', out)
|
|
self.assertNotIn('{$PIC_NGO_DDNS_API}', out)
|
|
self.assertIn('email admin@alpha.pic.ngo', out)
|
|
# acme_ca is omitted when ACME_CA_URL is not set
|
|
self.assertNotIn('acme_ca', out)
|
|
|
|
def test_pic_ngo_acme_ca_included_when_env_set(self):
|
|
mgr = _mgr()
|
|
mgr.config_manager.configs = {'ddns': {'token': 'TESTSECRET123'}}
|
|
identity = {'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}
|
|
with unittest.mock.patch.dict(os.environ, {
|
|
'DDNS_URL': 'https://ddns.pic.ngo/api/v1',
|
|
'ACME_CA_URL': 'https://acme-staging-v02.api.letsencrypt.org/directory',
|
|
}):
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
self.assertIn('acme_ca https://acme-staging-v02.api.letsencrypt.org/directory', out)
|
|
|
|
def test_pic_ngo_has_api_route_without_registry(self):
|
|
mgr = _mgr()
|
|
identity = {'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
# Without a registry only the api block is present
|
|
self.assertIn('@api host api.alpha.pic.ngo', out)
|
|
self.assertIn('reverse_proxy cell-api:3000', out)
|
|
self.assertNotIn('@calendar', out)
|
|
self.assertNotIn('@mail', out)
|
|
self.assertNotIn('@files', out)
|
|
|
|
|
|
class TestGenerateCaddyfileCloudflare(unittest.TestCase):
|
|
def test_cloudflare_has_dns_cloudflare(self):
|
|
mgr = _mgr()
|
|
identity = {
|
|
'cell_name': 'beta',
|
|
'domain_mode': 'cloudflare',
|
|
'domain_name': 'example.com',
|
|
}
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
self.assertIn('dns cloudflare {$CF_API_TOKEN}', out)
|
|
self.assertIn('*.example.com', out)
|
|
self.assertIn('email {$ACME_EMAIL}', out)
|
|
# acme_ca is omitted when ACME_CA_URL is not set in the environment
|
|
self.assertNotIn('acme_ca', out)
|
|
|
|
def test_caddyfile_cloudflare_uses_domain_name(self):
|
|
"""Caddyfile must use domain_name for TLS host, not any 'custom_domain' key."""
|
|
mgr = _mgr()
|
|
identity = {
|
|
'cell_name': 'beta',
|
|
'domain_mode': 'cloudflare',
|
|
'domain_name': 'home.example.com',
|
|
'domain': 'home.local',
|
|
}
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
self.assertIn('*.home.example.com', out)
|
|
self.assertIn('home.example.com', out)
|
|
# Must not use the internal domain for TLS
|
|
self.assertNotIn('*.home.local', out)
|
|
# 'custom_domain' must not appear literally as a key in the output
|
|
self.assertNotIn('custom_domain', out)
|
|
# Without a registry only the api block is emitted for subdomain routing
|
|
self.assertIn('@api host api.home.example.com', out)
|
|
self.assertNotIn('@calendar', out)
|
|
self.assertNotIn('@files', out)
|
|
|
|
|
|
class TestGenerateCaddyfileDuckDns(unittest.TestCase):
|
|
def test_duckdns_has_dns_duckdns(self):
|
|
mgr = _mgr()
|
|
identity = {'cell_name': 'gamma', 'domain_mode': 'duckdns'}
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
self.assertIn('dns duckdns {$DUCKDNS_TOKEN}', out)
|
|
self.assertIn('*.gamma.duckdns.org', out)
|
|
self.assertIn('@api host api.gamma.duckdns.org', out)
|
|
self.assertNotIn('@calendar', out)
|
|
self.assertNotIn('@files', out)
|
|
|
|
|
|
class TestGenerateCaddyfileHttp01(unittest.TestCase):
|
|
def test_http01_no_tls_block_and_per_service_blocks(self):
|
|
mgr = _mgr()
|
|
identity = {
|
|
'cell_name': 'delta',
|
|
'domain_mode': 'http01',
|
|
'domain_name': 'delta.noip.me',
|
|
}
|
|
# Store-plugin service (not a core service name)
|
|
services = [
|
|
{'name': 'chat', 'caddy_route': 'reverse_proxy cell-chat:8090'},
|
|
]
|
|
out = mgr.generate_caddyfile(identity, services)
|
|
# No wildcard, no DNS-01 plugins.
|
|
self.assertNotIn('*.delta', out)
|
|
self.assertNotIn('dns ', out)
|
|
# No explicit tls block — Caddy uses HTTP-01 by default.
|
|
self.assertNotIn('tls {', out)
|
|
# Without a registry only the api block is generated
|
|
self.assertIn('api.delta.noip.me {', out)
|
|
self.assertNotIn('calendar.delta.noip.me {', out)
|
|
self.assertNotIn('files.delta.noip.me {', out)
|
|
self.assertNotIn('mail.delta.noip.me {', out)
|
|
# Installed plugin service block still works
|
|
self.assertIn('chat.delta.noip.me {', out)
|
|
self.assertIn('reverse_proxy cell-chat:8090', out)
|
|
|
|
def test_http01_installed_service_with_caddy_route_appears(self):
|
|
"""An installed service with a caddy_route produces its own per-host block."""
|
|
mgr = _mgr()
|
|
identity = {
|
|
'cell_name': 'delta',
|
|
'domain_mode': 'http01',
|
|
'domain_name': 'delta.noip.me',
|
|
}
|
|
services = [{'name': 'notes', 'caddy_route': 'reverse_proxy cell-other:9000'}]
|
|
out = mgr.generate_caddyfile(identity, services)
|
|
self.assertIn('notes.delta.noip.me {', out)
|
|
self.assertIn('reverse_proxy cell-other:9000', out)
|
|
|
|
|
|
class TestServiceRoutesIncluded(unittest.TestCase):
|
|
def test_installed_service_route_appears_in_output(self):
|
|
mgr = _mgr()
|
|
identity = {'cell_name': 'eps', 'domain_mode': 'lan'}
|
|
services = [
|
|
{'name': 'calendar', 'caddy_route': CALENDAR_ROUTE},
|
|
{'name': 'files', 'caddy_route': FILES_ROUTE},
|
|
]
|
|
out = mgr.generate_caddyfile(identity, services)
|
|
self.assertIn('handle /calendar*', out)
|
|
self.assertIn('reverse_proxy cell-radicale:5232', out)
|
|
self.assertIn('handle /files*', out)
|
|
self.assertIn('reverse_proxy cell-filegator:8080', out)
|
|
# Core routes still emitted
|
|
self.assertIn('reverse_proxy cell-api:3000', out)
|
|
self.assertIn('reverse_proxy cell-webui:80', out)
|
|
|
|
|
|
class TestReloadCaddyAdminAPI(unittest.TestCase):
|
|
def test_reload_calls_admin_api_load_endpoint(self):
|
|
mgr = _mgr()
|
|
# Point at a tmp Caddyfile so we can read it back during reload.
|
|
import tempfile
|
|
tmp = tempfile.NamedTemporaryFile('w', delete=False, suffix='.caddyfile')
|
|
tmp.write(":80 { reverse_proxy cell-webui:80 }\n")
|
|
tmp.close()
|
|
mgr.caddyfile_path = tmp.name
|
|
|
|
with patch('caddy_manager.requests.post') as mock_post:
|
|
mock_post.return_value = MagicMock(status_code=200, text='ok')
|
|
ok = mgr.reload_caddy()
|
|
|
|
self.assertTrue(ok)
|
|
mock_post.assert_called_once()
|
|
args, kwargs = mock_post.call_args
|
|
# First positional arg is the URL
|
|
self.assertEqual(args[0], 'http://cell-caddy:2019/load')
|
|
self.assertEqual(kwargs['headers']['Content-Type'], 'text/caddyfile')
|
|
self.assertIn('cell-webui:80', kwargs['data'])
|
|
os.unlink(tmp.name)
|
|
|
|
|
|
class TestHealthCheck(unittest.TestCase):
|
|
def test_returns_true_on_200(self):
|
|
mgr = _mgr()
|
|
with patch('caddy_manager.requests.get') as mock_get:
|
|
mock_get.return_value = MagicMock(status_code=200)
|
|
self.assertTrue(mgr.check_caddy_health())
|
|
mock_get.assert_called_once()
|
|
# URL must be the admin API root
|
|
self.assertIn('cell-caddy:2019', mock_get.call_args[0][0])
|
|
|
|
def test_returns_false_on_connection_error(self):
|
|
mgr = _mgr()
|
|
with patch('caddy_manager.requests.get',
|
|
side_effect=requests.ConnectionError('refused')):
|
|
self.assertFalse(mgr.check_caddy_health())
|
|
|
|
def test_returns_false_on_non_200(self):
|
|
mgr = _mgr()
|
|
with patch('caddy_manager.requests.get') as mock_get:
|
|
mock_get.return_value = MagicMock(status_code=500)
|
|
self.assertFalse(mgr.check_caddy_health())
|
|
|
|
|
|
class TestFailureCounter(unittest.TestCase):
|
|
def test_increments_and_resets(self):
|
|
mgr = _mgr()
|
|
self.assertEqual(mgr.get_health_failure_count(), 0)
|
|
self.assertEqual(mgr.increment_health_failure(), 1)
|
|
self.assertEqual(mgr.increment_health_failure(), 2)
|
|
self.assertEqual(mgr.increment_health_failure(), 3)
|
|
self.assertEqual(mgr.get_health_failure_count(), 3)
|
|
mgr.reset_health_failures()
|
|
self.assertEqual(mgr.get_health_failure_count(), 0)
|
|
|
|
|
|
class TestCertStatus(unittest.TestCase):
|
|
def test_returns_default_when_no_tls_in_identity(self):
|
|
mgr = _mgr(identity={'cell_name': 'x', 'domain_mode': 'lan'})
|
|
out = mgr.get_cert_status()
|
|
self.assertEqual(out['status'], 'unknown')
|
|
self.assertIsNone(out['expiry'])
|
|
self.assertIsNone(out['days_remaining'])
|
|
|
|
def test_returns_tls_block_when_present(self):
|
|
mgr = _mgr(identity={
|
|
'cell_name': 'x',
|
|
'domain_mode': 'pic_ngo',
|
|
'tls': {
|
|
'status': 'valid',
|
|
'expiry': '2026-08-01T00:00:00Z',
|
|
'days_remaining': 84,
|
|
},
|
|
})
|
|
out = mgr.get_cert_status()
|
|
self.assertEqual(out['status'], 'valid')
|
|
self.assertEqual(out['expiry'], '2026-08-01T00:00:00Z')
|
|
self.assertEqual(out['days_remaining'], 84)
|
|
|
|
|
|
class TestCaddyManagerIdentityChangedSubscription(unittest.TestCase):
|
|
def test_subscribes_to_identity_changed_on_init(self):
|
|
"""When service_bus is provided, CaddyManager subscribes to IDENTITY_CHANGED."""
|
|
from service_bus import EventType
|
|
mock_bus = MagicMock()
|
|
mgr = CaddyManager(config_manager=MagicMock(), service_bus=mock_bus)
|
|
mock_bus.subscribe_to_event.assert_called_once_with(
|
|
EventType.IDENTITY_CHANGED, mgr._on_identity_changed
|
|
)
|
|
|
|
def test_no_subscription_without_service_bus(self):
|
|
"""When service_bus is omitted, no subscription is attempted."""
|
|
mock_bus = MagicMock()
|
|
CaddyManager(config_manager=MagicMock())
|
|
mock_bus.subscribe_to_event.assert_not_called()
|
|
|
|
def test_on_identity_changed_calls_regenerate_with_installed(self):
|
|
"""_on_identity_changed calls regenerate_with_installed([])."""
|
|
mgr = _mgr()
|
|
with patch.object(mgr, 'regenerate_with_installed', return_value=True) as mock_regen:
|
|
event = MagicMock()
|
|
mgr._on_identity_changed(event)
|
|
mock_regen.assert_called_once_with([])
|
|
|
|
def test_on_identity_changed_swallows_exceptions(self):
|
|
"""_on_identity_changed must not propagate exceptions."""
|
|
mgr = _mgr()
|
|
with patch.object(mgr, 'regenerate_with_installed', side_effect=Exception('boom')):
|
|
event = MagicMock()
|
|
mgr._on_identity_changed(event) # must not raise
|
|
|
|
|
|
class TestRefreshCertStatus(unittest.TestCase):
|
|
"""refresh_cert_status() + _check_cert_via_ssl()."""
|
|
|
|
def _make_der_cert(self, days_remaining: int) -> bytes:
|
|
"""Return a minimal self-signed DER cert valid for *days_remaining* days."""
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import NameOID
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
import datetime
|
|
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
|
now = datetime.datetime.now(datetime.timezone.utc)
|
|
expiry = now + datetime.timedelta(days=days_remaining)
|
|
cert = (
|
|
x509.CertificateBuilder()
|
|
.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')]))
|
|
.issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')]))
|
|
.public_key(key.public_key())
|
|
.serial_number(x509.random_serial_number())
|
|
.not_valid_before(expiry - datetime.timedelta(days=30))
|
|
.not_valid_after(expiry)
|
|
.sign(key, hashes.SHA256())
|
|
)
|
|
return cert.public_bytes(serialization.Encoding.DER)
|
|
|
|
def test_check_cert_via_ssl_returns_none_on_connection_error(self):
|
|
"""_check_cert_via_ssl returns None when connection fails."""
|
|
with patch('caddy_manager._socket.create_connection', side_effect=OSError('refused')):
|
|
result = CaddyManager._check_cert_via_ssl('host', 443)
|
|
self.assertIsNone(result)
|
|
|
|
def test_check_cert_via_ssl_returns_valid_status(self):
|
|
"""_check_cert_via_ssl returns valid status for a future-dated cert."""
|
|
der = self._make_der_cert(60)
|
|
mock_tls = MagicMock()
|
|
mock_tls.__enter__ = MagicMock(return_value=mock_tls)
|
|
mock_tls.__exit__ = MagicMock(return_value=False)
|
|
mock_tls.getpeercert.return_value = der
|
|
mock_raw = MagicMock()
|
|
mock_raw.__enter__ = MagicMock(return_value=mock_raw)
|
|
mock_raw.__exit__ = MagicMock(return_value=False)
|
|
with patch('caddy_manager._socket.create_connection', return_value=mock_raw):
|
|
with patch('caddy_manager._ssl.create_default_context') as mock_ctx:
|
|
mock_ctx.return_value.wrap_socket.return_value = mock_tls
|
|
result = CaddyManager._check_cert_via_ssl('host', 443)
|
|
self.assertIsNotNone(result)
|
|
self.assertEqual(result['status'], 'valid')
|
|
self.assertGreater(result['days_remaining'], 50)
|
|
|
|
def test_check_cert_via_ssl_returns_expired_for_past_cert(self):
|
|
"""_check_cert_via_ssl returns expired when cert is in the past."""
|
|
der = self._make_der_cert(-5)
|
|
mock_tls = MagicMock()
|
|
mock_tls.__enter__ = MagicMock(return_value=mock_tls)
|
|
mock_tls.__exit__ = MagicMock(return_value=False)
|
|
mock_tls.getpeercert.return_value = der
|
|
mock_raw = MagicMock()
|
|
mock_raw.__enter__ = MagicMock(return_value=mock_raw)
|
|
mock_raw.__exit__ = MagicMock(return_value=False)
|
|
with patch('caddy_manager._socket.create_connection', return_value=mock_raw):
|
|
with patch('caddy_manager._ssl.create_default_context') as mock_ctx:
|
|
mock_ctx.return_value.wrap_socket.return_value = mock_tls
|
|
result = CaddyManager._check_cert_via_ssl('host', 443)
|
|
self.assertIsNotNone(result)
|
|
self.assertEqual(result['status'], 'expired')
|
|
self.assertLess(result['days_remaining'], 0)
|
|
|
|
def test_refresh_cert_status_lan_mode_returns_internal(self):
|
|
"""LAN mode always returns status='internal' without SSL check."""
|
|
mgr = _mgr(identity={'cell_name': 'x', 'domain_mode': 'lan'})
|
|
with patch.object(CaddyManager, '_check_cert_via_ssl') as mock_ssl:
|
|
result = mgr.refresh_cert_status()
|
|
mock_ssl.assert_not_called()
|
|
self.assertEqual(result['status'], 'internal')
|
|
|
|
def test_refresh_cert_status_acme_mode_calls_ssl_check(self):
|
|
"""ACME mode calls _check_cert_via_ssl and persists the result."""
|
|
mgr = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'})
|
|
expected = {'status': 'valid', 'expiry': '2026-12-01T00:00:00+00:00', 'days_remaining': 179}
|
|
with patch.object(CaddyManager, '_check_cert_via_ssl', return_value=expected):
|
|
result = mgr.refresh_cert_status()
|
|
self.assertEqual(result['status'], 'valid')
|
|
# Should have been persisted to identity
|
|
mgr.config_manager.set_identity_field.assert_called_with('tls', expected)
|
|
|
|
def test_refresh_cert_status_ssl_failure_returns_unknown(self):
|
|
"""When SSL check returns None, status is 'unknown'."""
|
|
mgr = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'})
|
|
with patch.object(CaddyManager, '_check_cert_via_ssl', return_value=None):
|
|
result = mgr.refresh_cert_status()
|
|
self.assertEqual(result['status'], 'unknown')
|
|
|
|
def test_get_cert_status_fresh_refreshes_when_stale(self):
|
|
"""get_cert_status_fresh triggers a refresh when cache is None."""
|
|
mgr = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'})
|
|
mgr._cert_refreshed_at = None
|
|
with patch.object(mgr, 'refresh_cert_status', return_value={'status': 'valid'}) as mock_ref:
|
|
with patch.object(mgr, 'get_cert_status', return_value={'status': 'valid'}):
|
|
mgr.get_cert_status_fresh()
|
|
mock_ref.assert_called_once()
|
|
|
|
def test_get_cert_status_fresh_skips_refresh_when_recent(self):
|
|
"""get_cert_status_fresh skips refresh when cache is fresh."""
|
|
import time
|
|
mgr = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'})
|
|
mgr._cert_refreshed_at = time.monotonic() # just refreshed
|
|
with patch.object(mgr, 'refresh_cert_status') as mock_ref:
|
|
with patch.object(mgr, 'get_cert_status', return_value={'status': 'valid'}):
|
|
mgr.get_cert_status_fresh(max_age_seconds=300)
|
|
mock_ref.assert_not_called()
|
|
|
|
|
|
class TestGetCertStatusEnriched(unittest.TestCase):
|
|
"""get_cert_status() returns domain, domain_mode, cert_type alongside tls fields."""
|
|
|
|
def test_includes_domain_and_mode_for_pic_ngo(self):
|
|
mgr = _mgr(identity={
|
|
'cell_name': 'alpha',
|
|
'domain_mode': 'pic_ngo',
|
|
'tls': {'status': 'valid', 'expiry': '2026-12-01T00:00:00+00:00', 'days_remaining': 180},
|
|
})
|
|
s = mgr.get_cert_status()
|
|
self.assertEqual(s['domain_mode'], 'pic_ngo')
|
|
self.assertEqual(s['domain'], '*.alpha.pic.ngo')
|
|
self.assertEqual(s['cert_type'], 'acme')
|
|
self.assertEqual(s['status'], 'valid')
|
|
|
|
def test_cert_type_is_internal_for_lan_mode(self):
|
|
mgr = _mgr(identity={'cell_name': 'x', 'domain_mode': 'lan', 'tls': {}})
|
|
s = mgr.get_cert_status()
|
|
self.assertEqual(s['cert_type'], 'internal')
|
|
self.assertIsNone(s['domain'])
|
|
|
|
def test_cert_type_is_custom_when_tls_says_so(self):
|
|
mgr = _mgr(identity={
|
|
'cell_name': 'x',
|
|
'domain_mode': 'lan',
|
|
'tls': {'cert_type': 'custom', 'status': 'valid',
|
|
'expiry': '2027-01-01T00:00:00+00:00', 'days_remaining': 200},
|
|
})
|
|
s = mgr.get_cert_status()
|
|
self.assertEqual(s['cert_type'], 'custom')
|
|
|
|
def test_domain_label_cloudflare(self):
|
|
ident = {'domain_mode': 'cloudflare', 'domain_name': 'example.com'}
|
|
self.assertEqual(CaddyManager._domain_label(ident), '*.example.com')
|
|
|
|
def test_domain_label_duckdns(self):
|
|
ident = {'cell_name': 'beta', 'domain_mode': 'duckdns'}
|
|
self.assertEqual(CaddyManager._domain_label(ident), '*.beta.duckdns.org')
|
|
|
|
def test_domain_label_http01(self):
|
|
ident = {'domain_mode': 'http01', 'domain_name': 'myhost.noip.me'}
|
|
self.assertEqual(CaddyManager._domain_label(ident), 'myhost.noip.me')
|
|
|
|
def test_domain_label_lan_is_none(self):
|
|
self.assertIsNone(CaddyManager._domain_label({'domain_mode': 'lan'}))
|
|
|
|
|
|
class TestRenewCert(unittest.TestCase):
|
|
"""renew_cert() — mode guard, reload call, cache invalidation."""
|
|
|
|
def test_lan_mode_returns_error(self):
|
|
mgr = _mgr(identity={'domain_mode': 'lan'})
|
|
result = mgr.renew_cert()
|
|
self.assertFalse(result['ok'])
|
|
self.assertIn('LAN', result['error'])
|
|
|
|
def test_acme_mode_calls_regenerate(self):
|
|
mgr = _mgr(identity={'domain_mode': 'pic_ngo'})
|
|
with patch.object(mgr, 'regenerate_with_installed', return_value=True) as mock_regen:
|
|
result = mgr.renew_cert()
|
|
mock_regen.assert_called_once_with([])
|
|
self.assertTrue(result['ok'])
|
|
self.assertEqual(result['status'], 'pending')
|
|
|
|
def test_reload_failure_propagated(self):
|
|
mgr = _mgr(identity={'domain_mode': 'cloudflare'})
|
|
with patch.object(mgr, 'regenerate_with_installed', return_value=False):
|
|
result = mgr.renew_cert()
|
|
self.assertFalse(result['ok'])
|
|
self.assertIn('reload failed', result['error'])
|
|
|
|
def test_invalidates_cache_on_success(self):
|
|
import time
|
|
mgr = _mgr(identity={'domain_mode': 'pic_ngo'})
|
|
mgr._cert_refreshed_at = time.monotonic()
|
|
with patch.object(mgr, 'regenerate_with_installed', return_value=True):
|
|
mgr.renew_cert()
|
|
self.assertIsNone(mgr._cert_refreshed_at)
|
|
|
|
|
|
class TestUploadCustomCert(unittest.TestCase):
|
|
"""upload_custom_cert() — validation, file writes, identity persistence, Caddyfile regen."""
|
|
|
|
def _make_pem_cert(self, days_remaining: int = 90):
|
|
"""Return (cert_pem, key_pem) for a self-signed cert."""
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import NameOID
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
import datetime
|
|
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
|
now = datetime.datetime.now(datetime.timezone.utc)
|
|
expiry = now + datetime.timedelta(days=days_remaining)
|
|
not_before = (now - datetime.timedelta(days=abs(days_remaining) + 10)
|
|
if days_remaining < 0 else now - datetime.timedelta(days=1))
|
|
cert = (
|
|
x509.CertificateBuilder()
|
|
.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')]))
|
|
.issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')]))
|
|
.public_key(key.public_key())
|
|
.serial_number(x509.random_serial_number())
|
|
.not_valid_before(not_before)
|
|
.not_valid_after(expiry)
|
|
.sign(key, hashes.SHA256())
|
|
)
|
|
cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode()
|
|
key_pem = key.private_bytes(
|
|
serialization.Encoding.PEM,
|
|
serialization.PrivateFormat.TraditionalOpenSSL,
|
|
serialization.NoEncryption(),
|
|
).decode()
|
|
return cert_pem, key_pem
|
|
|
|
def test_rejects_invalid_cert_pem(self):
|
|
mgr = _mgr()
|
|
result = mgr.upload_custom_cert('not a cert', '-----BEGIN PRIVATE KEY-----\nXXX\n-----END PRIVATE KEY-----')
|
|
self.assertFalse(result['ok'])
|
|
self.assertIn('Invalid certificate', result['error'])
|
|
|
|
def test_rejects_invalid_key_pem(self):
|
|
mgr = _mgr()
|
|
cert_pem, _ = self._make_pem_cert()
|
|
result = mgr.upload_custom_cert(cert_pem, 'not a key')
|
|
self.assertFalse(result['ok'])
|
|
self.assertIn('Invalid private key', result['error'])
|
|
|
|
def test_writes_files_to_certs_dir(self):
|
|
mgr = _mgr(identity={'domain_mode': 'lan', 'cell_name': 'x'})
|
|
cert_pem, key_pem = self._make_pem_cert()
|
|
written = {}
|
|
|
|
def fake_open(path, mode='r', **kw):
|
|
import unittest.mock
|
|
m = unittest.mock.mock_open()()
|
|
if 'w' in mode:
|
|
written[path] = True
|
|
return m
|
|
|
|
with patch('builtins.open', side_effect=fake_open):
|
|
with patch('os.makedirs'):
|
|
with patch.object(mgr, 'regenerate_with_installed', return_value=True):
|
|
mgr.upload_custom_cert(cert_pem, key_pem)
|
|
|
|
self.assertTrue(any('cert.pem' in p for p in written))
|
|
self.assertTrue(any('key.pem' in p for p in written))
|
|
|
|
def test_persists_custom_cert_type_to_identity(self):
|
|
mgr = _mgr(identity={'domain_mode': 'lan', 'cell_name': 'x'})
|
|
cert_pem, key_pem = self._make_pem_cert(days_remaining=90)
|
|
|
|
with patch('builtins.open', unittest.mock.mock_open()):
|
|
with patch('os.makedirs'):
|
|
with patch.object(mgr, 'regenerate_with_installed', return_value=True):
|
|
result = mgr.upload_custom_cert(cert_pem, key_pem)
|
|
|
|
self.assertTrue(result['ok'])
|
|
self.assertEqual(result['cert_type'], 'custom')
|
|
self.assertEqual(result['status'], 'valid')
|
|
mgr.config_manager.set_identity_field.assert_called_once()
|
|
call_args = mgr.config_manager.set_identity_field.call_args
|
|
self.assertEqual(call_args[0][0], 'tls')
|
|
self.assertEqual(call_args[0][1]['cert_type'], 'custom')
|
|
|
|
def test_expired_cert_flagged_as_expired(self):
|
|
mgr = _mgr(identity={'domain_mode': 'lan', 'cell_name': 'x'})
|
|
cert_pem, key_pem = self._make_pem_cert(days_remaining=-5)
|
|
|
|
with patch('builtins.open', unittest.mock.mock_open()):
|
|
with patch('os.makedirs'):
|
|
with patch.object(mgr, 'regenerate_with_installed', return_value=True):
|
|
result = mgr.upload_custom_cert(cert_pem, key_pem)
|
|
|
|
self.assertEqual(result['status'], 'expired')
|
|
|
|
def test_file_write_failure_returns_error(self):
|
|
mgr = _mgr(identity={'domain_mode': 'lan'})
|
|
cert_pem, key_pem = self._make_pem_cert()
|
|
with patch('os.makedirs'):
|
|
with patch('builtins.open', side_effect=OSError('no space')):
|
|
result = mgr.upload_custom_cert(cert_pem, key_pem)
|
|
self.assertFalse(result['ok'])
|
|
self.assertIn('Failed to write', result['error'])
|
|
|
|
|
|
class TestCaddyfileLanCustomCert(unittest.TestCase):
|
|
"""_caddyfile_lan() uses the custom cert path when cert_type=custom."""
|
|
|
|
def test_default_uses_internal_cert_path(self):
|
|
mgr = _mgr(identity={'cell_name': 'mycell', 'domain_mode': 'lan'})
|
|
out = mgr.generate_caddyfile({'cell_name': 'mycell', 'domain_mode': 'lan'}, [])
|
|
self.assertIn('/etc/caddy/internal/cert.pem', out)
|
|
|
|
def test_custom_cert_type_uses_shared_cert_path(self):
|
|
mgr = _mgr(identity={
|
|
'cell_name': 'mycell',
|
|
'domain_mode': 'lan',
|
|
'tls': {'cert_type': 'custom'},
|
|
})
|
|
out = mgr.generate_caddyfile({'cell_name': 'mycell', 'domain_mode': 'lan'}, [])
|
|
self.assertIn('/config/caddy/certs/cert.pem', out)
|
|
self.assertNotIn('/etc/caddy/internal/cert.pem', out)
|
|
|
|
|
|
class TestPicNgoNoTokenFallback(unittest.TestCase):
|
|
"""pic_ngo mode with no token falls back to lan so Caddy starts cleanly."""
|
|
|
|
def test_empty_token_generates_lan_caddyfile(self):
|
|
mgr = _mgr()
|
|
mgr.config_manager.configs = {'ddns': {'token': '', 'url': 'https://ddns.pic.ngo'}}
|
|
with patch.dict(os.environ, {}, clear=False):
|
|
os.environ.pop('DDNS_TOKEN', None)
|
|
os.environ.pop('DDNS_URL', None)
|
|
out = mgr.generate_caddyfile({'cell_name': 'x', 'domain_mode': 'pic_ngo'}, [])
|
|
self.assertIn('auto_https off', out)
|
|
self.assertNotIn('dns pic_ngo', out)
|
|
self.assertNotIn('token', out)
|
|
|
|
def test_missing_ddns_config_generates_lan_caddyfile(self):
|
|
mgr = _mgr()
|
|
mgr.config_manager.configs = {}
|
|
with patch.dict(os.environ, {}, clear=False):
|
|
os.environ.pop('DDNS_TOKEN', None)
|
|
os.environ.pop('DDNS_URL', None)
|
|
out = mgr.generate_caddyfile({'cell_name': 'x', 'domain_mode': 'pic_ngo'}, [])
|
|
self.assertIn('auto_https off', out)
|
|
self.assertNotIn('dns pic_ngo', out)
|
|
|
|
|
|
class TestDdnsApiStripsLegacySuffix(unittest.TestCase):
|
|
"""_caddyfile_pic_ngo strips /api/v1 from ddns_api so the plugin doesn't double it."""
|
|
|
|
def test_api_v1_suffix_stripped_from_config_url(self):
|
|
mgr = _mgr()
|
|
mgr.config_manager.configs = {
|
|
'ddns': {'token': 'tok', 'url': 'https://ddns.pic.ngo/api/v1'},
|
|
}
|
|
with patch.dict(os.environ, {}, clear=False):
|
|
os.environ.pop('DDNS_URL', None)
|
|
out = mgr.generate_caddyfile({'cell_name': 'x', 'domain_mode': 'pic_ngo'}, [])
|
|
self.assertIn('api_base_url https://ddns.pic.ngo', out)
|
|
self.assertNotIn('api_base_url https://ddns.pic.ngo/api/v1', out)
|
|
|
|
def test_clean_url_is_unchanged(self):
|
|
mgr = _mgr()
|
|
mgr.config_manager.configs = {
|
|
'ddns': {'token': 'tok', 'url': 'https://ddns.pic.ngo'},
|
|
}
|
|
with patch.dict(os.environ, {}, clear=False):
|
|
os.environ.pop('DDNS_URL', None)
|
|
out = mgr.generate_caddyfile({'cell_name': 'x', 'domain_mode': 'pic_ngo'}, [])
|
|
self.assertIn('api_base_url https://ddns.pic.ngo', out)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|