1f016de855
Unit Tests / test (push) Successful in 11m35s
- ConfigManager.get_effective_domain(): returns domain_name when DDNS
active (pic_ngo/cloudflare/duckdns), domain otherwise. Used by all
public-facing services so they use the real registered FQDN.
- ConfigManager.get_internal_domain(): always returns _identity.domain
(CoreDNS zone name, dnsmasq, cell-link invites — stays internal).
- Silent migration: if domain_mode != lan and domain is generic "cell",
auto-set to {cell_name}.local for unique CoreDNS zone naming.
- caddy_manager: fix custom_domain bug — cloudflare/http01 modes were
reading identity.get('custom_domain') which never exists; now reads
domain_name correctly.
- routes/config, app: expose effective_domain in GET /api/config and
/api/status responses.
- email_manager, routes/email: use get_effective_domain() for
OVERRIDE_HOSTNAME, POSTMASTER_ADDRESS, and new-user email defaults.
- ServiceBus.IDENTITY_CHANGED event: emitted from PUT /api/config and
POST /api/ddns/register after identity writes; caddy_manager and
email_manager subscribe to regenerate config automatically.
- Settings.jsx: hide Local Domain input in non-LAN modes; show
read-only effective_domain with "managed by DDNS" badge and an
Advanced toggle for the internal CoreDNS zone name.
- 11 new test classes covering all new helpers, event subscriptions,
caddy/email handlers, and the custom_domain fix.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
251 lines
9.2 KiB
Python
251 lines
9.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tests for ServiceBus
|
|
"""
|
|
|
|
import unittest
|
|
import json
|
|
import tempfile
|
|
import os
|
|
import shutil
|
|
import time
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# The code under test lives in the ``api`` directory next to this file's
# parent directory; put it first on the import path before importing from it.
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))
|
|
|
|
from service_bus import ServiceBus, EventType, Event
|
|
|
|
class TestServiceBus(unittest.TestCase):
    """Exercise the ServiceBus API; every test gets its own fresh bus."""

    def setUp(self):
        """Create a new, not-yet-started bus for the test."""
        self.service_bus = ServiceBus()

    def tearDown(self):
        """Stop the bus if the test left it running (e.g. after a failure)."""
        if self.service_bus.running:
            self.service_bus.stop()

    def test_initialization(self):
        """A new bus has an empty registry and is not running."""
        self.assertIsNotNone(self.service_bus)
        self.assertEqual(len(self.service_bus.service_registry), 0)
        self.assertFalse(self.service_bus.running)

    def test_start_stop(self):
        """start() and stop() toggle the ``running`` flag."""
        self.service_bus.start()
        self.assertTrue(self.service_bus.running)

        self.service_bus.stop()
        self.assertFalse(self.service_bus.running)

    def test_register_unregister_service(self):
        """Register a service, look it up, then unregister it again."""
        mock_service = Mock()
        mock_service.name = 'test_service'

        # Register service
        self.service_bus.register_service('test_service', mock_service)
        self.assertIn('test_service', self.service_bus.service_registry)
        self.assertEqual(
            self.service_bus.service_registry['test_service'], mock_service
        )

        # List services
        services = self.service_bus.list_services()
        self.assertIn('test_service', services)

        # Get service
        service = self.service_bus.get_service('test_service')
        self.assertEqual(service, mock_service)

        # Unregister service and check that it is really gone
        self.service_bus.unregister_service('test_service')
        self.assertNotIn('test_service', self.service_bus.service_registry)

    def test_publish_subscribe_events(self):
        """A handler subscribed to an event type receives a published event."""
        events_received = []

        def event_handler(event):
            events_received.append(event)

        test_data = {'service': 'test_service', 'timestamp': '2023-01-01T00:00:00Z'}

        def delivered():
            # Match on the payload so unrelated events of the same type
            # (if the bus emits any) do not satisfy the wait.
            return any(event.data == test_data for event in events_received)

        # The bus must be running for events to be dispatched to subscribers;
        # tearDown stops it again.
        self.service_bus.start()
        self.service_bus.subscribe_to_event(EventType.SERVICE_STARTED, event_handler)
        self.service_bus.publish_event(
            EventType.SERVICE_STARTED, 'test_source', test_data
        )

        # Dispatch may be asynchronous: poll with a deadline instead of
        # relying on one fixed sleep.
        deadline = time.monotonic() + 2.0
        while not delivered() and time.monotonic() < deadline:
            time.sleep(0.01)

        self.assertTrue(delivered(), "published event never reached the subscriber")
        # The subscription is per event type, so nothing else may arrive.
        for event in events_received:
            self.assertEqual(event.event_type, EventType.SERVICE_STARTED)

    def test_call_service(self):
        """call_service() forwards to the method and rejects unknown targets."""
        mock_service = Mock()
        mock_service.test_method.return_value = 'test_result'

        self.service_bus.register_service('test_service', mock_service)

        # Call service method
        result = self.service_bus.call_service(
            'test_service', 'test_method', param1='value1'
        )
        self.assertEqual(result, 'test_result')
        mock_service.test_method.assert_called_once_with(param1='value1')

        # Calling a service that was never registered
        with self.assertRaises(ValueError):
            self.service_bus.call_service('nonexistent_service', 'test_method')

        # Mock fabricates any attribute on access; deleting the attribute makes
        # later lookups raise AttributeError, i.e. the method truly is missing.
        del mock_service.nonexistent_method

        with self.assertRaises(ValueError):
            self.service_bus.call_service('test_service', 'nonexistent_method')

        # Clean up
        self.service_bus.unregister_service('test_service')

    def test_service_orchestration(self):
        """Orchestrated start/stop report success and call the service."""
        mock_service = Mock()
        mock_service.start = Mock()
        mock_service.stop = Mock()

        self.service_bus.register_service('test_service', mock_service)

        # Test orchestrated start
        success = self.service_bus.orchestrate_service_start('test_service')
        self.assertTrue(success)
        mock_service.start.assert_called_once()

        # Test orchestrated stop
        success = self.service_bus.orchestrate_service_stop('test_service')
        self.assertTrue(success)
        mock_service.stop.assert_called_once()

        # Clean up
        self.service_bus.unregister_service('test_service')

    def test_event_history(self):
        """Event history can be read, filtered with a limit, and cleared."""
        # Publish some events
        for i in range(5):
            self.service_bus.publish_event(
                EventType.SERVICE_STARTED,
                f'source_{i}',
                {'index': i},
            )

        # Get event history
        history = self.service_bus.get_event_history()
        self.assertIsInstance(history, list)

        # Get filtered history; the limit caps how many entries come back
        filtered_history = self.service_bus.get_event_history(
            event_type=EventType.SERVICE_STARTED,
            limit=3,
        )
        self.assertIsInstance(filtered_history, list)
        self.assertLessEqual(len(filtered_history), 3)

        # Clear history
        self.service_bus.clear_event_history()
        history = self.service_bus.get_event_history()
        self.assertEqual(len(history), 0)

    def test_service_dependencies(self):
        """Dependencies can be added to and removed from a service."""
        # Start from a clean slate if the bus already tracks 'email'
        if 'email' in self.service_bus.service_dependencies:
            self.service_bus.service_dependencies['email'] = []

        # Add dependency
        self.service_bus.add_service_dependency('email', 'network')
        dependencies = self.service_bus.get_service_dependencies('email')
        self.assertIn('network', dependencies)

        # Remove dependency
        self.service_bus.remove_service_dependency('email', 'network')
        dependencies = self.service_bus.get_service_dependencies('email')
        self.assertNotIn('network', dependencies)

    def test_service_status_summary(self):
        """The status summary is a dict that reports ``total_services``."""
        mock_service = Mock()
        mock_service.get_status.return_value = {'running': True, 'status': 'online'}

        self.service_bus.register_service('test_service', mock_service)

        summary = self.service_bus.get_service_status_summary()
        self.assertIsInstance(summary, dict)
        self.assertIn('total_services', summary)

    def test_lifecycle_hooks(self):
        """Lifecycle hooks can be added and removed per service and phase."""
        mock_hook = Mock()

        # Add lifecycle hook
        self.service_bus.add_lifecycle_hook('test_service', 'pre_start', mock_hook)

        # Verify hook was added
        self.assertIn('pre_start', self.service_bus.lifecycle_hooks['test_service'])

        # Remove lifecycle hook
        self.service_bus.remove_lifecycle_hook('test_service', 'pre_start')

        # Verify hook was removed
        self.assertNotIn('pre_start', self.service_bus.lifecycle_hooks['test_service'])

    def test_service_restart(self):
        """An orchestrated restart stops and then starts the service once."""
        mock_service = Mock()
        mock_service.start = Mock()
        mock_service.stop = Mock()

        self.service_bus.register_service('test_service', mock_service)

        # Test orchestrated restart
        success = self.service_bus.orchestrate_service_restart('test_service')
        self.assertTrue(success)

        # Verify stop and start were called
        mock_service.stop.assert_called_once()
        mock_service.start.assert_called_once()
|
|
|
|
class TestIdentityChangedEventType(unittest.TestCase):
    """Tests for the IDENTITY_CHANGED event type."""

    def test_identity_changed_event_type_exists(self):
        """The enum member exists and carries the expected string value."""
        self.assertEqual(EventType.IDENTITY_CHANGED.value, "identity_changed")

    def test_identity_changed_published_and_received(self):
        """Publish IDENTITY_CHANGED and verify the subscriber receives it."""
        bus = ServiceBus()
        bus.start()
        try:
            received = []

            def handler(event):
                received.append(event)

            bus.subscribe_to_event(EventType.IDENTITY_CHANGED, handler)
            bus.publish_event(EventType.IDENTITY_CHANGED, 'test', {
                'cell_name': 'mycell',
                'domain': 'cell',
                'domain_name': 'mycell.pic.ngo',
                'domain_mode': 'pic_ngo',
                'effective_domain': 'mycell.pic.ngo',
            })

            # Wait for delivery with a deadline rather than one fixed sleep:
            # returns as soon as the event arrives and tolerates a slow runner.
            deadline = time.monotonic() + 2.0
            while not received and time.monotonic() < deadline:
                time.sleep(0.01)

            self.assertEqual(len(received), 1)
            self.assertEqual(received[0].event_type, EventType.IDENTITY_CHANGED)
            self.assertEqual(received[0].data['cell_name'], 'mycell')
            self.assertEqual(received[0].data['effective_domain'], 'mycell.pic.ngo')
        finally:
            # Always shut the bus down, even when an assertion fails.
            bus.stop()
|
|
|
|
|
|
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()