diff --git a/api/config_manager.py b/api/config_manager.py index 7c9a60e..e9059f4 100644 --- a/api/config_manager.py +++ b/api/config_manager.py @@ -260,27 +260,8 @@ class ConfigManager: secrets_backup = backup_path / 'secrets.yaml' if secrets_backup.exists(): shutil.copy2(secrets_backup, self.secrets_file) - # Reload configurations + # Reload configurations — restore only what was in the backup self.configs = self._load_all_configs() - # Ensure all configs have required fields - for service, schema in self.service_schemas.items(): - config = self.configs.get(service, {}) - for field in schema['required']: - if field not in config: - # Set a default value based on type - t = schema['types'][field] - if t is int: - config[field] = 0 - elif t is str: - config[field] = '' - elif t is list: - config[field] = [] - elif t is bool: - config[field] = False - self.configs[service] = config - - # Write back to file - self._save_all_configs() logger.info(f"Restored configuration from backup: {backup_id}") return True except Exception as e: @@ -351,26 +332,10 @@ class ConfigManager: configs = yaml.safe_load(config_data) else: raise ValueError(f"Unsupported format: {format}") - # Validate and update each service config + # Import only services present in the data — don't fabricate missing ones for service, config in configs.items(): if service in self.service_schemas: self.update_service_config(service, config) - # Ensure all configs have required fields - for service, schema in self.service_schemas.items(): - config = self.get_service_config(service) - for field in schema['required']: - if field not in config: - t = schema['types'][field] - if t is int: - config[field] = 0 - elif t is str: - config[field] = '' - elif t is list: - config[field] = [] - elif t is bool: - config[field] = False - # Write back to file - self._save_all_configs() logger.info("Imported configurations successfully") return True except Exception as e: diff --git a/api/network_manager.py b/api/network_manager.py index 2ba3bde..4795b9f 100644 --- a/api/network_manager.py +++ b/api/network_manager.py @@ -339,9 +339,11 @@ class NetworkManager(BaseServiceManager): return {'restarted': restarted, 'warnings': warnings} def apply_domain(self, domain: str) -> Dict[str, Any]: - """Update domain in dnsmasq config and reload.""" + """Update domain across dnsmasq, Corefile, and zone file; reload DNS + DHCP.""" restarted = [] warnings = [] + + # 1. Update dnsmasq.conf domain= line try: dhcp_conf = os.path.join(self.config_dir, 'dhcp', 'dnsmasq.conf') if os.path.exists(dhcp_conf): @@ -356,7 +358,69 @@ class NetworkManager(BaseServiceManager): self._reload_dhcp_service() restarted.append('cell-dhcp (reloaded)') except Exception as e: - warnings.append(f"domain write to dnsmasq failed: {e}") + warnings.append(f"dnsmasq domain update failed: {e}") + + # 2. Update Corefile: replace old primary zone block with new domain + try: + corefile = os.path.join(self.config_dir, 'dns', 'Corefile') + if os.path.exists(corefile): + with open(corefile) as f: + content = f.read() + import re + # Replace first named zone block (not the catch-all .) with new domain + # Matches: { ... } blocks (zone names like "cell", "oldname") + def replace_zone(m): + zone = m.group(1) + if zone == '.': + return m.group(0) # keep catch-all + # Replace zone name with new domain; update file path reference + body = m.group(2) + body = re.sub(r'file\s+/data/\S+\.zone', + f'file /data/{domain}.zone', body) + return f'{domain} {{{body}}}' + new_content = re.sub( + r'(\S+)\s*\{([^}]*)\}', + replace_zone, content, flags=re.DOTALL + ) + with open(corefile, 'w') as f: + f.write(new_content) + except Exception as e: + warnings.append(f"Corefile domain update failed: {e}") + + # 3. Update zone file: rename and rewrite $ORIGIN / SOA + try: + dns_data = os.path.join(self.data_dir, 'dns') + if os.path.isdir(dns_data): + # Find existing primary zone file (anything not named 'local') + for fname in os.listdir(dns_data): + if fname.endswith('.zone') and 'local' not in fname: + src = os.path.join(dns_data, fname) + with open(src) as f: + zone_content = f.read() + # Detect old domain from $ORIGIN line + m = re.search(r'^\$ORIGIN\s+(\S+)', zone_content, re.MULTILINE) + old_origin = m.group(1).rstrip('.') if m else None + if old_origin and old_origin != domain: + zone_content = zone_content.replace( + f'{old_origin}.', f'{domain}.') + zone_content = re.sub( + r'^\$ORIGIN\s+\S+', f'$ORIGIN {domain}.', zone_content, flags=re.MULTILINE) + dst = os.path.join(dns_data, f'{domain}.zone') + with open(dst, 'w') as f: + f.write(zone_content) + if src != dst: + os.remove(src) + break + except Exception as e: + warnings.append(f"zone file domain update failed: {e}") + + # 4. Reload CoreDNS + try: + self._reload_dns_service() + restarted.append('cell-dns (reloaded)') + except Exception as e: + warnings.append(f"CoreDNS reload failed: {e}") + return {'restarted': restarted, 'warnings': warnings} def test_dns_resolution(self, domain: str) -> Dict: diff --git a/tests/test_config_manager.py b/tests/test_config_manager.py index 362d3a7..efd2453 100644 --- a/tests/test_config_manager.py +++ b/tests/test_config_manager.py @@ -222,5 +222,155 @@ class TestConfigManager(unittest.TestCase): changed = self.config_manager.has_config_changed('network', original_hash) self.assertTrue(changed) + def test_restore_does_not_zero_unconfigured_services(self): + """Restore must not inject zero-filled entries for services absent from backup.""" + # Only configure network before backup + self.config_manager.update_service_config('network', { + 'dns_port': 53, 'dhcp_range': '10.0.0.100,10.0.0.200,12h', 'ntp_servers': ['pool.ntp.org'] + }) + backup_id = self.config_manager.backup_config() + + # Restore into a fresh manager (simulates restoring to a clean install) + fresh_cfg_file = os.path.join(self.temp_dir, 'cell_config2.json') + fresh = ConfigManager(fresh_cfg_file, self.data_dir) + # Restore needs the backup_dir to match + fresh.backup_dir = self.config_manager.backup_dir + success = fresh.restore_config(backup_id) + self.assertTrue(success) + + # email was not in the backup — it must NOT appear with port=0 + email_cfg = fresh.get_service_config('email') + self.assertNotIn('smtp_port', email_cfg, + "restore must not inject zero-filled entries for services not in backup") + self.assertNotIn('imap_port', email_cfg) + + # network was in the backup — it must be intact + net_cfg = fresh.get_service_config('network') + self.assertEqual(net_cfg['dns_port'], 53) + + def test_restore_does_not_zero_import(self): + """import_config must not inject zero-filled entries for absent services.""" + export_data = json.dumps({ + 'network': {'dns_port': 53, 'dhcp_range': '10.0.0.100,10.0.0.200,12h', 'ntp_servers': []} + }) + success = self.config_manager.import_config(export_data) + self.assertTrue(success) + email_cfg = self.config_manager.get_service_config('email') + self.assertNotIn('smtp_port', email_cfg, + "import must not inject zero-filled entries for absent services") + + +class TestNetworkManagerApply(unittest.TestCase): + """Test apply_config / apply_domain actually write real config files.""" + + def setUp(self): + self.test_dir = tempfile.mkdtemp() + self.data_dir = os.path.join(self.test_dir, 'data') + self.config_dir = os.path.join(self.test_dir, 'config') + os.makedirs(os.path.join(self.data_dir, 'dns'), exist_ok=True) + os.makedirs(os.path.join(self.config_dir, 'dhcp'), exist_ok=True) + os.makedirs(os.path.join(self.config_dir, 'ntp'), exist_ok=True) + + # Seed minimal config files + with open(os.path.join(self.config_dir, 'dhcp', 'dnsmasq.conf'), 'w') as f: + f.write('dhcp-range=10.0.0.100,10.0.0.200,12h\ndomain=cell\n') + with open(os.path.join(self.config_dir, 'ntp', 'chrony.conf'), 'w') as f: + f.write('server time.google.com iburst\nserver pool.ntp.org iburst\n') + + sys.path.insert(0, str(Path(__file__).parent.parent / 'api')) + from network_manager import NetworkManager + self.nm = NetworkManager(self.data_dir, self.config_dir) + + def tearDown(self): + shutil.rmtree(self.test_dir) + + @patch('subprocess.run') + def test_apply_config_writes_dhcp_range(self, mock_run): + mock_run.return_value = MagicMock(returncode=0) + result = self.nm.apply_config({'dhcp_range': '192.168.1.100,192.168.1.200,24h'}) + dhcp_conf = open(os.path.join(self.config_dir, 'dhcp', 'dnsmasq.conf')).read() + self.assertIn('192.168.1.100,192.168.1.200,24h', dhcp_conf) + self.assertIn('cell-dhcp', ' '.join(result['restarted'])) + + @patch('subprocess.run') + def test_apply_config_writes_ntp_servers(self, mock_run): + mock_run.return_value = MagicMock(returncode=0) + result = self.nm.apply_config({'ntp_servers': ['ntp1.example.com', 'ntp2.example.com']}) + ntp_conf = open(os.path.join(self.config_dir, 'ntp', 'chrony.conf')).read() + self.assertIn('server ntp1.example.com iburst', ntp_conf) + self.assertIn('server ntp2.example.com iburst', ntp_conf) + # Old servers must be gone + self.assertNotIn('time.google.com', ntp_conf) + self.assertIn('cell-ntp', result['restarted']) + + @patch('subprocess.run') + def test_apply_domain_updates_dnsmasq(self, mock_run): + mock_run.return_value = MagicMock(returncode=0) + result = self.nm.apply_domain('newdomain.local') + dhcp_conf = open(os.path.join(self.config_dir, 'dhcp', 'dnsmasq.conf')).read() + self.assertIn('domain=newdomain.local', dhcp_conf) + self.assertNotIn('domain=cell', dhcp_conf) + + @patch('subprocess.run') + def test_apply_domain_updates_corefile(self, mock_run): + """apply_domain must rewrite the Corefile zone name and reload CoreDNS.""" + mock_run.return_value = MagicMock(returncode=0) + # Create a Corefile with zone 'cell' + dns_conf_dir = os.path.join(self.config_dir, 'dns') + os.makedirs(dns_conf_dir, exist_ok=True) + corefile = os.path.join(dns_conf_dir, 'Corefile') + with open(corefile, 'w') as f: + f.write('. {\n forward . 8.8.8.8\n}\ncell {\n file /data/cell.zone\n log\n}\n') + # Create zone file + zone_file = os.path.join(self.data_dir, 'dns', 'cell.zone') + with open(zone_file, 'w') as f: + f.write('$ORIGIN cell.\n$TTL 300\n@ IN SOA ns1.cell. admin.cell. 2024010101 3600 900 604800 300\n') + + self.nm.apply_domain('newdomain.local') + + corefile_content = open(corefile).read() + self.assertIn('newdomain.local', corefile_content, + "Corefile must reference the new domain zone") + self.assertNotIn('\ncell {', corefile_content, + "Corefile must not keep old 'cell' zone block") + + +class TestEmailManagerApply(unittest.TestCase): + """Test email_manager.apply_config writes mailserver.env correctly.""" + + def setUp(self): + self.test_dir = tempfile.mkdtemp() + self.data_dir = os.path.join(self.test_dir, 'data') + self.config_dir = os.path.join(self.test_dir, 'config') + os.makedirs(os.path.join(self.config_dir, 'mail'), exist_ok=True) + os.makedirs(os.path.join(self.data_dir, 'email'), exist_ok=True) + with open(os.path.join(self.config_dir, 'mail', 'mailserver.env'), 'w') as f: + f.write('OVERRIDE_HOSTNAME=mail.cell\nPOSTMASTER_ADDRESS=admin@cell\nLOG_LEVEL=warn\n') + + sys.path.insert(0, str(Path(__file__).parent.parent / 'api')) + from email_manager import EmailManager + self.em = EmailManager(self.data_dir, self.config_dir) + + def tearDown(self): + shutil.rmtree(self.test_dir) + + @patch('subprocess.run') + def test_apply_config_updates_mailserver_env(self, mock_run): + mock_run.return_value = MagicMock(returncode=0) + result = self.em.apply_config({'domain': 'example.local'}) + env = open(os.path.join(self.config_dir, 'mail', 'mailserver.env')).read() + self.assertIn('OVERRIDE_HOSTNAME=mail.example.local', env) + self.assertIn('POSTMASTER_ADDRESS=admin@example.local', env) + self.assertIn('LOG_LEVEL=warn', env, "other env vars must be preserved") + self.assertIn('cell-mail', result['restarted']) + + @patch('subprocess.run') + def test_apply_config_no_domain_no_restart(self, mock_run): + mock_run.return_value = MagicMock(returncode=0) + result = self.em.apply_config({'smtp_port': 587}) + # smtp_port alone doesn't restart cell-mail (no mailserver.env key to change) + self.assertEqual(result['restarted'], []) + + if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main()