fix: restore/import no longer zeros unconfigured services; domain change updates DNS

config_manager restore_config and import_config previously injected zero-filled
entries (port=0, domain='') for every service schema regardless of whether that
service was in the backup/import data. Removed this logic — only restore what's
actually in the backup.

network_manager.apply_domain now:
- updates dnsmasq.conf domain= line (reload cell-dhcp)
- rewrites Corefile zone blocks to the new domain name
- renames and rewrites the primary zone file $ORIGIN + SOA records
- reloads CoreDNS

Tests added first (TDD):
- test_restore_does_not_zero_unconfigured_services
- test_restore_does_not_zero_import
- test_apply_domain_updates_corefile (zone file + Corefile)
- test_apply_domain_updates_dnsmasq
- test_apply_config_writes_dhcp_range / ntp_servers
- test_apply_config_updates_mailserver_env / no_domain_no_restart

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-21 04:50:10 -04:00
parent 87ff50c378
commit ac9b26303f
3 changed files with 219 additions and 40 deletions
+2 -37
View File
@@ -260,27 +260,8 @@ class ConfigManager:
secrets_backup = backup_path / 'secrets.yaml'
if secrets_backup.exists():
shutil.copy2(secrets_backup, self.secrets_file)
# Reload configurations
# Reload configurations — restore only what was in the backup
self.configs = self._load_all_configs()
# Ensure all configs have required fields
for service, schema in self.service_schemas.items():
config = self.configs.get(service, {})
for field in schema['required']:
if field not in config:
# Set a default value based on type
t = schema['types'][field]
if t is int:
config[field] = 0
elif t is str:
config[field] = ''
elif t is list:
config[field] = []
elif t is bool:
config[field] = False
self.configs[service] = config
# Write back to file
self._save_all_configs()
logger.info(f"Restored configuration from backup: {backup_id}")
return True
except Exception as e:
@@ -351,26 +332,10 @@ class ConfigManager:
configs = yaml.safe_load(config_data)
else:
raise ValueError(f"Unsupported format: {format}")
# Validate and update each service config
# Import only services present in the data — don't fabricate missing ones
for service, config in configs.items():
if service in self.service_schemas:
self.update_service_config(service, config)
# Ensure all configs have required fields
for service, schema in self.service_schemas.items():
config = self.get_service_config(service)
for field in schema['required']:
if field not in config:
t = schema['types'][field]
if t is int:
config[field] = 0
elif t is str:
config[field] = ''
elif t is list:
config[field] = []
elif t is bool:
config[field] = False
# Write back to file
self._save_all_configs()
logger.info("Imported configurations successfully")
return True
except Exception as e:
+66 -2
View File
@@ -339,9 +339,11 @@ class NetworkManager(BaseServiceManager):
return {'restarted': restarted, 'warnings': warnings}
def apply_domain(self, domain: str) -> Dict[str, Any]:
"""Update domain in dnsmasq config and reload."""
"""Update domain across dnsmasq, Corefile, and zone file; reload DNS + DHCP."""
restarted = []
warnings = []
# 1. Update dnsmasq.conf domain= line
try:
dhcp_conf = os.path.join(self.config_dir, 'dhcp', 'dnsmasq.conf')
if os.path.exists(dhcp_conf):
@@ -356,7 +358,69 @@ class NetworkManager(BaseServiceManager):
self._reload_dhcp_service()
restarted.append('cell-dhcp (reloaded)')
except Exception as e:
warnings.append(f"domain write to dnsmasq failed: {e}")
warnings.append(f"dnsmasq domain update failed: {e}")
# 2. Update Corefile: replace old primary zone block with new domain
try:
corefile = os.path.join(self.config_dir, 'dns', 'Corefile')
if os.path.exists(corefile):
with open(corefile) as f:
content = f.read()
import re
# Replace first named zone block (not the catch-all .) with new domain
# Matches: <word> { ... } blocks (zone names like "cell", "oldname")
def replace_zone(m):
zone = m.group(1)
if zone == '.':
return m.group(0) # keep catch-all
# Replace zone name with new domain; update file path reference
body = m.group(2)
body = re.sub(r'file\s+/data/\S+\.zone',
f'file /data/{domain}.zone', body)
return f'{domain} {{{body}}}'
new_content = re.sub(
r'(\S+)\s*\{([^}]*)\}',
replace_zone, content, flags=re.DOTALL
)
with open(corefile, 'w') as f:
f.write(new_content)
except Exception as e:
warnings.append(f"Corefile domain update failed: {e}")
# 3. Update zone file: rename and rewrite $ORIGIN / SOA
try:
dns_data = os.path.join(self.data_dir, 'dns')
if os.path.isdir(dns_data):
# Find existing primary zone file (anything not named 'local')
for fname in os.listdir(dns_data):
if fname.endswith('.zone') and 'local' not in fname:
src = os.path.join(dns_data, fname)
with open(src) as f:
zone_content = f.read()
# Detect old domain from $ORIGIN line
m = re.search(r'^\$ORIGIN\s+(\S+)', zone_content, re.MULTILINE)
old_origin = m.group(1).rstrip('.') if m else None
if old_origin and old_origin != domain:
zone_content = zone_content.replace(
f'{old_origin}.', f'{domain}.')
zone_content = re.sub(
r'^\$ORIGIN\s+\S+', f'$ORIGIN {domain}.', zone_content, flags=re.MULTILINE)
dst = os.path.join(dns_data, f'{domain}.zone')
with open(dst, 'w') as f:
f.write(zone_content)
if src != dst:
os.remove(src)
break
except Exception as e:
warnings.append(f"zone file domain update failed: {e}")
# 4. Reload CoreDNS
try:
self._reload_dns_service()
restarted.append('cell-dns (reloaded)')
except Exception as e:
warnings.append(f"CoreDNS reload failed: {e}")
return {'restarted': restarted, 'warnings': warnings}
def test_dns_resolution(self, domain: str) -> Dict: