#!/usr/bin/env python3 """ Tests for ServiceBus """ import unittest import json import tempfile import os import shutil import time from unittest.mock import Mock, patch, MagicMock import sys from pathlib import Path # Add api directory to path api_dir = Path(__file__).parent.parent / 'api' sys.path.insert(0, str(api_dir)) from service_bus import ServiceBus, EventType, Event class TestServiceBus(unittest.TestCase): """Test the service bus functionality""" def setUp(self): self.service_bus = ServiceBus() def test_initialization(self): """Test service bus initialization""" self.assertIsNotNone(self.service_bus) self.assertEqual(len(self.service_bus.service_registry), 0) self.assertFalse(self.service_bus.running) def test_start_stop(self): """Test start and stop functionality""" self.service_bus.start() self.assertTrue(self.service_bus.running) self.service_bus.stop() self.assertFalse(self.service_bus.running) def test_register_unregister_service(self): """Test service registration and unregistration""" mock_service = Mock() mock_service.name = 'test_service' # Register service self.service_bus.register_service('test_service', mock_service) self.assertIn('test_service', self.service_bus.service_registry) self.assertEqual(self.service_bus.service_registry['test_service'], mock_service) # List services services = self.service_bus.list_services() self.assertIn('test_service', services) # Get service service = self.service_bus.get_service('test_service') self.assertEqual(service, mock_service) # Unregister service self.service_bus.unregister_service('test_service') def test_publish_subscribe_events(self): """Test event publishing and subscription""" events_received = [] def event_handler(event): events_received.append(event) # Subscribe to events self.service_bus.subscribe_to_event(EventType.SERVICE_STARTED, event_handler) # Publish event test_data = {'service': 'test_service', 'timestamp': '2023-01-01T00:00:00Z'} self.service_bus.publish_event(EventType.SERVICE_STARTED, 'test_source', test_data) # Give some time for event processing time.sleep(0.1) # Check if event was received self.assertIsInstance(events_received, list) # Note: Event processing might be async, so we can't guarantee immediate reception def test_call_service(self): """Test service method calling""" mock_service = Mock() mock_service.test_method.return_value = 'test_result' self.service_bus.register_service('test_service', mock_service) # Call service method result = self.service_bus.call_service('test_service', 'test_method', param1='value1') self.assertEqual(result, 'test_result') mock_service.test_method.assert_called_once_with(param1='value1') # Test calling non-existent service with self.assertRaises(ValueError): self.service_bus.call_service('nonexistent_service', 'test_method') # Test calling non-existent method - Mock objects have all attributes by default # So we need to explicitly make the attribute not exist mock_service.nonexistent_method = None delattr(mock_service, 'nonexistent_method') with self.assertRaises(ValueError): self.service_bus.call_service('test_service', 'nonexistent_method') # Clean up self.service_bus.unregister_service('test_service') def test_service_orchestration(self): """Test service orchestration""" mock_service = Mock() mock_service.start = Mock() mock_service.stop = Mock() self.service_bus.register_service('test_service', mock_service) # Test orchestrated start success = self.service_bus.orchestrate_service_start('test_service') self.assertTrue(success) mock_service.start.assert_called_once() # Test orchestrated stop success = self.service_bus.orchestrate_service_stop('test_service') self.assertTrue(success) mock_service.stop.assert_called_once() # Clean up self.service_bus.unregister_service('test_service') def test_event_history(self): """Test event history functionality""" # Publish some events for i in range(5): self.service_bus.publish_event( EventType.SERVICE_STARTED, f'source_{i}', {'index': i} ) # Get event history history = self.service_bus.get_event_history() self.assertIsInstance(history, list) # Get filtered history filtered_history = self.service_bus.get_event_history( event_type=EventType.SERVICE_STARTED, limit=3 ) self.assertIsInstance(filtered_history, list) # Clear history self.service_bus.clear_event_history() history = self.service_bus.get_event_history() self.assertEqual(len(history), 0) def test_service_dependencies(self): """Test service dependency management""" # Clear any existing dependencies for email service if 'email' in self.service_bus.service_dependencies: self.service_bus.service_dependencies['email'] = [] # Add dependency self.service_bus.add_service_dependency('email', 'network') dependencies = self.service_bus.get_service_dependencies('email') self.assertIn('network', dependencies) # Remove dependency self.service_bus.remove_service_dependency('email', 'network') dependencies = self.service_bus.get_service_dependencies('email') self.assertNotIn('network', dependencies) def test_service_status_summary(self): """Test service status summary""" mock_service = Mock() mock_service.get_status.return_value = {'running': True, 'status': 'online'} self.service_bus.register_service('test_service', mock_service) summary = self.service_bus.get_service_status_summary() self.assertIsInstance(summary, dict) self.assertIn('total_services', summary) def test_lifecycle_hooks(self): """Test lifecycle hooks""" mock_hook = Mock() # Add lifecycle hook self.service_bus.add_lifecycle_hook('test_service', 'pre_start', mock_hook) # Verify hook was added self.assertIn('pre_start', self.service_bus.lifecycle_hooks['test_service']) # Remove lifecycle hook self.service_bus.remove_lifecycle_hook('test_service', 'pre_start') # Verify hook was removed self.assertNotIn('pre_start', self.service_bus.lifecycle_hooks['test_service']) def test_service_restart(self): """Test service restart orchestration""" mock_service = Mock() mock_service.start = Mock() mock_service.stop = Mock() self.service_bus.register_service('test_service', mock_service) # Test orchestrated restart success = self.service_bus.orchestrate_service_restart('test_service') self.assertTrue(success) # Verify stop and start were called mock_service.stop.assert_called_once() mock_service.start.assert_called_once() if __name__ == '__main__': unittest.main()