218 lines
7.9 KiB
Python
218 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tests for ServiceBus
|
|
"""
|
|
|
|
import unittest
|
|
import json
|
|
import tempfile
|
|
import os
|
|
import shutil
|
|
import time
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Add api directory to path
|
|
api_dir = Path(__file__).parent.parent / 'api'
|
|
sys.path.insert(0, str(api_dir))
|
|
|
|
from service_bus import ServiceBus, EventType, Event
|
|
|
|
class TestServiceBus(unittest.TestCase):
|
|
"""Test the service bus functionality"""
|
|
|
|
def setUp(self):
|
|
self.service_bus = ServiceBus()
|
|
|
|
def test_initialization(self):
|
|
"""Test service bus initialization"""
|
|
self.assertIsNotNone(self.service_bus)
|
|
self.assertEqual(len(self.service_bus.service_registry), 0)
|
|
self.assertFalse(self.service_bus.running)
|
|
|
|
def test_start_stop(self):
|
|
"""Test start and stop functionality"""
|
|
self.service_bus.start()
|
|
self.assertTrue(self.service_bus.running)
|
|
|
|
self.service_bus.stop()
|
|
self.assertFalse(self.service_bus.running)
|
|
|
|
def test_register_unregister_service(self):
|
|
"""Test service registration and unregistration"""
|
|
mock_service = Mock()
|
|
mock_service.name = 'test_service'
|
|
|
|
# Register service
|
|
self.service_bus.register_service('test_service', mock_service)
|
|
self.assertIn('test_service', self.service_bus.service_registry)
|
|
self.assertEqual(self.service_bus.service_registry['test_service'], mock_service)
|
|
|
|
# List services
|
|
services = self.service_bus.list_services()
|
|
self.assertIn('test_service', services)
|
|
|
|
# Get service
|
|
service = self.service_bus.get_service('test_service')
|
|
self.assertEqual(service, mock_service)
|
|
|
|
# Unregister service
|
|
self.service_bus.unregister_service('test_service')
|
|
|
|
def test_publish_subscribe_events(self):
|
|
"""Test event publishing and subscription"""
|
|
events_received = []
|
|
|
|
def event_handler(event):
|
|
events_received.append(event)
|
|
|
|
# Subscribe to events
|
|
self.service_bus.subscribe_to_event(EventType.SERVICE_STARTED, event_handler)
|
|
|
|
# Publish event
|
|
test_data = {'service': 'test_service', 'timestamp': '2023-01-01T00:00:00Z'}
|
|
self.service_bus.publish_event(EventType.SERVICE_STARTED, 'test_source', test_data)
|
|
|
|
# Give some time for event processing
|
|
time.sleep(0.1)
|
|
|
|
# Check if event was received
|
|
self.assertIsInstance(events_received, list)
|
|
# Note: Event processing might be async, so we can't guarantee immediate reception
|
|
|
|
def test_call_service(self):
|
|
"""Test service method calling"""
|
|
mock_service = Mock()
|
|
mock_service.test_method.return_value = 'test_result'
|
|
|
|
self.service_bus.register_service('test_service', mock_service)
|
|
|
|
# Call service method
|
|
result = self.service_bus.call_service('test_service', 'test_method', param1='value1')
|
|
self.assertEqual(result, 'test_result')
|
|
mock_service.test_method.assert_called_once_with(param1='value1')
|
|
|
|
# Test calling non-existent service
|
|
with self.assertRaises(ValueError):
|
|
self.service_bus.call_service('nonexistent_service', 'test_method')
|
|
|
|
# Test calling non-existent method - Mock objects have all attributes by default
|
|
# So we need to explicitly make the attribute not exist
|
|
mock_service.nonexistent_method = None
|
|
delattr(mock_service, 'nonexistent_method')
|
|
|
|
with self.assertRaises(ValueError):
|
|
self.service_bus.call_service('test_service', 'nonexistent_method')
|
|
|
|
# Clean up
|
|
self.service_bus.unregister_service('test_service')
|
|
|
|
def test_service_orchestration(self):
|
|
"""Test service orchestration"""
|
|
mock_service = Mock()
|
|
mock_service.start = Mock()
|
|
mock_service.stop = Mock()
|
|
|
|
self.service_bus.register_service('test_service', mock_service)
|
|
|
|
# Test orchestrated start
|
|
success = self.service_bus.orchestrate_service_start('test_service')
|
|
self.assertTrue(success)
|
|
mock_service.start.assert_called_once()
|
|
|
|
# Test orchestrated stop
|
|
success = self.service_bus.orchestrate_service_stop('test_service')
|
|
self.assertTrue(success)
|
|
mock_service.stop.assert_called_once()
|
|
|
|
# Clean up
|
|
self.service_bus.unregister_service('test_service')
|
|
|
|
def test_event_history(self):
|
|
"""Test event history functionality"""
|
|
# Publish some events
|
|
for i in range(5):
|
|
self.service_bus.publish_event(
|
|
EventType.SERVICE_STARTED,
|
|
f'source_{i}',
|
|
{'index': i}
|
|
)
|
|
|
|
# Get event history
|
|
history = self.service_bus.get_event_history()
|
|
self.assertIsInstance(history, list)
|
|
|
|
# Get filtered history
|
|
filtered_history = self.service_bus.get_event_history(
|
|
event_type=EventType.SERVICE_STARTED,
|
|
limit=3
|
|
)
|
|
self.assertIsInstance(filtered_history, list)
|
|
|
|
# Clear history
|
|
self.service_bus.clear_event_history()
|
|
history = self.service_bus.get_event_history()
|
|
self.assertEqual(len(history), 0)
|
|
|
|
def test_service_dependencies(self):
|
|
"""Test service dependency management"""
|
|
# Clear any existing dependencies for email service
|
|
if 'email' in self.service_bus.service_dependencies:
|
|
self.service_bus.service_dependencies['email'] = []
|
|
|
|
# Add dependency
|
|
self.service_bus.add_service_dependency('email', 'network')
|
|
dependencies = self.service_bus.get_service_dependencies('email')
|
|
self.assertIn('network', dependencies)
|
|
|
|
# Remove dependency
|
|
self.service_bus.remove_service_dependency('email', 'network')
|
|
dependencies = self.service_bus.get_service_dependencies('email')
|
|
self.assertNotIn('network', dependencies)
|
|
|
|
def test_service_status_summary(self):
|
|
"""Test service status summary"""
|
|
mock_service = Mock()
|
|
mock_service.get_status.return_value = {'running': True, 'status': 'online'}
|
|
|
|
self.service_bus.register_service('test_service', mock_service)
|
|
|
|
summary = self.service_bus.get_service_status_summary()
|
|
self.assertIsInstance(summary, dict)
|
|
self.assertIn('total_services', summary)
|
|
|
|
def test_lifecycle_hooks(self):
|
|
"""Test lifecycle hooks"""
|
|
mock_hook = Mock()
|
|
|
|
# Add lifecycle hook
|
|
self.service_bus.add_lifecycle_hook('test_service', 'pre_start', mock_hook)
|
|
|
|
# Verify hook was added
|
|
self.assertIn('pre_start', self.service_bus.lifecycle_hooks['test_service'])
|
|
|
|
# Remove lifecycle hook
|
|
self.service_bus.remove_lifecycle_hook('test_service', 'pre_start')
|
|
|
|
# Verify hook was removed
|
|
self.assertNotIn('pre_start', self.service_bus.lifecycle_hooks['test_service'])
|
|
|
|
def test_service_restart(self):
|
|
"""Test service restart orchestration"""
|
|
mock_service = Mock()
|
|
mock_service.start = Mock()
|
|
mock_service.stop = Mock()
|
|
|
|
self.service_bus.register_service('test_service', mock_service)
|
|
|
|
# Test orchestrated restart
|
|
success = self.service_bus.orchestrate_service_restart('test_service')
|
|
self.assertTrue(success)
|
|
|
|
# Verify stop and start were called
|
|
mock_service.stop.assert_called_once()
|
|
mock_service.start.assert_called_once()
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main() |