"""Tests for CaddyManager — Caddyfile generation per domain mode plus admin-API reload, health check, and consecutive-failure bookkeeping. """ import os import sys import unittest from unittest.mock import MagicMock, patch import requests sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api')) from caddy_manager import CaddyManager # noqa: E402 def _mgr(tmpdir=None, identity=None): """Build a CaddyManager backed by a mock config_manager.""" cm = MagicMock() cm.get_identity.return_value = identity or {} mgr = CaddyManager( config_manager=cm, data_dir=tmpdir or '/tmp/pic-test-data', config_dir=tmpdir or '/tmp/pic-test-config', ) return mgr CALENDAR_ROUTE = ( "handle /calendar* {\n" " reverse_proxy cell-radicale:5232\n" "}" ) FILES_ROUTE = ( "handle /files* {\n" " reverse_proxy cell-filegator:8080\n" "}" ) class TestGenerateCaddyfileLan(unittest.TestCase): def test_lan_mode_has_auto_https_off_and_no_acme(self): mgr = _mgr() identity = {'cell_name': 'mycell', 'domain_mode': 'lan'} out = mgr.generate_caddyfile(identity, []) self.assertIn('auto_https off', out) # No ACME anywhere self.assertNotIn('acme_ca', out) self.assertNotIn('acme_email', out) self.assertNotIn('dns pic_ngo', out) self.assertNotIn('dns cloudflare', out) # Internal-CA TLS pair self.assertIn('tls /etc/caddy/internal/cert.pem ' '/etc/caddy/internal/key.pem', out) # Cell hostname plus virtual IP listener self.assertIn('http://mycell.cell', out) self.assertIn('http://172.20.0.2:80', out) class TestGenerateCaddyfilePicNgo(unittest.TestCase): def test_pic_ngo_has_dns_plugin_and_wildcard(self): mgr = _mgr() mgr.config_manager.configs = { 'ddns': {'url': 'https://ddns.pic.ngo/api/v1'}, } mgr.config_manager.get_ddns_token.return_value = 'TESTSECRET123' identity = {'cell_name': 'alpha', 'domain_mode': 'pic_ngo'} with unittest.mock.patch.dict(os.environ, {'DDNS_URL': 'https://ddns.pic.ngo/api/v1'}): out = mgr.generate_caddyfile(identity, []) self.assertIn('dns pic_ngo', out) self.assertIn('*.alpha.pic.ngo', out) self.assertIn('alpha.pic.ngo', out) # Registration token (not TOTP secret) is embedded — no {$VAR} placeholders self.assertIn('token TESTSECRET123', out) # /api/v1 is stripped — the plugin appends it itself self.assertIn('api_base_url https://ddns.pic.ngo', out) self.assertNotIn('api_base_url https://ddns.pic.ngo/api/v1', out) self.assertNotIn('{$PIC_NGO_DDNS_TOKEN}', out) self.assertNotIn('{$PIC_NGO_DDNS_API}', out) self.assertIn('email admin@alpha.pic.ngo', out) # acme_ca is omitted when ACME_CA_URL is not set self.assertNotIn('acme_ca', out) def test_pic_ngo_acme_ca_included_when_env_set(self): mgr = _mgr() mgr.config_manager.configs = {'ddns': {}} mgr.config_manager.get_ddns_token.return_value = 'TESTSECRET123' identity = {'cell_name': 'alpha', 'domain_mode': 'pic_ngo'} with unittest.mock.patch.dict(os.environ, { 'DDNS_URL': 'https://ddns.pic.ngo/api/v1', 'ACME_CA_URL': 'https://acme-staging-v02.api.letsencrypt.org/directory', }): out = mgr.generate_caddyfile(identity, []) self.assertIn('acme_ca https://acme-staging-v02.api.letsencrypt.org/directory', out) def test_pic_ngo_has_api_route_without_registry(self): mgr = _mgr() identity = {'cell_name': 'alpha', 'domain_mode': 'pic_ngo'} out = mgr.generate_caddyfile(identity, []) # Without a registry only the api block is present self.assertIn('@api host api.alpha.pic.ngo', out) self.assertIn('reverse_proxy cell-api:3000', out) self.assertNotIn('@calendar', out) self.assertNotIn('@mail', out) self.assertNotIn('@files', out) class TestGenerateCaddyfileCloudflare(unittest.TestCase): def test_cloudflare_has_dns_cloudflare(self): mgr = _mgr() identity = { 'cell_name': 'beta', 'domain_mode': 'cloudflare', 'domain_name': 'example.com', } out = mgr.generate_caddyfile(identity, []) self.assertIn('dns cloudflare {$CF_API_TOKEN}', out) self.assertIn('*.example.com', out) self.assertIn('email {$ACME_EMAIL}', out) # acme_ca is omitted when ACME_CA_URL is not set in the environment self.assertNotIn('acme_ca', out) def test_caddyfile_cloudflare_uses_domain_name(self): """Caddyfile must use domain_name for TLS host, not any 'custom_domain' key.""" mgr = _mgr() identity = { 'cell_name': 'beta', 'domain_mode': 'cloudflare', 'domain_name': 'home.example.com', 'domain': 'home.local', } out = mgr.generate_caddyfile(identity, []) self.assertIn('*.home.example.com', out) self.assertIn('home.example.com', out) # Must not use the internal domain for TLS self.assertNotIn('*.home.local', out) # 'custom_domain' must not appear literally as a key in the output self.assertNotIn('custom_domain', out) # Without a registry only the api block is emitted for subdomain routing self.assertIn('@api host api.home.example.com', out) self.assertNotIn('@calendar', out) self.assertNotIn('@files', out) class TestGenerateCaddyfileDuckDns(unittest.TestCase): def test_duckdns_has_dns_duckdns(self): mgr = _mgr() identity = {'cell_name': 'gamma', 'domain_mode': 'duckdns'} out = mgr.generate_caddyfile(identity, []) self.assertIn('dns duckdns {$DUCKDNS_TOKEN}', out) self.assertIn('*.gamma.duckdns.org', out) self.assertIn('@api host api.gamma.duckdns.org', out) self.assertNotIn('@calendar', out) self.assertNotIn('@files', out) class TestGenerateCaddyfileHttp01(unittest.TestCase): def test_http01_no_tls_block_and_per_service_blocks(self): mgr = _mgr() identity = { 'cell_name': 'delta', 'domain_mode': 'http01', 'domain_name': 'delta.noip.me', } # Store-plugin service (not a core service name) services = [ {'name': 'chat', 'caddy_route': 'reverse_proxy cell-chat:8090'}, ] out = mgr.generate_caddyfile(identity, services) # No wildcard, no DNS-01 plugins. self.assertNotIn('*.delta', out) self.assertNotIn('dns ', out) # No explicit tls block — Caddy uses HTTP-01 by default. self.assertNotIn('tls {', out) # Without a registry only the api block is generated self.assertIn('api.delta.noip.me {', out) self.assertNotIn('calendar.delta.noip.me {', out) self.assertNotIn('files.delta.noip.me {', out) self.assertNotIn('mail.delta.noip.me {', out) # Installed plugin service block still works self.assertIn('chat.delta.noip.me {', out) self.assertIn('reverse_proxy cell-chat:8090', out) def test_http01_installed_service_with_caddy_route_appears(self): """An installed service with a caddy_route produces its own per-host block.""" mgr = _mgr() identity = { 'cell_name': 'delta', 'domain_mode': 'http01', 'domain_name': 'delta.noip.me', } services = [{'name': 'notes', 'caddy_route': 'reverse_proxy cell-other:9000'}] out = mgr.generate_caddyfile(identity, services) self.assertIn('notes.delta.noip.me {', out) self.assertIn('reverse_proxy cell-other:9000', out) class TestServiceRoutesIncluded(unittest.TestCase): def test_installed_service_route_appears_in_output(self): mgr = _mgr() identity = {'cell_name': 'eps', 'domain_mode': 'lan'} services = [ {'name': 'calendar', 'caddy_route': CALENDAR_ROUTE}, {'name': 'files', 'caddy_route': FILES_ROUTE}, ] out = mgr.generate_caddyfile(identity, services) self.assertIn('handle /calendar*', out) self.assertIn('reverse_proxy cell-radicale:5232', out) self.assertIn('handle /files*', out) self.assertIn('reverse_proxy cell-filegator:8080', out) # Core routes still emitted self.assertIn('reverse_proxy cell-api:3000', out) self.assertIn('reverse_proxy cell-webui:80', out) class TestReloadCaddyAdminAPI(unittest.TestCase): def test_reload_calls_admin_api_load_endpoint(self): mgr = _mgr() # Point at a tmp Caddyfile so we can read it back during reload. import tempfile tmp = tempfile.NamedTemporaryFile('w', delete=False, suffix='.caddyfile') tmp.write(":80 { reverse_proxy cell-webui:80 }\n") tmp.close() mgr.caddyfile_path = tmp.name with patch('caddy_manager.requests.post') as mock_post: mock_post.return_value = MagicMock(status_code=200, text='ok') ok = mgr.reload_caddy() self.assertTrue(ok) mock_post.assert_called_once() args, kwargs = mock_post.call_args # First positional arg is the URL self.assertEqual(args[0], 'http://cell-caddy:2019/load') self.assertEqual(kwargs['headers']['Content-Type'], 'text/caddyfile') self.assertIn('cell-webui:80', kwargs['data']) os.unlink(tmp.name) class TestHealthCheck(unittest.TestCase): def test_returns_true_on_200(self): mgr = _mgr() with patch('caddy_manager.requests.get') as mock_get: mock_get.return_value = MagicMock(status_code=200) self.assertTrue(mgr.check_caddy_health()) mock_get.assert_called_once() # Must hit /config/ — not the root which returns 404 self.assertIn('/config/', mock_get.call_args[0][0]) def test_returns_false_on_connection_error(self): mgr = _mgr() with patch('caddy_manager.requests.get', side_effect=requests.ConnectionError('refused')): self.assertFalse(mgr.check_caddy_health()) def test_returns_false_on_non_200(self): mgr = _mgr() with patch('caddy_manager.requests.get') as mock_get: mock_get.return_value = MagicMock(status_code=500) self.assertFalse(mgr.check_caddy_health()) class TestFailureCounter(unittest.TestCase): def test_increments_and_resets(self): mgr = _mgr() self.assertEqual(mgr.get_health_failure_count(), 0) self.assertEqual(mgr.increment_health_failure(), 1) self.assertEqual(mgr.increment_health_failure(), 2) self.assertEqual(mgr.increment_health_failure(), 3) self.assertEqual(mgr.get_health_failure_count(), 3) mgr.reset_health_failures() self.assertEqual(mgr.get_health_failure_count(), 0) class TestCertStatus(unittest.TestCase): def test_returns_default_when_no_tls_in_identity(self): mgr = _mgr(identity={'cell_name': 'x', 'domain_mode': 'lan'}) out = mgr.get_cert_status() self.assertEqual(out['status'], 'unknown') self.assertIsNone(out['expiry']) self.assertIsNone(out['days_remaining']) def test_returns_tls_block_when_present(self): mgr = _mgr(identity={ 'cell_name': 'x', 'domain_mode': 'pic_ngo', 'tls': { 'status': 'valid', 'expiry': '2026-08-01T00:00:00Z', 'days_remaining': 84, }, }) out = mgr.get_cert_status() self.assertEqual(out['status'], 'valid') self.assertEqual(out['expiry'], '2026-08-01T00:00:00Z') self.assertEqual(out['days_remaining'], 84) class TestCaddyManagerIdentityChangedSubscription(unittest.TestCase): def test_subscribes_to_identity_changed_on_init(self): """When service_bus is provided, CaddyManager subscribes to IDENTITY_CHANGED.""" from service_bus import EventType mock_bus = MagicMock() mgr = CaddyManager(config_manager=MagicMock(), service_bus=mock_bus) mock_bus.subscribe_to_event.assert_called_once_with( EventType.IDENTITY_CHANGED, mgr._on_identity_changed ) def test_no_subscription_without_service_bus(self): """When service_bus is omitted, no subscription is attempted.""" mock_bus = MagicMock() CaddyManager(config_manager=MagicMock()) mock_bus.subscribe_to_event.assert_not_called() def test_on_identity_changed_calls_regenerate_with_installed(self): """_on_identity_changed calls regenerate_with_installed([]).""" mgr = _mgr() with patch.object(mgr, 'regenerate_with_installed', return_value=True) as mock_regen: event = MagicMock() mgr._on_identity_changed(event) mock_regen.assert_called_once_with([]) def test_on_identity_changed_swallows_exceptions(self): """_on_identity_changed must not propagate exceptions.""" mgr = _mgr() with patch.object(mgr, 'regenerate_with_installed', side_effect=Exception('boom')): event = MagicMock() mgr._on_identity_changed(event) # must not raise class TestRefreshCertStatus(unittest.TestCase): """refresh_cert_status() + _check_cert_via_ssl().""" def _make_der_cert(self, days_remaining: int) -> bytes: """Return a minimal self-signed DER cert valid for *days_remaining* days.""" from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa import datetime key = rsa.generate_private_key(public_exponent=65537, key_size=2048) now = datetime.datetime.now(datetime.timezone.utc) expiry = now + datetime.timedelta(days=days_remaining) cert = ( x509.CertificateBuilder() .subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')])) .issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')])) .public_key(key.public_key()) .serial_number(x509.random_serial_number()) .not_valid_before(expiry - datetime.timedelta(days=30)) .not_valid_after(expiry) .sign(key, hashes.SHA256()) ) return cert.public_bytes(serialization.Encoding.DER) def test_check_cert_via_ssl_returns_none_on_connection_error(self): """_check_cert_via_ssl returns None when connection fails.""" with patch('caddy_manager._socket.create_connection', side_effect=OSError('refused')): result = CaddyManager._check_cert_via_ssl('host', 443) self.assertIsNone(result) def test_check_cert_via_ssl_returns_valid_status(self): """_check_cert_via_ssl returns valid status for a future-dated cert.""" der = self._make_der_cert(60) mock_tls = MagicMock() mock_tls.__enter__ = MagicMock(return_value=mock_tls) mock_tls.__exit__ = MagicMock(return_value=False) mock_tls.getpeercert.return_value = der mock_raw = MagicMock() mock_raw.__enter__ = MagicMock(return_value=mock_raw) mock_raw.__exit__ = MagicMock(return_value=False) with patch('caddy_manager._socket.create_connection', return_value=mock_raw): with patch('caddy_manager._ssl.create_default_context') as mock_ctx: mock_ctx.return_value.wrap_socket.return_value = mock_tls result = CaddyManager._check_cert_via_ssl('host', 443) self.assertIsNotNone(result) self.assertEqual(result['status'], 'valid') self.assertGreater(result['days_remaining'], 50) def test_check_cert_via_ssl_returns_expired_for_past_cert(self): """_check_cert_via_ssl returns expired when cert is in the past.""" der = self._make_der_cert(-5) mock_tls = MagicMock() mock_tls.__enter__ = MagicMock(return_value=mock_tls) mock_tls.__exit__ = MagicMock(return_value=False) mock_tls.getpeercert.return_value = der mock_raw = MagicMock() mock_raw.__enter__ = MagicMock(return_value=mock_raw) mock_raw.__exit__ = MagicMock(return_value=False) with patch('caddy_manager._socket.create_connection', return_value=mock_raw): with patch('caddy_manager._ssl.create_default_context') as mock_ctx: mock_ctx.return_value.wrap_socket.return_value = mock_tls result = CaddyManager._check_cert_via_ssl('host', 443) self.assertIsNotNone(result) self.assertEqual(result['status'], 'expired') self.assertLess(result['days_remaining'], 0) def test_refresh_cert_status_lan_mode_returns_internal(self): """LAN mode always returns status='internal' without SSL check.""" mgr = _mgr(identity={'cell_name': 'x', 'domain_mode': 'lan'}) with patch.object(CaddyManager, '_check_cert_via_ssl') as mock_ssl: result = mgr.refresh_cert_status() mock_ssl.assert_not_called() self.assertEqual(result['status'], 'internal') def test_refresh_cert_status_acme_mode_calls_ssl_check(self): """ACME mode calls _check_cert_via_ssl and persists the result.""" mgr = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}) expected = {'status': 'valid', 'expiry': '2026-12-01T00:00:00+00:00', 'days_remaining': 179} with patch.object(CaddyManager, '_check_cert_via_ssl', return_value=expected): result = mgr.refresh_cert_status() self.assertEqual(result['status'], 'valid') # Should have been persisted to identity mgr.config_manager.set_identity_field.assert_called_with('tls', expected) def test_refresh_cert_status_ssl_failure_returns_unknown(self): """When SSL check returns None, status is 'unknown'.""" mgr = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}) with patch.object(CaddyManager, '_check_cert_via_ssl', return_value=None): result = mgr.refresh_cert_status() self.assertEqual(result['status'], 'unknown') def test_get_cert_status_fresh_refreshes_when_stale(self): """get_cert_status_fresh triggers a refresh when cache is None.""" mgr = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}) mgr._cert_refreshed_at = None with patch.object(mgr, 'refresh_cert_status', return_value={'status': 'valid'}) as mock_ref: with patch.object(mgr, 'get_cert_status', return_value={'status': 'valid'}): mgr.get_cert_status_fresh() mock_ref.assert_called_once() def test_get_cert_status_fresh_skips_refresh_when_recent(self): """get_cert_status_fresh skips refresh when cache is fresh.""" import time mgr = _mgr(identity={'cell_name': 'alpha', 'domain_mode': 'pic_ngo'}) mgr._cert_refreshed_at = time.monotonic() # just refreshed with patch.object(mgr, 'refresh_cert_status') as mock_ref: with patch.object(mgr, 'get_cert_status', return_value={'status': 'valid'}): mgr.get_cert_status_fresh(max_age_seconds=300) mock_ref.assert_not_called() class TestGetCertStatusEnriched(unittest.TestCase): """get_cert_status() returns domain, domain_mode, cert_type alongside tls fields.""" def test_includes_domain_and_mode_for_pic_ngo(self): mgr = _mgr(identity={ 'cell_name': 'alpha', 'domain_mode': 'pic_ngo', 'tls': {'status': 'valid', 'expiry': '2026-12-01T00:00:00+00:00', 'days_remaining': 180}, }) s = mgr.get_cert_status() self.assertEqual(s['domain_mode'], 'pic_ngo') self.assertEqual(s['domain'], '*.alpha.pic.ngo') self.assertEqual(s['cert_type'], 'acme') self.assertEqual(s['status'], 'valid') def test_cert_type_is_internal_for_lan_mode(self): mgr = _mgr(identity={'cell_name': 'x', 'domain_mode': 'lan', 'tls': {}}) s = mgr.get_cert_status() self.assertEqual(s['cert_type'], 'internal') self.assertIsNone(s['domain']) def test_cert_type_is_custom_when_tls_says_so(self): mgr = _mgr(identity={ 'cell_name': 'x', 'domain_mode': 'lan', 'tls': {'cert_type': 'custom', 'status': 'valid', 'expiry': '2027-01-01T00:00:00+00:00', 'days_remaining': 200}, }) s = mgr.get_cert_status() self.assertEqual(s['cert_type'], 'custom') def test_domain_label_cloudflare(self): ident = {'domain_mode': 'cloudflare', 'domain_name': 'example.com'} self.assertEqual(CaddyManager._domain_label(ident), '*.example.com') def test_domain_label_duckdns(self): ident = {'cell_name': 'beta', 'domain_mode': 'duckdns'} self.assertEqual(CaddyManager._domain_label(ident), '*.beta.duckdns.org') def test_domain_label_http01(self): ident = {'domain_mode': 'http01', 'domain_name': 'myhost.noip.me'} self.assertEqual(CaddyManager._domain_label(ident), 'myhost.noip.me') def test_domain_label_lan_is_none(self): self.assertIsNone(CaddyManager._domain_label({'domain_mode': 'lan'})) class TestRenewCert(unittest.TestCase): """renew_cert() — mode guard, reload call, cache invalidation.""" def test_lan_mode_returns_error(self): mgr = _mgr(identity={'domain_mode': 'lan'}) result = mgr.renew_cert() self.assertFalse(result['ok']) self.assertIn('LAN', result['error']) def test_acme_mode_calls_regenerate(self): mgr = _mgr(identity={'domain_mode': 'pic_ngo'}) with patch.object(mgr, 'regenerate_with_installed', return_value=True) as mock_regen: result = mgr.renew_cert() mock_regen.assert_called_once_with([]) self.assertTrue(result['ok']) self.assertEqual(result['status'], 'pending') def test_reload_failure_propagated(self): mgr = _mgr(identity={'domain_mode': 'cloudflare'}) with patch.object(mgr, 'regenerate_with_installed', return_value=False): result = mgr.renew_cert() self.assertFalse(result['ok']) self.assertIn('reload failed', result['error']) def test_invalidates_cache_on_success(self): import time mgr = _mgr(identity={'domain_mode': 'pic_ngo'}) mgr._cert_refreshed_at = time.monotonic() with patch.object(mgr, 'regenerate_with_installed', return_value=True): mgr.renew_cert() self.assertIsNone(mgr._cert_refreshed_at) class TestUploadCustomCert(unittest.TestCase): """upload_custom_cert() — validation, file writes, identity persistence, Caddyfile regen.""" def _make_pem_cert(self, days_remaining: int = 90): """Return (cert_pem, key_pem) for a self-signed cert.""" from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa import datetime key = rsa.generate_private_key(public_exponent=65537, key_size=2048) now = datetime.datetime.now(datetime.timezone.utc) expiry = now + datetime.timedelta(days=days_remaining) not_before = (now - datetime.timedelta(days=abs(days_remaining) + 10) if days_remaining < 0 else now - datetime.timedelta(days=1)) cert = ( x509.CertificateBuilder() .subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')])) .issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'test.example.com')])) .public_key(key.public_key()) .serial_number(x509.random_serial_number()) .not_valid_before(not_before) .not_valid_after(expiry) .sign(key, hashes.SHA256()) ) cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode() key_pem = key.private_bytes( serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL, serialization.NoEncryption(), ).decode() return cert_pem, key_pem def test_rejects_invalid_cert_pem(self): mgr = _mgr() result = mgr.upload_custom_cert('not a cert', '-----BEGIN PRIVATE KEY-----\nXXX\n-----END PRIVATE KEY-----') self.assertFalse(result['ok']) self.assertIn('Invalid certificate', result['error']) def test_rejects_invalid_key_pem(self): mgr = _mgr() cert_pem, _ = self._make_pem_cert() result = mgr.upload_custom_cert(cert_pem, 'not a key') self.assertFalse(result['ok']) self.assertIn('Invalid private key', result['error']) def test_writes_files_to_certs_dir(self): mgr = _mgr(identity={'domain_mode': 'lan', 'cell_name': 'x'}) cert_pem, key_pem = self._make_pem_cert() written = {} def fake_open(path, mode='r', **kw): import unittest.mock m = unittest.mock.mock_open()() if 'w' in mode: written[path] = True return m with patch('builtins.open', side_effect=fake_open): with patch('os.makedirs'): with patch.object(mgr, 'regenerate_with_installed', return_value=True): mgr.upload_custom_cert(cert_pem, key_pem) self.assertTrue(any('cert.pem' in p for p in written)) self.assertTrue(any('key.pem' in p for p in written)) def test_persists_custom_cert_type_to_identity(self): mgr = _mgr(identity={'domain_mode': 'lan', 'cell_name': 'x'}) cert_pem, key_pem = self._make_pem_cert(days_remaining=90) with patch('builtins.open', unittest.mock.mock_open()): with patch('os.makedirs'): with patch.object(mgr, 'regenerate_with_installed', return_value=True): result = mgr.upload_custom_cert(cert_pem, key_pem) self.assertTrue(result['ok']) self.assertEqual(result['cert_type'], 'custom') self.assertEqual(result['status'], 'valid') mgr.config_manager.set_identity_field.assert_called_once() call_args = mgr.config_manager.set_identity_field.call_args self.assertEqual(call_args[0][0], 'tls') self.assertEqual(call_args[0][1]['cert_type'], 'custom') def test_expired_cert_flagged_as_expired(self): mgr = _mgr(identity={'domain_mode': 'lan', 'cell_name': 'x'}) cert_pem, key_pem = self._make_pem_cert(days_remaining=-5) with patch('builtins.open', unittest.mock.mock_open()): with patch('os.makedirs'): with patch.object(mgr, 'regenerate_with_installed', return_value=True): result = mgr.upload_custom_cert(cert_pem, key_pem) self.assertEqual(result['status'], 'expired') def test_file_write_failure_returns_error(self): mgr = _mgr(identity={'domain_mode': 'lan'}) cert_pem, key_pem = self._make_pem_cert() with patch('os.makedirs'): with patch('builtins.open', side_effect=OSError('no space')): result = mgr.upload_custom_cert(cert_pem, key_pem) self.assertFalse(result['ok']) self.assertIn('Failed to write', result['error']) class TestCaddyfileLanCustomCert(unittest.TestCase): """_caddyfile_lan() uses the custom cert path when cert_type=custom.""" def test_default_uses_internal_cert_path(self): mgr = _mgr(identity={'cell_name': 'mycell', 'domain_mode': 'lan'}) out = mgr.generate_caddyfile({'cell_name': 'mycell', 'domain_mode': 'lan'}, []) self.assertIn('/etc/caddy/internal/cert.pem', out) def test_custom_cert_type_uses_shared_cert_path(self): mgr = _mgr(identity={ 'cell_name': 'mycell', 'domain_mode': 'lan', 'tls': {'cert_type': 'custom'}, }) out = mgr.generate_caddyfile({'cell_name': 'mycell', 'domain_mode': 'lan'}, []) self.assertIn('/config/caddy/certs/cert.pem', out) self.assertNotIn('/etc/caddy/internal/cert.pem', out) class TestPicNgoNoTokenFallback(unittest.TestCase): """pic_ngo mode with no token falls back to lan so Caddy starts cleanly.""" def test_empty_token_generates_lan_caddyfile(self): mgr = _mgr() mgr.config_manager.configs = {'ddns': {'url': 'https://ddns.pic.ngo'}} mgr.config_manager.get_ddns_token.return_value = '' with patch.dict(os.environ, {}, clear=False): os.environ.pop('DDNS_TOKEN', None) os.environ.pop('DDNS_URL', None) out = mgr.generate_caddyfile({'cell_name': 'x', 'domain_mode': 'pic_ngo'}, []) self.assertIn('auto_https off', out) self.assertNotIn('dns pic_ngo', out) self.assertNotIn('token', out) def test_missing_ddns_config_generates_lan_caddyfile(self): mgr = _mgr() mgr.config_manager.configs = {} mgr.config_manager.get_ddns_token.return_value = '' with patch.dict(os.environ, {}, clear=False): os.environ.pop('DDNS_TOKEN', None) os.environ.pop('DDNS_URL', None) out = mgr.generate_caddyfile({'cell_name': 'x', 'domain_mode': 'pic_ngo'}, []) self.assertIn('auto_https off', out) self.assertNotIn('dns pic_ngo', out) class TestDdnsApiStripsLegacySuffix(unittest.TestCase): """_caddyfile_pic_ngo strips /api/v1 from ddns_api so the plugin doesn't double it.""" def test_api_v1_suffix_stripped_from_config_url(self): mgr = _mgr() mgr.config_manager.configs = { 'ddns': {'url': 'https://ddns.pic.ngo/api/v1'}, } mgr.config_manager.get_ddns_token.return_value = 'tok' with patch.dict(os.environ, {}, clear=False): os.environ.pop('DDNS_URL', None) out = mgr.generate_caddyfile({'cell_name': 'x', 'domain_mode': 'pic_ngo'}, []) self.assertIn('api_base_url https://ddns.pic.ngo', out) self.assertNotIn('api_base_url https://ddns.pic.ngo/api/v1', out) def test_clean_url_is_unchanged(self): mgr = _mgr() mgr.config_manager.configs = { 'ddns': {'url': 'https://ddns.pic.ngo'}, } mgr.config_manager.get_ddns_token.return_value = 'tok' with patch.dict(os.environ, {}, clear=False): os.environ.pop('DDNS_URL', None) out = mgr.generate_caddyfile({'cell_name': 'x', 'domain_mode': 'pic_ngo'}, []) self.assertIn('api_base_url https://ddns.pic.ngo', out) if __name__ == '__main__': unittest.main()