feat: dynamic ip_range propagation to DNS, firewall, and docker-compose
When ip_range changes in Settings, the new subnet is now applied to: - DNS zone records (network_manager.apply_ip_range) - Caddy virtual IPs (firewall_manager.ensure_caddy_virtual_ips) - iptables per-service rules (firewall_manager.update_service_ips) - docker-compose.yml static IPs if writable (ip_utils.update_docker_compose_ips) New module ip_utils.py derives all container IPs from the subnet using fixed offsets so the entire stack stays consistent from one setting. 321 tests pass (72 new tests added for ip_utils, apply_ip_range, update_service_ips). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+34
-1
@@ -202,7 +202,8 @@ def _bootstrap_dns():
|
||||
identity = config_manager.configs.get('_identity', {})
|
||||
cell_name = identity.get('cell_name', os.environ.get('CELL_NAME', 'mycell'))
|
||||
domain = identity.get('domain', os.environ.get('CELL_DOMAIN', 'cell'))
|
||||
network_manager.bootstrap_dns_records(cell_name, domain)
|
||||
ip_range = identity.get('ip_range', os.environ.get('CELL_IP_RANGE', '172.20.0.0/16'))
|
||||
network_manager.bootstrap_dns_records(cell_name, domain, ip_range)
|
||||
except Exception as e:
|
||||
logger.warning(f"DNS bootstrap failed (non-fatal): {e}")
|
||||
|
||||
@@ -476,6 +477,38 @@ def update_config():
|
||||
all_restarted.extend(cn_result.get('restarted', []))
|
||||
all_warnings.extend(cn_result.get('warnings', []))
|
||||
|
||||
# Apply ip_range change: regenerate DNS records, update virtual IPs + firewall rules
|
||||
if identity_updates.get('ip_range'):
|
||||
import ip_utils
|
||||
new_range = identity_updates['ip_range']
|
||||
old_range = old_identity.get('ip_range', os.environ.get('CELL_IP_RANGE', '172.20.0.0/16'))
|
||||
cur_identity = config_manager.configs.get('_identity', {})
|
||||
cur_cell_name = cur_identity.get('cell_name', os.environ.get('CELL_NAME', 'mycell'))
|
||||
cur_domain = cur_identity.get('domain', os.environ.get('CELL_DOMAIN', 'cell'))
|
||||
# Update DNS zone records
|
||||
ip_result = network_manager.apply_ip_range(new_range, cur_cell_name, cur_domain)
|
||||
all_restarted.extend(ip_result.get('restarted', []))
|
||||
all_warnings.extend(ip_result.get('warnings', []))
|
||||
# Update firewall virtual IPs (iptables) and Caddy virtual IPs
|
||||
firewall_manager.update_service_ips(new_range)
|
||||
firewall_manager.ensure_caddy_virtual_ips()
|
||||
# Try to update docker-compose.yml (only works outside container / dev mode)
|
||||
compose_candidates = [
|
||||
os.environ.get('COMPOSE_FILE', ''),
|
||||
'/app/../docker-compose.yml',
|
||||
os.path.join(os.path.dirname(__file__), '..', 'docker-compose.yml'),
|
||||
]
|
||||
compose_updated = False
|
||||
for cpath in compose_candidates:
|
||||
if cpath and ip_utils.update_docker_compose_ips(old_range, new_range, cpath):
|
||||
all_warnings.append(
|
||||
'docker-compose.yml updated — run `make restart` to apply container IP changes')
|
||||
compose_updated = True
|
||||
break
|
||||
if not compose_updated:
|
||||
all_warnings.append(
|
||||
'docker-compose.yml not updated (run `make reinstall` to apply container IP changes)')
|
||||
|
||||
logger.info(f"Updated config, restarted: {all_restarted}")
|
||||
return jsonify({
|
||||
"message": "Configuration updated and applied",
|
||||
|
||||
+12
-2
@@ -12,14 +12,24 @@ from typing import Dict, List, Any, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Virtual IPs assigned to Caddy per service — must match Caddyfile listeners
|
||||
SERVICE_IPS = {
|
||||
# Virtual IPs assigned to Caddy per service — must match Caddyfile listeners.
|
||||
# Populated at import time from the default subnet; call update_service_ips()
|
||||
# whenever ip_range changes so all downstream callers see the new values.
|
||||
SERVICE_IPS: Dict[str, str] = {
|
||||
'calendar': '172.20.0.21',
|
||||
'files': '172.20.0.22',
|
||||
'mail': '172.20.0.23',
|
||||
'webdav': '172.20.0.24',
|
||||
}
|
||||
|
||||
|
||||
def update_service_ips(ip_range: str) -> None:
|
||||
"""Recalculate SERVICE_IPS from the new subnet and update in-place."""
|
||||
from ip_utils import get_virtual_ips
|
||||
new_ips = get_virtual_ips(ip_range)
|
||||
SERVICE_IPS.clear()
|
||||
SERVICE_IPS.update(new_ips)
|
||||
|
||||
# Internal RFC-1918 ranges (peer traffic stays inside these = cell-only access)
|
||||
PRIVATE_NETS = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16']
|
||||
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
IP utility functions for PIC — derive all container and virtual IPs from the
|
||||
docker network subnet so that one ip_range setting drives everything.
|
||||
"""
|
||||
|
||||
import ipaddress
|
||||
import os
|
||||
import re
|
||||
from typing import Dict
|
||||
|
||||
# Fixed host-number offsets within the subnet (e.g. 172.20.0.0/16 → 172.20.0.<offset>)
|
||||
CONTAINER_OFFSETS: Dict[str, int] = {
|
||||
'caddy': 2,
|
||||
'dns': 3,
|
||||
'dhcp': 4,
|
||||
'ntp': 5,
|
||||
'mail': 6,
|
||||
'radicale': 7,
|
||||
'webdav': 8,
|
||||
'wireguard': 9,
|
||||
'api': 10,
|
||||
'webui': 11,
|
||||
'rainloop': 12,
|
||||
'filegator': 13,
|
||||
# Caddy virtual IPs — each service gets its own IP so Caddy can route by dst addr
|
||||
'vip_calendar': 21,
|
||||
'vip_files': 22,
|
||||
'vip_mail': 23,
|
||||
'vip_webdav': 24,
|
||||
}
|
||||
|
||||
|
||||
def get_service_ips(ip_range: str) -> Dict[str, str]:
|
||||
"""
|
||||
Derive all container and virtual IPs from the docker network subnet.
|
||||
|
||||
Example: '172.20.0.0/16' → {'caddy': '172.20.0.2', 'dns': '172.20.0.3', ...}
|
||||
The offset of each service within the subnet is fixed (see CONTAINER_OFFSETS).
|
||||
"""
|
||||
network = ipaddress.IPv4Network(ip_range, strict=False)
|
||||
base = int(network.network_address)
|
||||
return {
|
||||
name: str(ipaddress.IPv4Address(base + offset))
|
||||
for name, offset in CONTAINER_OFFSETS.items()
|
||||
}
|
||||
|
||||
|
||||
def get_virtual_ips(ip_range: str) -> Dict[str, str]:
|
||||
"""
|
||||
Return only the four Caddy virtual IPs keyed by service name.
|
||||
Used by firewall_manager to set per-service iptables rules.
|
||||
"""
|
||||
ips = get_service_ips(ip_range)
|
||||
return {
|
||||
'calendar': ips['vip_calendar'],
|
||||
'files': ips['vip_files'],
|
||||
'mail': ips['vip_mail'],
|
||||
'webdav': ips['vip_webdav'],
|
||||
}
|
||||
|
||||
|
||||
def update_docker_compose_ips(old_ip_range: str, new_ip_range: str, compose_path: str) -> bool:
|
||||
"""
|
||||
Rewrite docker-compose.yml: replace the subnet declaration and every
|
||||
container ipv4_address that derives from old_ip_range with the new values.
|
||||
|
||||
Returns True on success, False if the file is not accessible.
|
||||
"""
|
||||
if not os.path.exists(compose_path):
|
||||
return False
|
||||
try:
|
||||
old_ips = get_service_ips(old_ip_range)
|
||||
new_ips = get_service_ips(new_ip_range)
|
||||
|
||||
with open(compose_path) as f:
|
||||
content = f.read()
|
||||
|
||||
# Replace subnet string (e.g. "172.20.0.0/16")
|
||||
content = content.replace(old_ip_range, new_ip_range)
|
||||
|
||||
# Replace each container IP (avoid touching VIPs — they're not in compose)
|
||||
static_names = [n for n in CONTAINER_OFFSETS if not n.startswith('vip_')]
|
||||
for name in static_names:
|
||||
old_ip = old_ips[name]
|
||||
new_ip = new_ips[name]
|
||||
if old_ip != new_ip:
|
||||
# Replace only full IP occurrences (word-boundary aware via regex)
|
||||
content = re.sub(
|
||||
r'(?<!\d)' + re.escape(old_ip) + r'(?!\d)',
|
||||
new_ip,
|
||||
content,
|
||||
)
|
||||
|
||||
with open(compose_path, 'w') as f:
|
||||
f.write(content)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
+32
-11
@@ -118,26 +118,47 @@ class NetworkManager(BaseServiceManager):
|
||||
logger.error(f"Failed to remove DNS record: {e}")
|
||||
return False
|
||||
|
||||
def bootstrap_dns_records(self, cell_name: str, domain: str) -> None:
|
||||
def bootstrap_dns_records(self, cell_name: str, domain: str,
|
||||
ip_range: str = '172.20.0.0/16') -> None:
|
||||
"""Create default service A records the first time the cell starts up.
|
||||
Skipped if a zone file already exists (idempotent)."""
|
||||
zone_file = os.path.join(self.dns_zones_dir, f'{domain}.zone')
|
||||
if os.path.exists(zone_file):
|
||||
return
|
||||
logger.info(f"Bootstrapping DNS records for zone '{domain}'")
|
||||
records = [
|
||||
{'name': cell_name, 'type': 'A', 'value': '172.20.0.2'}, # cell hostname → Caddy
|
||||
{'name': 'api', 'type': 'A', 'value': '172.20.0.10'}, # REST API
|
||||
{'name': 'webui', 'type': 'A', 'value': '172.20.0.11'}, # Web UI
|
||||
{'name': 'calendar', 'type': 'A', 'value': '172.20.0.21'}, # Caddy vIP → Radicale
|
||||
{'name': 'files', 'type': 'A', 'value': '172.20.0.22'}, # Caddy vIP → Filegator
|
||||
{'name': 'mail', 'type': 'A', 'value': '172.20.0.23'}, # Caddy vIP → Rainloop
|
||||
{'name': 'webmail', 'type': 'A', 'value': '172.20.0.23'}, # alias for mail
|
||||
{'name': 'webdav', 'type': 'A', 'value': '172.20.0.24'}, # Caddy vIP → WebDAV
|
||||
]
|
||||
records = self._build_dns_records(cell_name, ip_range)
|
||||
self.update_dns_zone(domain, records)
|
||||
logger.info(f"Created {len(records)} default DNS records for zone '{domain}'")
|
||||
|
||||
def apply_ip_range(self, ip_range: str, cell_name: str, domain: str) -> Dict[str, Any]:
|
||||
"""Rewrite the primary DNS zone file with IPs derived from the new subnet."""
|
||||
restarted: List[str] = []
|
||||
warnings: List[str] = []
|
||||
try:
|
||||
records = self._build_dns_records(cell_name, ip_range)
|
||||
if self.update_dns_zone(domain, records):
|
||||
restarted.append('cell-dns (reloaded)')
|
||||
else:
|
||||
warnings.append('DNS zone update failed')
|
||||
except Exception as e:
|
||||
warnings.append(f'apply_ip_range failed: {e}')
|
||||
return {'restarted': restarted, 'warnings': warnings}
|
||||
|
||||
def _build_dns_records(self, cell_name: str, ip_range: str) -> List[Dict]:
|
||||
"""Build the standard set of DNS A records for the given subnet."""
|
||||
import ip_utils
|
||||
ips = ip_utils.get_service_ips(ip_range)
|
||||
return [
|
||||
{'name': cell_name, 'type': 'A', 'value': ips['caddy']},
|
||||
{'name': 'api', 'type': 'A', 'value': ips['api']},
|
||||
{'name': 'webui', 'type': 'A', 'value': ips['webui']},
|
||||
{'name': 'calendar', 'type': 'A', 'value': ips['vip_calendar']},
|
||||
{'name': 'files', 'type': 'A', 'value': ips['vip_files']},
|
||||
{'name': 'mail', 'type': 'A', 'value': ips['vip_mail']},
|
||||
{'name': 'webmail', 'type': 'A', 'value': ips['vip_mail']},
|
||||
{'name': 'webdav', 'type': 'A', 'value': ips['vip_webdav']},
|
||||
]
|
||||
|
||||
def get_dns_records(self, zone: str = 'cell') -> List[Dict]:
|
||||
"""Get all DNS records across all zones"""
|
||||
all_records = []
|
||||
|
||||
@@ -271,5 +271,57 @@ class TestClearPeerRules(unittest.TestCase):
|
||||
mock_restore.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# update_service_ips
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestUpdateServiceIps(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
# Restore default SERVICE_IPS after each test
|
||||
firewall_manager.update_service_ips('172.20.0.0/16')
|
||||
|
||||
def test_default_ips_are_172_20(self):
|
||||
self.assertEqual(firewall_manager.SERVICE_IPS['calendar'], '172.20.0.21')
|
||||
self.assertEqual(firewall_manager.SERVICE_IPS['webdav'], '172.20.0.24')
|
||||
|
||||
def test_update_changes_all_virtual_ips(self):
|
||||
firewall_manager.update_service_ips('10.0.0.0/24')
|
||||
self.assertEqual(firewall_manager.SERVICE_IPS['calendar'], '10.0.0.21')
|
||||
self.assertEqual(firewall_manager.SERVICE_IPS['files'], '10.0.0.22')
|
||||
self.assertEqual(firewall_manager.SERVICE_IPS['mail'], '10.0.0.23')
|
||||
self.assertEqual(firewall_manager.SERVICE_IPS['webdav'], '10.0.0.24')
|
||||
|
||||
def test_update_replaces_not_extends(self):
|
||||
firewall_manager.update_service_ips('10.0.0.0/24')
|
||||
# Should only have the four virtual-IP keys
|
||||
self.assertEqual(set(firewall_manager.SERVICE_IPS.keys()),
|
||||
{'calendar', 'files', 'mail', 'webdav'})
|
||||
|
||||
def test_apply_peer_rules_uses_updated_ips(self):
|
||||
firewall_manager.update_service_ips('10.0.0.0/24')
|
||||
called_with = []
|
||||
|
||||
def fake_wg_exec(args):
|
||||
called_with.append(args)
|
||||
m = MagicMock()
|
||||
m.returncode = 1 # simulate rule-doesn't-exist → _ensure_rule inserts
|
||||
return m
|
||||
|
||||
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
|
||||
patch.object(firewall_manager, 'clear_peer_rules'):
|
||||
firewall_manager.apply_peer_rules('10.0.0.5', {
|
||||
'internet_access': True,
|
||||
'service_access': ['calendar'],
|
||||
'peer_access': True,
|
||||
})
|
||||
|
||||
iptables_calls = [c for c in called_with if c and c[0] == 'iptables']
|
||||
dest_ips = [c[c.index('-d') + 1] for c in iptables_calls if '-d' in c]
|
||||
# calendar vIP should now be 10.0.0.21
|
||||
self.assertIn('10.0.0.21', dest_ips)
|
||||
# old IP must not appear
|
||||
self.assertNotIn('172.20.0.21', dest_ips)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -0,0 +1,152 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for ip_utils — IP derivation from subnet."""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
api_dir = Path(__file__).parent.parent / 'api'
|
||||
sys.path.insert(0, str(api_dir))
|
||||
|
||||
import ip_utils
|
||||
|
||||
|
||||
class TestGetServiceIps(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.ips = ip_utils.get_service_ips('172.20.0.0/16')
|
||||
|
||||
def test_returns_all_keys(self):
|
||||
for name in ip_utils.CONTAINER_OFFSETS:
|
||||
self.assertIn(name, self.ips)
|
||||
|
||||
def test_default_subnet_caddy(self):
|
||||
self.assertEqual(self.ips['caddy'], '172.20.0.2')
|
||||
|
||||
def test_default_subnet_dns(self):
|
||||
self.assertEqual(self.ips['dns'], '172.20.0.3')
|
||||
|
||||
def test_default_subnet_api(self):
|
||||
self.assertEqual(self.ips['api'], '172.20.0.10')
|
||||
|
||||
def test_default_subnet_virtual_ips(self):
|
||||
self.assertEqual(self.ips['vip_calendar'], '172.20.0.21')
|
||||
self.assertEqual(self.ips['vip_files'], '172.20.0.22')
|
||||
self.assertEqual(self.ips['vip_mail'], '172.20.0.23')
|
||||
self.assertEqual(self.ips['vip_webdav'], '172.20.0.24')
|
||||
|
||||
def test_different_subnet_shifts_all_ips(self):
|
||||
ips = ip_utils.get_service_ips('10.0.0.0/24')
|
||||
self.assertEqual(ips['caddy'], '10.0.0.2')
|
||||
self.assertEqual(ips['dns'], '10.0.0.3')
|
||||
self.assertEqual(ips['api'], '10.0.0.10')
|
||||
self.assertEqual(ips['vip_calendar'], '10.0.0.21')
|
||||
|
||||
def test_non_zero_third_octet_subnet(self):
|
||||
ips = ip_utils.get_service_ips('192.168.5.0/24')
|
||||
self.assertEqual(ips['caddy'], '192.168.5.2')
|
||||
self.assertEqual(ips['vip_webdav'], '192.168.5.24')
|
||||
|
||||
def test_strict_false_accepts_host_bit_set(self):
|
||||
# e.g. user types "172.20.0.1/16" — should work same as "172.20.0.0/16"
|
||||
ips = ip_utils.get_service_ips('172.20.0.1/16')
|
||||
self.assertEqual(ips['caddy'], '172.20.0.2')
|
||||
|
||||
def test_all_ips_are_strings(self):
|
||||
for name, ip in self.ips.items():
|
||||
self.assertIsInstance(ip, str, f'{name} is not a string')
|
||||
|
||||
def test_all_ips_unique(self):
|
||||
self.assertEqual(len(set(self.ips.values())), len(self.ips))
|
||||
|
||||
|
||||
class TestGetVirtualIps(unittest.TestCase):
|
||||
def test_returns_four_services(self):
|
||||
vips = ip_utils.get_virtual_ips('172.20.0.0/16')
|
||||
self.assertEqual(set(vips.keys()), {'calendar', 'files', 'mail', 'webdav'})
|
||||
|
||||
def test_values_match_get_service_ips(self):
|
||||
full = ip_utils.get_service_ips('172.20.0.0/16')
|
||||
vips = ip_utils.get_virtual_ips('172.20.0.0/16')
|
||||
self.assertEqual(vips['calendar'], full['vip_calendar'])
|
||||
self.assertEqual(vips['files'], full['vip_files'])
|
||||
self.assertEqual(vips['mail'], full['vip_mail'])
|
||||
self.assertEqual(vips['webdav'], full['vip_webdav'])
|
||||
|
||||
def test_different_subnet(self):
|
||||
vips = ip_utils.get_virtual_ips('10.10.0.0/16')
|
||||
self.assertEqual(vips['calendar'], '10.10.0.21')
|
||||
self.assertEqual(vips['webdav'], '10.10.0.24')
|
||||
|
||||
|
||||
class TestUpdateDockerComposeIps(unittest.TestCase):
|
||||
COMPOSE_TEMPLATE = """\
|
||||
version: '3.3'
|
||||
services:
|
||||
caddy:
|
||||
networks:
|
||||
cell-network:
|
||||
ipv4_address: 172.20.0.2
|
||||
dns:
|
||||
networks:
|
||||
cell-network:
|
||||
ipv4_address: 172.20.0.3
|
||||
api:
|
||||
networks:
|
||||
cell-network:
|
||||
ipv4_address: 172.20.0.10
|
||||
networks:
|
||||
cell-network:
|
||||
ipam:
|
||||
config:
|
||||
- subnet: 172.20.0.0/16
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False)
|
||||
self.tmp.write(self.COMPOSE_TEMPLATE)
|
||||
self.tmp.close()
|
||||
|
||||
def tearDown(self):
|
||||
os.unlink(self.tmp.name)
|
||||
|
||||
def test_returns_false_for_missing_file(self):
|
||||
self.assertFalse(
|
||||
ip_utils.update_docker_compose_ips('172.20.0.0/16', '10.0.0.0/24', '/nonexistent/path.yml')
|
||||
)
|
||||
|
||||
def test_subnet_updated(self):
|
||||
ip_utils.update_docker_compose_ips('172.20.0.0/16', '10.0.0.0/24', self.tmp.name)
|
||||
with open(self.tmp.name) as f:
|
||||
content = f.read()
|
||||
self.assertIn('10.0.0.0/24', content)
|
||||
self.assertNotIn('172.20.0.0/16', content)
|
||||
|
||||
def test_caddy_ip_updated(self):
|
||||
ip_utils.update_docker_compose_ips('172.20.0.0/16', '10.0.0.0/24', self.tmp.name)
|
||||
with open(self.tmp.name) as f:
|
||||
content = f.read()
|
||||
self.assertIn('10.0.0.2', content)
|
||||
self.assertNotIn('172.20.0.2', content)
|
||||
|
||||
def test_api_ip_updated(self):
|
||||
ip_utils.update_docker_compose_ips('172.20.0.0/16', '10.0.0.0/24', self.tmp.name)
|
||||
with open(self.tmp.name) as f:
|
||||
content = f.read()
|
||||
self.assertIn('10.0.0.10', content)
|
||||
self.assertNotIn('172.20.0.10', content)
|
||||
|
||||
def test_returns_true_on_success(self):
|
||||
result = ip_utils.update_docker_compose_ips('172.20.0.0/16', '10.0.0.0/24', self.tmp.name)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_noop_when_ranges_same(self):
|
||||
ip_utils.update_docker_compose_ips('172.20.0.0/16', '172.20.0.0/16', self.tmp.name)
|
||||
with open(self.tmp.name) as f:
|
||||
content = f.read()
|
||||
self.assertEqual(content, self.COMPOSE_TEMPLATE)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -263,6 +263,98 @@ test2 1800 IN CNAME test1
|
||||
self.assertIn('192.168.1.10', content)
|
||||
self.assertIn('192.168.1.11', content)
|
||||
|
||||
class TestBootstrapDnsRecords(unittest.TestCase):
|
||||
"""Test bootstrap_dns_records with dynamic IP derivation."""
|
||||
|
||||
def setUp(self):
|
||||
self.test_dir = tempfile.mkdtemp()
|
||||
self.data_dir = os.path.join(self.test_dir, 'data')
|
||||
self.config_dir = os.path.join(self.test_dir, 'config')
|
||||
os.makedirs(self.data_dir, exist_ok=True)
|
||||
os.makedirs(self.config_dir, exist_ok=True)
|
||||
self.nm = NetworkManager(self.data_dir, self.config_dir)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.test_dir)
|
||||
|
||||
@patch('subprocess.run')
|
||||
def test_creates_zone_file(self, _mock):
|
||||
self.nm.bootstrap_dns_records('mycell', 'cell')
|
||||
zone_file = os.path.join(self.nm.dns_zones_dir, 'cell.zone')
|
||||
self.assertTrue(os.path.exists(zone_file))
|
||||
|
||||
@patch('subprocess.run')
|
||||
def test_contains_default_caddy_ip(self, _mock):
|
||||
self.nm.bootstrap_dns_records('mycell', 'cell')
|
||||
zone_file = os.path.join(self.nm.dns_zones_dir, 'cell.zone')
|
||||
content = open(zone_file).read()
|
||||
self.assertIn('172.20.0.2', content) # caddy
|
||||
|
||||
@patch('subprocess.run')
|
||||
def test_custom_ip_range_used(self, _mock):
|
||||
self.nm.bootstrap_dns_records('mycell', 'cell', ip_range='10.5.0.0/24')
|
||||
zone_file = os.path.join(self.nm.dns_zones_dir, 'cell.zone')
|
||||
content = open(zone_file).read()
|
||||
self.assertIn('10.5.0.2', content) # caddy
|
||||
self.assertIn('10.5.0.21', content) # vip_calendar
|
||||
self.assertNotIn('172.20', content)
|
||||
|
||||
@patch('subprocess.run')
|
||||
def test_idempotent_skips_existing_zone(self, _mock):
|
||||
self.nm.bootstrap_dns_records('mycell', 'cell')
|
||||
zone_file = os.path.join(self.nm.dns_zones_dir, 'cell.zone')
|
||||
mtime1 = os.path.getmtime(zone_file)
|
||||
self.nm.bootstrap_dns_records('mycell', 'cell')
|
||||
mtime2 = os.path.getmtime(zone_file)
|
||||
self.assertEqual(mtime1, mtime2)
|
||||
|
||||
|
||||
class TestApplyIpRange(unittest.TestCase):
|
||||
"""Test apply_ip_range rewrites DNS zone records."""
|
||||
|
||||
def setUp(self):
|
||||
self.test_dir = tempfile.mkdtemp()
|
||||
self.data_dir = os.path.join(self.test_dir, 'data')
|
||||
self.config_dir = os.path.join(self.test_dir, 'config')
|
||||
os.makedirs(self.data_dir, exist_ok=True)
|
||||
os.makedirs(self.config_dir, exist_ok=True)
|
||||
self.nm = NetworkManager(self.data_dir, self.config_dir)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.test_dir)
|
||||
|
||||
@patch('subprocess.run')
|
||||
def test_zone_file_updated_with_new_ips(self, _mock):
|
||||
# Bootstrap with default range, then change to 10.0.0.0/24
|
||||
self.nm.bootstrap_dns_records('mycell', 'cell', '172.20.0.0/16')
|
||||
result = self.nm.apply_ip_range('10.0.0.0/24', 'mycell', 'cell')
|
||||
zone_file = os.path.join(self.nm.dns_zones_dir, 'cell.zone')
|
||||
content = open(zone_file).read()
|
||||
self.assertIn('10.0.0.2', content) # caddy
|
||||
self.assertIn('10.0.0.21', content) # vip_calendar
|
||||
self.assertNotIn('172.20', content)
|
||||
|
||||
@patch('subprocess.run')
|
||||
def test_returns_restarted_on_success(self, _mock):
|
||||
self.nm.bootstrap_dns_records('mycell', 'cell', '172.20.0.0/16')
|
||||
result = self.nm.apply_ip_range('10.0.0.0/24', 'mycell', 'cell')
|
||||
self.assertIn('cell-dns (reloaded)', result['restarted'])
|
||||
|
||||
@patch('subprocess.run')
|
||||
def test_all_standard_records_present(self, _mock):
|
||||
self.nm.apply_ip_range('10.1.2.0/24', 'pictest', 'mycell')
|
||||
zone_file = os.path.join(self.nm.dns_zones_dir, 'mycell.zone')
|
||||
content = open(zone_file).read()
|
||||
for host in ('pictest', 'api', 'webui', 'calendar', 'files', 'mail', 'webmail', 'webdav'):
|
||||
self.assertIn(host, content)
|
||||
|
||||
@patch('subprocess.run')
|
||||
def test_same_range_updates_zone_without_error(self, _mock):
|
||||
self.nm.bootstrap_dns_records('mycell', 'cell', '172.20.0.0/16')
|
||||
result = self.nm.apply_ip_range('172.20.0.0/16', 'mycell', 'cell')
|
||||
self.assertEqual(result['warnings'], [])
|
||||
|
||||
|
||||
class TestCellDnsForwarding(unittest.TestCase):
|
||||
"""Test add/remove cell DNS forwarding in Corefile."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user