0267dce73d
Unit Tests / test (push) Successful in 11m18s
- CaddyManager: add refresh_cert_status() and get_cert_status_fresh() that open a live TLS connection to cell-caddy:443 to read cert expiry; avoids needing a volume mount into the API container - CaddyManager: periodic cert refresh in health_monitor_loop (every 60 cycles) - config.py PUT /api/ddns: publish IDENTITY_CHANGED so CaddyManager regenerates the Caddyfile immediately after any domain/cell_name change — previously the event was never fired from this route - config.py: remove all ip_utils.write_caddyfile() calls; CaddyManager is now the sole authority for Caddyfile generation - app.py: add GET /api/caddy/cert-status route - app.py: add GET /api/egress/status and PUT /api/egress/services/<id>/exit routes - Settings.jsx: display cert status badge (valid/expired/internal/unknown) with expiry date and days-remaining in the domain section - Tests: TestRefreshCertStatus (8 tests), TestDdnsConfigUpdatesFiresIdentityChanged, TestCaddyCertStatusRoute added; fix expired-cert helper to set not_valid_before relative to expiry so it's always earlier Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
442 lines
19 KiB
Python
442 lines
19 KiB
Python
"""Tests for CaddyManager — Caddyfile generation per domain mode plus
|
|
admin-API reload, health check, and consecutive-failure bookkeeping.
|
|
"""
|
|
import os
|
|
import sys
|
|
import unittest
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import requests
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
|
|
|
|
from caddy_manager import CaddyManager # noqa: E402
|
|
|
|
|
|
def _mgr(tmpdir=None, identity=None):
|
|
"""Build a CaddyManager backed by a mock config_manager."""
|
|
cm = MagicMock()
|
|
cm.get_identity.return_value = identity or {}
|
|
mgr = CaddyManager(
|
|
config_manager=cm,
|
|
data_dir=tmpdir or '/tmp/pic-test-data',
|
|
config_dir=tmpdir or '/tmp/pic-test-config',
|
|
)
|
|
return mgr
|
|
|
|
|
|
CALENDAR_ROUTE = (
|
|
"handle /calendar* {\n"
|
|
" reverse_proxy cell-radicale:5232\n"
|
|
"}"
|
|
)
|
|
FILES_ROUTE = (
|
|
"handle /files* {\n"
|
|
" reverse_proxy cell-filegator:8080\n"
|
|
"}"
|
|
)
|
|
|
|
|
|
class TestGenerateCaddyfileLan(unittest.TestCase):
|
|
def test_lan_mode_has_auto_https_off_and_no_acme(self):
|
|
mgr = _mgr()
|
|
identity = {'cell_name': 'mycell', 'domain_mode': 'lan'}
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
self.assertIn('auto_https off', out)
|
|
# No ACME anywhere
|
|
self.assertNotIn('acme_ca', out)
|
|
self.assertNotIn('acme_email', out)
|
|
self.assertNotIn('dns pic_ngo', out)
|
|
self.assertNotIn('dns cloudflare', out)
|
|
# Internal-CA TLS pair
|
|
self.assertIn('tls /etc/caddy/internal/cert.pem '
|
|
'/etc/caddy/internal/key.pem', out)
|
|
# Cell hostname plus virtual IP listener
|
|
self.assertIn('http://mycell.cell', out)
|
|
self.assertIn('http://172.20.0.2:80', out)
|
|
|
|
|
|
class TestGenerateCaddyfilePicNgo(unittest.TestCase):
|
|
def test_pic_ngo_has_dns_plugin_and_wildcard(self):
|
|
mgr = _mgr()
|
|
identity = {'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}
|
|
import os
|
|
with unittest.mock.patch.dict(os.environ, {
|
|
'DDNS_TOTP_SECRET': 'TESTSECRET123',
|
|
'DDNS_URL': 'https://ddns.pic.ngo/api/v1',
|
|
}):
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
self.assertIn('dns pic_ngo', out)
|
|
self.assertIn('*.alpha.pic.ngo', out)
|
|
self.assertIn('alpha.pic.ngo', out)
|
|
# Credentials are resolved at write time and embedded — no {$VAR} placeholders
|
|
self.assertIn('token TESTSECRET123', out)
|
|
self.assertIn('api_base_url https://ddns.pic.ngo/api/v1', out)
|
|
self.assertNotIn('{$PIC_NGO_DDNS_TOKEN}', out)
|
|
self.assertNotIn('{$PIC_NGO_DDNS_API}', out)
|
|
self.assertIn('email admin@alpha.pic.ngo', out)
|
|
# acme_ca is omitted when ACME_CA_URL is not set
|
|
self.assertNotIn('acme_ca', out)
|
|
|
|
def test_pic_ngo_acme_ca_included_when_env_set(self):
|
|
mgr = _mgr()
|
|
identity = {'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}
|
|
import os
|
|
with unittest.mock.patch.dict(os.environ, {
|
|
'DDNS_TOTP_SECRET': 'TESTSECRET123',
|
|
'DDNS_URL': 'https://ddns.pic.ngo/api/v1',
|
|
'ACME_CA_URL': 'https://acme-staging-v02.api.letsencrypt.org/directory',
|
|
}):
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
self.assertIn('acme_ca https://acme-staging-v02.api.letsencrypt.org/directory', out)
|
|
|
|
def test_pic_ngo_has_api_route_without_registry(self):
|
|
mgr = _mgr()
|
|
identity = {'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
# Without a registry only the api block is present
|
|
self.assertIn('@api host api.alpha.pic.ngo', out)
|
|
self.assertIn('reverse_proxy cell-api:3000', out)
|
|
self.assertNotIn('@calendar', out)
|
|
self.assertNotIn('@mail', out)
|
|
self.assertNotIn('@files', out)
|
|
|
|
|
|
class TestGenerateCaddyfileCloudflare(unittest.TestCase):
|
|
def test_cloudflare_has_dns_cloudflare(self):
|
|
mgr = _mgr()
|
|
identity = {
|
|
'cell_name': 'beta',
|
|
'domain_mode': 'cloudflare',
|
|
'domain_name': 'example.com',
|
|
}
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
self.assertIn('dns cloudflare {$CF_API_TOKEN}', out)
|
|
self.assertIn('*.example.com', out)
|
|
self.assertIn('email {$ACME_EMAIL}', out)
|
|
# acme_ca is omitted when ACME_CA_URL is not set in the environment
|
|
self.assertNotIn('acme_ca', out)
|
|
|
|
def test_caddyfile_cloudflare_uses_domain_name(self):
|
|
"""Caddyfile must use domain_name for TLS host, not any 'custom_domain' key."""
|
|
mgr = _mgr()
|
|
identity = {
|
|
'cell_name': 'beta',
|
|
'domain_mode': 'cloudflare',
|
|
'domain_name': 'home.example.com',
|
|
'domain': 'home.local',
|
|
}
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
self.assertIn('*.home.example.com', out)
|
|
self.assertIn('home.example.com', out)
|
|
# Must not use the internal domain for TLS
|
|
self.assertNotIn('*.home.local', out)
|
|
# 'custom_domain' must not appear literally as a key in the output
|
|
self.assertNotIn('custom_domain', out)
|
|
# Without a registry only the api block is emitted for subdomain routing
|
|
self.assertIn('@api host api.home.example.com', out)
|
|
self.assertNotIn('@calendar', out)
|
|
self.assertNotIn('@files', out)
|
|
|
|
|
|
class TestGenerateCaddyfileDuckDns(unittest.TestCase):
|
|
def test_duckdns_has_dns_duckdns(self):
|
|
mgr = _mgr()
|
|
identity = {'cell_name': 'gamma', 'domain_mode': 'duckdns'}
|
|
out = mgr.generate_caddyfile(identity, [])
|
|
self.assertIn('dns duckdns {$DUCKDNS_TOKEN}', out)
|
|
self.assertIn('*.gamma.duckdns.org', out)
|
|
self.assertIn('@api host api.gamma.duckdns.org', out)
|
|
self.assertNotIn('@calendar', out)
|
|
self.assertNotIn('@files', out)
|
|
|
|
|
|
class TestGenerateCaddyfileHttp01(unittest.TestCase):
|
|
def test_http01_no_tls_block_and_per_service_blocks(self):
|
|
mgr = _mgr()
|
|
identity = {
|
|
'cell_name': 'delta',
|
|
'domain_mode': 'http01',
|
|
'domain_name': 'delta.noip.me',
|
|
}
|
|
# Store-plugin service (not a core service name)
|
|
services = [
|
|
{'name': 'chat', 'caddy_route': 'reverse_proxy cell-chat:8090'},
|
|
]
|
|
out = mgr.generate_caddyfile(identity, services)
|
|
# No wildcard, no DNS-01 plugins.
|
|
self.assertNotIn('*.delta', out)
|
|
self.assertNotIn('dns ', out)
|
|
# No explicit tls block — Caddy uses HTTP-01 by default.
|
|
self.assertNotIn('tls {', out)
|
|
# Without a registry only the api block is generated
|
|
self.assertIn('api.delta.noip.me {', out)
|
|
self.assertNotIn('calendar.delta.noip.me {', out)
|
|
self.assertNotIn('files.delta.noip.me {', out)
|
|
self.assertNotIn('mail.delta.noip.me {', out)
|
|
# Installed plugin service block still works
|
|
self.assertIn('chat.delta.noip.me {', out)
|
|
self.assertIn('reverse_proxy cell-chat:8090', out)
|
|
|
|
def test_http01_installed_service_with_caddy_route_appears(self):
|
|
"""An installed service with a caddy_route produces its own per-host block."""
|
|
mgr = _mgr()
|
|
identity = {
|
|
'cell_name': 'delta',
|
|
'domain_mode': 'http01',
|
|
'domain_name': 'delta.noip.me',
|
|
}
|
|
services = [{'name': 'notes', 'caddy_route': 'reverse_proxy cell-other:9000'}]
|
|
out = mgr.generate_caddyfile(identity, services)
|
|
self.assertIn('notes.delta.noip.me {', out)
|
|
self.assertIn('reverse_proxy cell-other:9000', out)
|
|
|
|
|
|
class TestServiceRoutesIncluded(unittest.TestCase):
|
|
def test_installed_service_route_appears_in_output(self):
|
|
mgr = _mgr()
|
|
identity = {'cell_name': 'eps', 'domain_mode': 'lan'}
|
|
services = [
|
|
{'name': 'calendar', 'caddy_route': CALENDAR_ROUTE},
|
|
{'name': 'files', 'caddy_route': FILES_ROUTE},
|
|
]
|
|
out = mgr.generate_caddyfile(identity, services)
|
|
self.assertIn('handle /calendar*', out)
|
|
self.assertIn('reverse_proxy cell-radicale:5232', out)
|
|
self.assertIn('handle /files*', out)
|
|
self.assertIn('reverse_proxy cell-filegator:8080', out)
|
|
# Core routes still emitted
|
|
self.assertIn('reverse_proxy cell-api:3000', out)
|
|
self.assertIn('reverse_proxy cell-webui:80', out)
|
|
|
|
|
|
class TestReloadCaddyAdminAPI(unittest.TestCase):
|
|
def test_reload_calls_admin_api_load_endpoint(self):
|
|
mgr = _mgr()
|
|
# Point at a tmp Caddyfile so we can read it back during reload.
|
|
import tempfile
|
|
tmp = tempfile.NamedTemporaryFile('w', delete=False, suffix='.caddyfile')
|
|
tmp.write(":80 { reverse_proxy cell-webui:80 }\n")
|
|
tmp.close()
|
|
mgr.caddyfile_path = tmp.name
|
|
|
|
with patch('caddy_manager.requests.post') as mock_post:
|
|
mock_post.return_value = MagicMock(status_code=200, text='ok')
|
|
ok = mgr.reload_caddy()
|
|
|
|
self.assertTrue(ok)
|
|
mock_post.assert_called_once()
|
|
args, kwargs = mock_post.call_args
|
|
# First positional arg is the URL
|
|
self.assertEqual(args[0], 'http://cell-caddy:2019/load')
|
|
self.assertEqual(kwargs['headers']['Content-Type'], 'text/caddyfile')
|
|
self.assertIn('cell-webui:80', kwargs['data'])
|
|
os.unlink(tmp.name)
|
|
|
|
|
|
class TestHealthCheck(unittest.TestCase):
|
|
def test_returns_true_on_200(self):
|
|
mgr = _mgr()
|
|
with patch('caddy_manager.requests.get') as mock_get:
|
|
mock_get.return_value = MagicMock(status_code=200)
|
|
self.assertTrue(mgr.check_caddy_health())
|
|
mock_get.assert_called_once()
|
|
# URL must be the admin API root
|
|
self.assertIn('cell-caddy:2019', mock_get.call_args[0][0])
|
|
|
|
def test_returns_false_on_connection_error(self):
|
|
mgr = _mgr()
|
|
with patch('caddy_manager.requests.get',
|
|
side_effect=requests.ConnectionError('refused')):
|
|
self.assertFalse(mgr.check_caddy_health())
|
|
|
|
def test_returns_false_on_non_200(self):
|
|
mgr = _mgr()
|
|
with patch('caddy_manager.requests.get') as mock_get:
|
|
mock_get.return_value = MagicMock(status_code=500)
|
|
self.assertFalse(mgr.check_caddy_health())
|
|
|
|
|
|
class TestFailureCounter(unittest.TestCase):
|
|
def test_increments_and_resets(self):
|
|
mgr = _mgr()
|
|
self.assertEqual(mgr.get_health_failure_count(), 0)
|
|
self.assertEqual(mgr.increment_health_failure(), 1)
|
|
self.assertEqual(mgr.increment_health_failure(), 2)
|
|
self.assertEqual(mgr.increment_health_failure(), 3)
|
|
self.assertEqual(mgr.get_health_failure_count(), 3)
|
|
mgr.reset_health_failures()
|
|
self.assertEqual(mgr.get_health_failure_count(), 0)
|
|
|
|
|
|
class TestCertStatus(unittest.TestCase):
|
|
def test_returns_default_when_no_tls_in_identity(self):
|
|
mgr = _mgr(identity={'cell_name': 'x', 'domain_mode': 'lan'})
|
|
out = mgr.get_cert_status()
|
|
self.assertEqual(out['status'], 'unknown')
|
|
self.assertIsNone(out['expiry'])
|
|
self.assertIsNone(out['days_remaining'])
|
|
|
|
def test_returns_tls_block_when_present(self):
|
|
mgr = _mgr(identity={
|
|
'cell_name': 'x',
|
|
'domain_mode': 'pic_ngo',
|
|
'tls': {
|
|
'status': 'valid',
|
|
'expiry': '2026-08-01T00:00:00Z',
|
|
'days_remaining': 84,
|
|
},
|
|
})
|
|
out = mgr.get_cert_status()
|
|
self.assertEqual(out['status'], 'valid')
|
|
self.assertEqual(out['expiry'], '2026-08-01T00:00:00Z')
|
|
self.assertEqual(out['days_remaining'], 84)
|
|
|
|
|
|
class TestCaddyManagerIdentityChangedSubscription(unittest.TestCase):
|
|
def test_subscribes_to_identity_changed_on_init(self):
|
|
"""When service_bus is provided, CaddyManager subscribes to IDENTITY_CHANGED."""
|
|
from service_bus import EventType
|
|
mock_bus = MagicMock()
|
|
mgr = CaddyManager(config_manager=MagicMock(), service_bus=mock_bus)
|
|
mock_bus.subscribe_to_event.assert_called_once_with(
|
|
EventType.IDENTITY_CHANGED, mgr._on_identity_changed
|
|
)
|
|
|
|
def test_no_subscription_without_service_bus(self):
|
|
"""When service_bus is omitted, no subscription is attempted."""
|
|
mock_bus = MagicMock()
|
|
CaddyManager(config_manager=MagicMock())
|
|
mock_bus.subscribe_to_event.assert_not_called()
|
|
|
|
def test_on_identity_changed_calls_regenerate_with_installed(self):
|
|
"""_on_identity_changed calls regenerate_with_installed([])."""
|
|
mgr = _mgr()
|
|
with patch.object(mgr, 'regenerate_with_installed', return_value=True) as mock_regen:
|
|
event = MagicMock()
|
|
mgr._on_identity_changed(event)
|
|
mock_regen.assert_called_once_with([])
|
|
|
|
def test_on_identity_changed_swallows_exceptions(self):
|
|
"""_on_identity_changed must not propagate exceptions."""
|
|
mgr = _mgr()
|
|
with patch.object(mgr, 'regenerate_with_installed', side_effect=Exception('boom')):
|
|
event = MagicMock()
|
|
mgr._on_identity_changed(event) # must not raise
|
|
|
|
|
|
class TestRefreshCertStatus(unittest.TestCase):
|
|
"""refresh_cert_status() + _check_cert_via_ssl()."""
|
|
|
|
def _make_der_cert(self, days_remaining: int) -> bytes:
|
|
"""Return a minimal self-signed DER cert valid for *days_remaining* days."""
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import NameOID
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
import datetime
|
|
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
|
now = datetime.datetime.now(datetime.timezone.utc)
|
|
expiry = now + datetime.timedelta(days=days_remaining)
|
|
cert = (
|
|
x509.CertificateBuilder()
|
|
.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')]))
|
|
.issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')]))
|
|
.public_key(key.public_key())
|
|
.serial_number(x509.random_serial_number())
|
|
.not_valid_before(expiry - datetime.timedelta(days=30))
|
|
.not_valid_after(expiry)
|
|
.sign(key, hashes.SHA256())
|
|
)
|
|
return cert.public_bytes(serialization.Encoding.DER)
|
|
|
|
def test_check_cert_via_ssl_returns_none_on_connection_error(self):
|
|
"""_check_cert_via_ssl returns None when connection fails."""
|
|
with patch('caddy_manager._socket.create_connection', side_effect=OSError('refused')):
|
|
result = CaddyManager._check_cert_via_ssl('host', 443)
|
|
self.assertIsNone(result)
|
|
|
|
def test_check_cert_via_ssl_returns_valid_status(self):
|
|
"""_check_cert_via_ssl returns valid status for a future-dated cert."""
|
|
der = self._make_der_cert(60)
|
|
mock_tls = MagicMock()
|
|
mock_tls.__enter__ = MagicMock(return_value=mock_tls)
|
|
mock_tls.__exit__ = MagicMock(return_value=False)
|
|
mock_tls.getpeercert.return_value = der
|
|
mock_raw = MagicMock()
|
|
mock_raw.__enter__ = MagicMock(return_value=mock_raw)
|
|
mock_raw.__exit__ = MagicMock(return_value=False)
|
|
with patch('caddy_manager._socket.create_connection', return_value=mock_raw):
|
|
with patch('caddy_manager._ssl.create_default_context') as mock_ctx:
|
|
mock_ctx.return_value.wrap_socket.return_value = mock_tls
|
|
result = CaddyManager._check_cert_via_ssl('host', 443)
|
|
self.assertIsNotNone(result)
|
|
self.assertEqual(result['status'], 'valid')
|
|
self.assertGreater(result['days_remaining'], 50)
|
|
|
|
def test_check_cert_via_ssl_returns_expired_for_past_cert(self):
|
|
"""_check_cert_via_ssl returns expired when cert is in the past."""
|
|
der = self._make_der_cert(-5)
|
|
mock_tls = MagicMock()
|
|
mock_tls.__enter__ = MagicMock(return_value=mock_tls)
|
|
mock_tls.__exit__ = MagicMock(return_value=False)
|
|
mock_tls.getpeercert.return_value = der
|
|
mock_raw = MagicMock()
|
|
mock_raw.__enter__ = MagicMock(return_value=mock_raw)
|
|
mock_raw.__exit__ = MagicMock(return_value=False)
|
|
with patch('caddy_manager._socket.create_connection', return_value=mock_raw):
|
|
with patch('caddy_manager._ssl.create_default_context') as mock_ctx:
|
|
mock_ctx.return_value.wrap_socket.return_value = mock_tls
|
|
result = CaddyManager._check_cert_via_ssl('host', 443)
|
|
self.assertIsNotNone(result)
|
|
self.assertEqual(result['status'], 'expired')
|
|
self.assertLess(result['days_remaining'], 0)
|
|
|
|
def test_refresh_cert_status_lan_mode_returns_internal(self):
|
|
"""LAN mode always returns status='internal' without SSL check."""
|
|
mgr = _mgr(identity={'cell_name': 'x', 'domain_mode': 'lan'})
|
|
with patch.object(CaddyManager, '_check_cert_via_ssl') as mock_ssl:
|
|
result = mgr.refresh_cert_status()
|
|
mock_ssl.assert_not_called()
|
|
self.assertEqual(result['status'], 'internal')
|
|
|
|
def test_refresh_cert_status_acme_mode_calls_ssl_check(self):
|
|
"""ACME mode calls _check_cert_via_ssl and persists the result."""
|
|
mgr = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'})
|
|
expected = {'status': 'valid', 'expiry': '2026-12-01T00:00:00+00:00', 'days_remaining': 179}
|
|
with patch.object(CaddyManager, '_check_cert_via_ssl', return_value=expected):
|
|
result = mgr.refresh_cert_status()
|
|
self.assertEqual(result['status'], 'valid')
|
|
# Should have been persisted to identity
|
|
mgr.config_manager.set_identity_field.assert_called_with('tls', expected)
|
|
|
|
def test_refresh_cert_status_ssl_failure_returns_unknown(self):
|
|
"""When SSL check returns None, status is 'unknown'."""
|
|
mgr = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'})
|
|
with patch.object(CaddyManager, '_check_cert_via_ssl', return_value=None):
|
|
result = mgr.refresh_cert_status()
|
|
self.assertEqual(result['status'], 'unknown')
|
|
|
|
def test_get_cert_status_fresh_refreshes_when_stale(self):
|
|
"""get_cert_status_fresh triggers a refresh when cache is None."""
|
|
mgr = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'})
|
|
mgr._cert_refreshed_at = None
|
|
with patch.object(mgr, 'refresh_cert_status', return_value={'status': 'valid'}) as mock_ref:
|
|
with patch.object(mgr, 'get_cert_status', return_value={'status': 'valid'}):
|
|
mgr.get_cert_status_fresh()
|
|
mock_ref.assert_called_once()
|
|
|
|
def test_get_cert_status_fresh_skips_refresh_when_recent(self):
|
|
"""get_cert_status_fresh skips refresh when cache is fresh."""
|
|
import time
|
|
mgr = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'})
|
|
mgr._cert_refreshed_at = time.monotonic() # just refreshed
|
|
with patch.object(mgr, 'refresh_cert_status') as mock_ref:
|
|
with patch.object(mgr, 'get_cert_status', return_value={'status': 'valid'}):
|
|
mgr.get_cert_status_fresh(max_age_seconds=300)
|
|
mock_ref.assert_not_called()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|