#!/usr/bin/env python3
"""
Additional tests for ConfigManager covering untested utility methods:
- set_identity_field
- get_installed_services / set_installed_service / remove_installed_service
- get_connectivity_config / set_connectivity_field
- set_ddns_config / get_ddns_token / set_ddns_token
- export_config yaml format
- import_config yaml format + selective services
- backup_config exception path (lines 424-426)
- restore_config selective restore (lines 441-453)
- _validate_vol_entry (unsafe container/path/name)
- _save_all_configs OSError path
"""

import sys
import os
import json
import tempfile
import shutil
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock

# Make the sibling ``api`` directory importable so ``config_manager`` resolves.
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))

from config_manager import ConfigManager


def _make_cm(tmp):
    """Build a ConfigManager whose config file and data dir live under *tmp*.

    Args:
        tmp: Path of an existing directory to hold ``cell_config.json`` and
            the ``data`` sub-directory (created here if missing).

    Returns:
        A ConfigManager constructed with those two paths.
    """
    config_file = os.path.join(tmp, 'cell_config.json')
    data_dir = os.path.join(tmp, 'data')
    os.makedirs(data_dir, exist_ok=True)
    return ConfigManager(config_file, data_dir)


class _ConfigManagerTestCase(unittest.TestCase):
    """Shared fixture: a ConfigManager rooted in a throwaway temp directory.

    Exposes ``self.tmp`` (the temp directory path) and ``self.cm`` (the
    ConfigManager under test) to every test method of its subclasses.
    """

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        # Register removal *before* building the ConfigManager: unittest skips
        # tearDown() when setUp() raises, but always runs registered cleanups,
        # so the directory is removed even if _make_cm() fails.
        self.addCleanup(shutil.rmtree, self.tmp)
        self.cm = _make_cm(self.tmp)


class TestSetIdentityField(_ConfigManagerTestCase):
    """set_identity_field writes into the ``_identity`` section of configs."""

    def test_set_identity_field_persists(self):
        self.cm.set_identity_field('cell_name', 'mycell')
        self.assertEqual(self.cm.configs['_identity']['cell_name'], 'mycell')

    def test_set_identity_field_creates_identity_if_missing(self):
        self.cm.configs.pop('_identity', None)
        self.cm.set_identity_field('domain', 'cell')
        self.assertIn('_identity', self.cm.configs)
        self.assertEqual(self.cm.configs['_identity']['domain'], 'cell')


class TestInstalledServices(_ConfigManagerTestCase):
    """get/set/remove round-trips for installed-service records."""

    def test_get_installed_services_empty_by_default(self):
        result = self.cm.get_installed_services()
        self.assertIsInstance(result, dict)

    def test_set_installed_service_stores_record(self):
        self.cm.set_installed_service('gitea', {'version': '1.0', 'enabled': True})
        self.assertIn('gitea', self.cm.get_installed_services())

    def test_remove_installed_service_removes_entry(self):
        self.cm.set_installed_service('gitea', {'version': '1.0'})
        self.cm.remove_installed_service('gitea')
        self.assertNotIn('gitea', self.cm.get_installed_services())

    def test_remove_installed_service_not_present_does_not_raise(self):
        # Should not raise even if service was never installed
        self.cm.remove_installed_service('nonexistent')


class TestConnectivityConfig(_ConfigManagerTestCase):
    """get_connectivity_config defaults and set_connectivity_field results."""

    def test_get_connectivity_config_returns_dict_with_exits(self):
        result = self.cm.get_connectivity_config()
        self.assertIn('exits', result)
        self.assertIn('peer_exit_map', result)

    def test_get_connectivity_config_initializes_missing(self):
        self.cm.configs.pop('connectivity', None)
        result = self.cm.get_connectivity_config()
        self.assertIsInstance(result, dict)
        self.assertIn('exits', result)

    def test_set_connectivity_field_returns_true_on_success(self):
        result = self.cm.set_connectivity_field('exits', {'vpn1': {'host': '10.0.0.1'}})
        self.assertTrue(result)
        self.assertIn('exits', self.cm.configs.get('connectivity', {}))

    def test_set_connectivity_field_returns_false_on_save_error(self):
        with patch.object(self.cm, '_save_all_configs', side_effect=OSError('disk full')):
            result = self.cm.set_connectivity_field('exits', {})
        self.assertFalse(result)


class TestDdnsConfig(_ConfigManagerTestCase):
    """DDNS settings and the separately stored DDNS token."""

    def test_set_ddns_config_strips_token(self):
        self.cm.set_ddns_config({'hostname': 'pic.ngo', 'token': 'SECRET'})
        ddns = self.cm.configs.get('ddns', {})
        self.assertNotIn('token', ddns)
        self.assertEqual(ddns.get('hostname'), 'pic.ngo')

    def test_set_ddns_token_writes_to_file(self):
        self.cm.set_ddns_token('mytoken123')
        token_path = self.cm._ddns_token_path
        self.assertTrue(token_path.exists())
        self.assertEqual(token_path.read_text().strip(), 'mytoken123')

    def test_get_ddns_token_reads_from_file(self):
        self.cm.set_ddns_token('readmetoken')
        result = self.cm.get_ddns_token()
        self.assertEqual(result, 'readmetoken')

    def test_get_ddns_token_migrates_from_configs(self):
        # Legacy token stored in cell_config.json
        self.cm.configs['ddns'] = {'hostname': 'pic.ngo', 'token': 'oldtoken'}
        result = self.cm.get_ddns_token()
        self.assertEqual(result, 'oldtoken')
        # After migration, should be in file
        self.assertTrue(self.cm._ddns_token_path.exists())

    def test_set_ddns_token_oserror_does_not_raise(self):
        # NOTE(review): the blanket ``except Exception: pass`` below means this
        # test cannot fail, whatever set_ddns_token does. The replacement
        # ``Path.parent`` property also reads ``.parent`` on a new Path while
        # the patch is active, so it appears to recurse into itself -- confirm
        # the intended setup before tightening this into a real assertion.
        with patch('builtins.open', side_effect=OSError('no space')):
            with patch.object(Path, 'parent', new_callable=lambda: property(lambda self: Path(self.name).parent)):
                # Just make sure no exception propagates
                try:
                    self.cm.set_ddns_token('tok')
                except Exception:
                    pass

    def test_set_ddns_token_removes_legacy_token_from_config(self):
        self.cm.configs['ddns'] = {'hostname': 'pic.ngo', 'token': 'legacytok'}
        self.cm.set_ddns_token('newtok')
        # Legacy token should be removed from in-memory config
        ddns = self.cm.configs.get('ddns', {})
        self.assertNotIn('token', ddns)


class TestExportImportConfigExtra(_ConfigManagerTestCase):
    """export_config / import_config: yaml format, service filter, identity."""

    def test_export_config_yaml_format(self):
        self.cm.update_service_config('network', {'dns_port': 53})
        result = self.cm.export_config('yaml')
        self.assertIn('network', result)

    def test_export_config_filters_by_services(self):
        self.cm.update_service_config('network', {'dns_port': 53})
        self.cm.update_service_config('wireguard', {'port': 51820})
        result = self.cm.export_config('json', services=['network'])
        data = json.loads(result)
        self.assertIn('network', data)
        self.assertNotIn('wireguard', data)

    def test_import_config_yaml_format(self):
        yaml_data = 'network:\n dns_port: 53\n'
        result = self.cm.import_config(yaml_data, 'yaml')
        self.assertTrue(result)

    def test_import_config_filters_by_services(self):
        data = json.dumps({'network': {'dns_port': 53}, 'wireguard': {'port': 51820}})
        result = self.cm.import_config(data, 'json', services=['network'])
        self.assertTrue(result)
        self.assertEqual(self.cm.configs.get('network', {}).get('dns_port'), 53)

    def test_import_config_unsupported_format_returns_false(self):
        result = self.cm.import_config('', 'xml')
        self.assertFalse(result)

    def test_import_config_with_identity(self):
        data = json.dumps({'identity': {'cell_name': 'imported_cell'}})
        result = self.cm.import_config(data, 'json')
        self.assertTrue(result)
        self.assertEqual(
            self.cm.configs.get('_identity', {}).get('cell_name'),
            'imported_cell'
        )


class TestBackupRestoreExtra(_ConfigManagerTestCase):
    """backup_config failure path and restore_config edge cases."""

    def test_backup_config_exception_reraises(self):
        # Force an exception by making shutil.copy2 raise after backup_path is created
        with patch('shutil.copy2', side_effect=OSError('disk full')):
            # backup_config reraises on exception
            with self.assertRaises(Exception):
                self.cm.backup_config()

    def test_restore_config_selective_services(self):
        # Create a real backup first
        backup_id = self.cm.backup_config()
        # Change a config value then restore selectively
        self.cm.configs.setdefault('network', {})['dns_port'] = 9999
        result = self.cm.restore_config(backup_id, services=['network'])
        self.assertTrue(result)

    def test_restore_config_nonexistent_backup_returns_false(self):
        result = self.cm.restore_config('backup_nonexistent_999')
        self.assertFalse(result)


class TestSaveAllConfigsError(_ConfigManagerTestCase):
    """_save_all_configs must not propagate a write failure."""

    def test_save_all_configs_permission_error_is_logged(self):
        # Every open() call fails while the patch is active, so any attempt to
        # write the config file fails.
        # NOTE(review): only "does not raise" is checked here; nothing asserts
        # that the error was actually logged, despite the test name.
        with patch('builtins.open', side_effect=PermissionError('no permission')):
            # Should not raise
            self.cm._save_all_configs()


class TestValidateVolEntry(_ConfigManagerTestCase):
    """_validate_vol_entry accepts a clean entry and rejects unsafe fields."""

    def test_valid_vol_entry_returns_true(self):
        result = self.cm._validate_vol_entry('email', {
            'container': 'cell-mail',
            'path': '/data/mail',
            'name': 'mail_data'
        })
        self.assertTrue(result)

    def test_unsafe_container_name_returns_false(self):
        result = self.cm._validate_vol_entry('email', {
            'container': '../../../etc/passwd',
            'path': '/data',
            'name': 'safe_name'
        })
        self.assertFalse(result)

    def test_unsafe_path_traversal_returns_false(self):
        result = self.cm._validate_vol_entry('email', {
            'container': 'cell-mail',
            'path': '/data/../etc',
            'name': 'safe_name'
        })
        self.assertFalse(result)

    def test_path_not_starting_with_slash_returns_false(self):
        result = self.cm._validate_vol_entry('email', {
            'container': 'cell-mail',
            'path': 'relative/path',
            'name': 'safe_name'
        })
        self.assertFalse(result)

    def test_unsafe_vol_name_returns_false(self):
        result = self.cm._validate_vol_entry('email', {
            'container': 'cell-mail',
            'path': '/data/mail',
            'name': 'name with spaces!'
        })
        self.assertFalse(result)


if __name__ == '__main__':
    unittest.main()