import sys from pathlib import Path # Add api directory to path api_dir = Path(__file__).parent.parent / 'api' sys.path.insert(0, str(api_dir)) import unittest import tempfile import shutil import os import json from unittest.mock import patch, MagicMock from calendar_manager import CalendarManager class TestCalendarManager(unittest.TestCase): def setUp(self): self.test_dir = tempfile.mkdtemp() self.data_dir = os.path.join(self.test_dir, 'data') self.config_dir = os.path.join(self.test_dir, 'config') os.makedirs(self.data_dir, exist_ok=True) os.makedirs(self.config_dir, exist_ok=True) self.manager = CalendarManager(data_dir=self.data_dir, config_dir=self.config_dir) def tearDown(self): shutil.rmtree(self.test_dir) def test_initialization(self): self.assertTrue(os.path.exists(self.manager.calendar_dir)) self.assertTrue(os.path.exists(self.manager.radicale_dir)) def test_ensure_config_exists(self): config_file = os.path.join(self.manager.radicale_dir, 'config') if os.path.exists(config_file): os.remove(config_file) self.manager._ensure_config_exists() self.assertTrue(os.path.exists(config_file)) def test_generate_radicale_config(self): config_file = os.path.join(self.manager.radicale_dir, 'config') if os.path.exists(config_file): os.remove(config_file) self.manager._generate_radicale_config() self.assertTrue(os.path.exists(config_file)) with open(config_file) as f: content = f.read() self.assertIn('[server]', content) self.assertIn('hosts = 0.0.0.0:5232', content) def test_get_status(self): status = self.manager.get_status() self.assertIsInstance(status, dict) self.assertIn('status', status) @patch.object(CalendarManager, 'create_calendar', return_value=True) @patch.object(CalendarManager, 'remove_calendar', return_value=True) def test_create_and_remove_calendar(self, mock_remove, mock_create): result = self.manager.create_calendar('testuser', 'testcal') self.assertTrue(result) result = self.manager.remove_calendar('testuser', 'testcal') self.assertTrue(result) @patch.object(CalendarManager, 'add_event', return_value=True) @patch.object(CalendarManager, 'remove_event', return_value=True) def test_add_and_remove_event(self, mock_remove, mock_add): result = self.manager.add_event('testuser', 'testcal', {'summary': 'Test'}) self.assertTrue(result) result = self.manager.remove_event('testuser', 'testcal', 'dummyuid') self.assertTrue(result) def test_error_handling(self): # Force errors by passing invalid arguments, should return False self.assertFalse(self.manager.create_calendar(None, None)) self.assertFalse(self.manager.add_event(None, None, None)) self.assertFalse(self.manager.remove_calendar(None, None)) self.assertFalse(self.manager.remove_event(None, None, None)) # --- New tests below --- def test_create_calendar_user_creates_and_persists(self): with patch.object(self.manager, '_sync_users_to_cell_config'): result = self.manager.create_calendar_user('alice', 'password123') self.assertTrue(result) users = self.manager._load_users() self.assertEqual(len(users), 1) self.assertEqual(users[0]['username'], 'alice') self.assertNotIn('password', users[0]) def test_create_calendar_user_duplicate_returns_false(self): with patch.object(self.manager, '_sync_users_to_cell_config'): self.manager.create_calendar_user('alice', 'password123') result = self.manager.create_calendar_user('alice', 'other') self.assertFalse(result) users = self.manager._load_users() self.assertEqual(len(users), 1) def test_create_calendar_user_creates_user_directory(self): with patch.object(self.manager, '_sync_users_to_cell_config'): self.manager.create_calendar_user('alice', 'password123') user_dir = os.path.join(self.manager.calendar_data_dir, 'users', 'alice') self.assertTrue(os.path.exists(user_dir)) def test_delete_calendar_user_removes_user(self): with patch.object(self.manager, '_sync_users_to_cell_config'): self.manager.create_calendar_user('alice', 'password123') with patch.object(self.manager, '_sync_users_to_cell_config'): result = self.manager.delete_calendar_user('alice') self.assertTrue(result) users = self.manager._load_users() self.assertEqual(len(users), 0) def test_delete_calendar_user_nonexistent_returns_false(self): result = self.manager.delete_calendar_user('nobody') self.assertFalse(result) def test_delete_calendar_user_removes_directory(self): with patch.object(self.manager, '_sync_users_to_cell_config'): self.manager.create_calendar_user('alice', 'password123') user_dir = os.path.join(self.manager.calendar_data_dir, 'users', 'alice') self.assertTrue(os.path.exists(user_dir)) with patch.object(self.manager, '_sync_users_to_cell_config'): self.manager.delete_calendar_user('alice') self.assertFalse(os.path.exists(user_dir)) def test_get_calendar_users_empty(self): users = self.manager.get_calendar_users() self.assertEqual(users, []) def test_get_calendar_users_returns_created(self): with patch.object(self.manager, '_sync_users_to_cell_config'): self.manager.create_calendar_user('alice', 'pass') self.manager.create_calendar_user('bob', 'pass') users = self.manager.get_calendar_users() self.assertEqual(len(users), 2) usernames = [u['username'] for u in users] self.assertIn('alice', usernames) self.assertIn('bob', usernames) def test_create_calendar_real_persists(self): result = self.manager.create_calendar('alice', 'personal') self.assertTrue(result) calendars = self.manager._load_calendars() self.assertEqual(len(calendars), 1) cal = calendars[0] self.assertEqual(cal['username'], 'alice') self.assertEqual(cal['name'], 'personal') def test_create_calendar_duplicate_returns_false(self): self.manager.create_calendar('alice', 'personal') result = self.manager.create_calendar('alice', 'personal') self.assertFalse(result) def test_create_calendar_with_description_and_color(self): result = self.manager.create_calendar('alice', 'work', description='Work stuff', color='#ff0000') self.assertTrue(result) calendars = self.manager._load_calendars() cal = calendars[0] self.assertEqual(cal['description'], 'Work stuff') self.assertEqual(cal['color'], '#ff0000') def test_create_calendar_updates_user_count(self): with patch.object(self.manager, '_sync_users_to_cell_config'): self.manager.create_calendar_user('alice', 'pass') self.manager.create_calendar('alice', 'personal') users = self.manager._load_users() alice = next(u for u in users if u['username'] == 'alice') self.assertEqual(alice['calendars_count'], 1) def test_remove_calendar_real_removes(self): self.manager.create_calendar('alice', 'personal') result = self.manager.remove_calendar('alice', 'personal') self.assertTrue(result) calendars = self.manager._load_calendars() self.assertEqual(len(calendars), 0) def test_remove_calendar_nonexistent_returns_true(self): """Removing a non-existent calendar is idempotent (returns True).""" result = self.manager.remove_calendar('alice', 'nonexistent') self.assertTrue(result) def test_add_event_real_persists(self): result = self.manager.add_event('alice', 'personal', {'summary': 'Meeting'}) self.assertTrue(result) events = self.manager._load_events() self.assertEqual(len(events), 1) self.assertEqual(events[0]['summary'], 'Meeting') self.assertEqual(events[0]['username'], 'alice') self.assertEqual(events[0]['calendar'], 'personal') def test_add_event_assigns_uid_if_missing(self): self.manager.add_event('alice', 'personal', {'summary': 'Test'}) events = self.manager._load_events() self.assertIn('uid', events[0]) def test_add_event_preserves_existing_uid(self): self.manager.add_event('alice', 'personal', {'summary': 'Test', 'uid': 'my-uid-123'}) events = self.manager._load_events() self.assertEqual(events[0]['uid'], 'my-uid-123') def test_remove_event_real_removes_by_uid(self): self.manager.add_event('alice', 'personal', {'summary': 'Test', 'uid': 'uid-1'}) result = self.manager.remove_event('alice', 'personal', 'uid-1') self.assertTrue(result) events = self.manager._load_events() self.assertEqual(len(events), 0) def test_remove_event_does_not_remove_wrong_uid(self): self.manager.add_event('alice', 'personal', {'summary': 'Test', 'uid': 'uid-1'}) self.manager.add_event('alice', 'personal', {'summary': 'Other', 'uid': 'uid-2'}) self.manager.remove_event('alice', 'personal', 'uid-1') events = self.manager._load_events() self.assertEqual(len(events), 1) self.assertEqual(events[0]['uid'], 'uid-2') def test_create_calendar_event_persists(self): result = self.manager.create_calendar_event( 'alice', 'personal', 'Team meeting', '2026-01-01T09:00:00', '2026-01-01T10:00:00', description='Weekly sync', location='Office') self.assertTrue(result) events = self.manager._load_events() self.assertEqual(len(events), 1) ev = events[0] self.assertEqual(ev['title'], 'Team meeting') self.assertEqual(ev['username'], 'alice') def test_create_calendar_event_updates_calendar_count(self): self.manager.create_calendar('alice', 'personal') self.manager.create_calendar_event( 'alice', 'personal', 'Sync', '2026-01-01T09:00:00', '2026-01-01T10:00:00') calendars = self.manager._load_calendars() self.assertEqual(calendars[0]['events_count'], 1) def test_get_calendar_events_filters_by_user_and_calendar(self): self.manager.create_calendar_event( 'alice', 'personal', 'Alice event', '2026-01-01T09:00', '2026-01-01T10:00') self.manager.create_calendar_event( 'bob', 'personal', 'Bob event', '2026-01-01T09:00', '2026-01-01T10:00') alice_events = self.manager.get_calendar_events('alice', 'personal') self.assertEqual(len(alice_events), 1) self.assertEqual(alice_events[0]['title'], 'Alice event') def test_get_calendar_events_date_filter(self): self.manager.create_calendar_event( 'alice', 'personal', 'Jan event', '2026-01-15T09:00', '2026-01-15T10:00') self.manager.create_calendar_event( 'alice', 'personal', 'Feb event', '2026-02-15T09:00', '2026-02-15T10:00') filtered = self.manager.get_calendar_events( 'alice', 'personal', start_date='2026-01-01', end_date='2026-01-31') self.assertEqual(len(filtered), 1) self.assertEqual(filtered[0]['title'], 'Jan event') def test_get_calendar_status_returns_users(self): with patch.object(self.manager, '_sync_users_to_cell_config'): self.manager.create_calendar_user('alice', 'pass') status = self.manager.get_calendar_status() self.assertIn('users', status) self.assertEqual(len(status['users']), 1) self.assertEqual(status['users'][0]['username'], 'alice') def test_get_metrics_empty(self): with patch.object(self.manager, '_check_calendar_status', return_value=False): metrics = self.manager.get_metrics() self.assertIn('users_count', metrics) self.assertIn('calendars_count', metrics) self.assertIn('events_count', metrics) self.assertEqual(metrics['users_count'], 0) def test_get_metrics_with_data(self): with patch.object(self.manager, '_sync_users_to_cell_config'): self.manager.create_calendar_user('alice', 'pass') self.manager.create_calendar('alice', 'personal') self.manager.add_event('alice', 'personal', {'summary': 'Evt'}) with patch.object(self.manager, '_check_calendar_status', return_value=True): metrics = self.manager.get_metrics() self.assertEqual(metrics['users_count'], 1) self.assertEqual(metrics['calendars_count'], 1) self.assertEqual(metrics['events_count'], 1) def test_apply_config_no_port_key(self): result = self.manager.apply_config({}) self.assertEqual(result['restarted'], []) def test_apply_config_updates_radicale_hosts(self): # Generate config first self.manager._generate_radicale_config() result = self.manager.apply_config({'port': 5233}) self.assertEqual(result['restarted'], []) config_file = os.path.join(self.manager.radicale_dir, 'config') with open(config_file) as f: content = f.read() self.assertIn('hosts = 0.0.0.0:5233', content) def test_apply_config_no_radicale_file_is_safe(self): """apply_config doesn't crash if radicale config file is missing.""" config_file = os.path.join(self.manager.radicale_dir, 'config') if os.path.exists(config_file): os.remove(config_file) result = self.manager.apply_config({'port': 5234}) # Should not raise; warnings list may or may not be empty self.assertIn('warnings', result) def test_write_radicale_htpasswd_creates_entry(self): """_write_radicale_htpasswd writes a bcrypt entry for the user.""" htpasswd = self.manager._radicale_htpasswd_path() os.makedirs(os.path.dirname(htpasswd), exist_ok=True) self.manager._write_radicale_htpasswd('alice', 'mypassword') self.assertTrue(os.path.exists(htpasswd)) with open(htpasswd) as f: content = f.read() self.assertIn('alice:', content) def test_write_radicale_htpasswd_updates_existing_entry(self): """_write_radicale_htpasswd replaces a user's old entry.""" htpasswd = self.manager._radicale_htpasswd_path() os.makedirs(os.path.dirname(htpasswd), exist_ok=True) self.manager._write_radicale_htpasswd('alice', 'pass1') self.manager._write_radicale_htpasswd('alice', 'pass2') with open(htpasswd) as f: lines = f.readlines() alice_lines = [l for l in lines if l.startswith('alice:')] self.assertEqual(len(alice_lines), 1) def test_remove_radicale_htpasswd_removes_entry(self): htpasswd = self.manager._radicale_htpasswd_path() os.makedirs(os.path.dirname(htpasswd), exist_ok=True) self.manager._write_radicale_htpasswd('alice', 'pass') self.manager._write_radicale_htpasswd('bob', 'pass') self.manager._remove_radicale_htpasswd('alice') with open(htpasswd) as f: content = f.read() self.assertNotIn('alice:', content) self.assertIn('bob:', content) def test_remove_radicale_htpasswd_no_file_is_safe(self): """_remove_radicale_htpasswd doesn't raise when the file doesn't exist.""" htpasswd = self.manager._radicale_htpasswd_path() if os.path.exists(htpasswd): os.remove(htpasswd) self.manager._remove_radicale_htpasswd('alice') # should not raise def test_write_radicale_htpasswd_no_config_dir_is_safe(self): """_write_radicale_htpasswd is a no-op when the config dir doesn't exist.""" # Don't create the config dir self.manager._write_radicale_htpasswd('alice', 'pass') htpasswd = self.manager._radicale_htpasswd_path() self.assertFalse(os.path.exists(htpasswd)) def test_test_database_connectivity_with_accessible_dir(self): result = self.manager._test_database_connectivity() self.assertIn('success', result) self.assertTrue(result['success']) def test_test_service_connectivity_unreachable(self): """_test_service_connectivity returns failure when cell-radicale isn't reachable.""" result = self.manager._test_service_connectivity() self.assertIn('success', result) # In test environment Radicale is not running, so should be False self.assertFalse(result['success']) def test_test_web_interface_unreachable(self): result = self.manager._test_web_interface() self.assertIn('success', result) self.assertFalse(result['success']) def test_restart_service_calls_container(self): with patch.object(self.manager, '_restart_container', return_value=True) as mock_restart: result = self.manager.restart_service() self.assertTrue(result) mock_restart.assert_called_once_with('cell-radicale') def test_restart_service_failure_returns_false(self): with patch.object(self.manager, '_restart_container', return_value=False): result = self.manager.restart_service() self.assertFalse(result) def test_sync_users_to_cell_config_best_effort(self): """_sync_users_to_cell_config failure is non-fatal.""" with patch('config_manager.ConfigManager', side_effect=Exception('no config')): # Should not raise self.manager._sync_users_to_cell_config() def test_check_calendar_status_returns_bool(self): with patch('subprocess.run') as mock_sub: mock_sub.return_value = MagicMock(returncode=0, stdout=':5232 LISTEN') result = self.manager._check_calendar_status() self.assertIsInstance(result, bool) def test_check_calendar_status_false_when_no_port(self): with patch('subprocess.run') as mock_sub: mock_sub.return_value = MagicMock(returncode=0, stdout='no matching port') result = self.manager._check_calendar_status() self.assertFalse(result) def test_load_users_returns_empty_on_missing_file(self): users = self.manager._load_users() self.assertEqual(users, []) def test_load_calendars_returns_empty_on_missing_file(self): calendars = self.manager._load_calendars() self.assertEqual(calendars, []) def test_load_events_returns_empty_on_missing_file(self): events = self.manager._load_events() self.assertEqual(events, []) def test_load_users_handles_corrupt_file(self): with open(self.manager.users_file, 'w') as f: f.write('{corrupt') users = self.manager._load_users() self.assertEqual(users, []) def test_get_configured_port_default(self): port = self.manager._get_configured_port() self.assertEqual(port, 5232) def test_get_configured_port_from_config(self): with patch.object(self.manager, 'get_config', return_value={'port': 5555}): port = self.manager._get_configured_port() self.assertEqual(port, 5555) def test_test_connectivity_returns_dict(self): with patch.object(self.manager, '_test_service_connectivity', return_value={'success': False, 'message': ''}): with patch.object(self.manager, '_test_database_connectivity', return_value={'success': True, 'message': ''}): with patch.object(self.manager, '_test_web_interface', return_value={'success': False, 'message': ''}): result = self.manager.test_connectivity() self.assertIn('service_connectivity', result) self.assertIn('database_connectivity', result) self.assertIn('web_interface', result) self.assertIn('success', result) self.assertFalse(result['success']) if __name__ == '__main__': unittest.main()