615448b875
When ip_range changes in Settings, the new subnet is now applied to: - DNS zone records (network_manager.apply_ip_range) - Caddy virtual IPs (firewall_manager.ensure_caddy_virtual_ips) - iptables per-service rules (firewall_manager.update_service_ips) - docker-compose.yml static IPs if writable (ip_utils.update_docker_compose_ips) New module ip_utils.py derives all container IPs from the subnet using fixed offsets so the entire stack stays consistent from one setting. 321 tests pass (72 new tests added for ip_utils, apply_ip_range, update_service_ips). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
833 lines
34 KiB
Python
833 lines
34 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Network Manager for Personal Internet Cell
|
|
Handles DNS, DHCP, and NTP functionality
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
from base_service_manager import BaseServiceManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class NetworkManager(BaseServiceManager):
|
|
"""Manages network services (DNS, DHCP, NTP)"""
|
|
|
|
def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
    """Initialise the manager and create the directories it stores data in.

    Args:
        data_dir: Base data directory. Zone files are kept in
            ``<data_dir>/dns`` and DHCP leases are read from
            ``<data_dir>/dhcp/leases``.
        config_dir: Base configuration directory, passed through to the
            base class.
    """
    super().__init__('network', data_dir, config_dir)

    self.dns_zones_dir = os.path.join(data_dir, 'dns')
    self.dhcp_leases_file = os.path.join(data_dir, 'dhcp', 'leases')

    # Make sure both the zone directory and the lease file's parent exist.
    required_dirs = (self.dns_zones_dir, os.path.dirname(self.dhcp_leases_file))
    for directory in required_dirs:
        self.safe_makedirs(directory)
|
|
|
|
def update_dns_zone(self, zone_name: str, records: List[Dict]) -> bool:
    """Rewrite the zone file for ``zone_name`` and reload the DNS service.

    Args:
        zone_name: Zone whose ``<zone_name>.zone`` file is overwritten.
        records: Record dicts handed to ``_generate_zone_content``.

    Returns:
        True when the file was written and the reload was requested,
        False if any step raised (the error is logged).
    """
    try:
        target = os.path.join(self.dns_zones_dir, f'{zone_name}.zone')
        rendered = self._generate_zone_content(zone_name, records)

        with open(target, 'w') as zone_fh:
            zone_fh.write(rendered)

        # Let the DNS server pick up the new file.
        self._reload_dns_service()
        logger.info(f"Updated DNS zone {zone_name} with {len(records)} records")
    except Exception as e:
        logger.error(f"Failed to update DNS zone {zone_name}: {e}")
        return False
    else:
        return True
|
|
|
|
def _generate_zone_content(self, zone_name: str, records: List[Dict]) -> str:
    """Generate DNS zone file content.

    The output starts with a ``$TTL`` directive, an SOA record and an NS
    record for ``zone_name``, followed by one line per usable record.

    Args:
        zone_name: Zone name; written with a trailing dot in the SOA and
            NS records.
        records: Record dicts. Recognised keys are ``type`` (default
            ``'A'``), ``name``, ``value`` and ``ttl`` (default ``'3600'``).
            Records with an empty ``name`` or ``value`` are skipped.

    Returns:
        The zone file text.
    """
    # The serial is YYYYMMDDHH in local time, so it only changes once per
    # hour: two writes within the same hour carry the same serial.
    timestamp = datetime.now().strftime('%Y%m%d%H')

    content = f"""$TTL 3600
@ IN SOA {zone_name}. admin.{zone_name}. (
{timestamp} ; Serial
3600 ; Refresh
1800 ; Retry
1209600 ; Expire
3600 ; Minimum TTL
)

; Name servers
@ IN NS {zone_name}.

"""

    # Add records
    for record in records:
        record_type = record.get('type', 'A')
        name = record.get('name', '')
        value = record.get('value', '')
        ttl = record.get('ttl', '3600')

        # Only emit records that have both an owner name and a value.
        if name and value:
            content += f"{name:<20} {ttl:<8} IN {record_type:<6} {value}\n"

    return content
|
|
|
|
def add_dns_record(self, zone: str, name: str, record_type: str, value: str, ttl: int = 3600) -> bool:
    """Insert or replace a record in ``zone`` and rewrite the zone file.

    Any existing record with the same name and type is dropped before the
    new one is appended.

    Args:
        zone: Zone to modify.
        name: Owner name of the record.
        record_type: Record type string, e.g. ``'A'``.
        value: Record data.
        ttl: Time-to-live stored with the record.

    Returns:
        The result of ``update_dns_zone``, or False if an error was raised
        (the error is logged).
    """
    try:
        entry = {'name': name, 'type': record_type, 'value': value, 'ttl': ttl}

        survivors = []
        for existing in self._load_dns_records(zone):
            same_slot = existing['name'] == name and existing['type'] == record_type
            if same_slot:
                continue  # superseded by the new entry
            survivors.append(existing)
        survivors.append(entry)

        return self.update_dns_zone(zone, survivors)
    except Exception as e:
        logger.error(f"Failed to add DNS record: {e}")
        return False
|
|
|
|
def remove_dns_record(self, zone: str, name: str, record_type: str = 'A') -> bool:
    """Delete every record matching ``name`` and ``record_type`` from ``zone``.

    The zone file is rewritten even when nothing matched.

    Args:
        zone: Zone to modify.
        name: Owner name to remove.
        record_type: Record type to remove (default ``'A'``).

    Returns:
        The result of ``update_dns_zone``, or False if an error was raised
        (the error is logged).
    """
    try:
        remaining = []
        for existing in self._load_dns_records(zone):
            if existing['name'] != name or existing['type'] != record_type:
                remaining.append(existing)

        return self.update_dns_zone(zone, remaining)
    except Exception as e:
        logger.error(f"Failed to remove DNS record: {e}")
        return False
|
|
|
|
def bootstrap_dns_records(self, cell_name: str, domain: str,
                          ip_range: str = '172.20.0.0/16') -> None:
    """Seed the zone for ``domain`` with the default service records.

    Does nothing when ``<domain>.zone`` already exists, so repeated calls
    are harmless.

    Args:
        cell_name: Host name passed to ``_build_dns_records``.
        domain: Zone name; also the zone file's base name.
        ip_range: Subnet the default record addresses are derived from.
    """
    zone_path = os.path.join(self.dns_zones_dir, f'{domain}.zone')
    if not os.path.exists(zone_path):
        logger.info(f"Bootstrapping DNS records for zone '{domain}'")
        defaults = self._build_dns_records(cell_name, ip_range)
        self.update_dns_zone(domain, defaults)
        logger.info(f"Created {len(defaults)} default DNS records for zone '{domain}'")
|
|
|
|
def apply_ip_range(self, ip_range: str, cell_name: str, domain: str) -> Dict[str, Any]:
    """Regenerate the zone for ``domain`` using addresses from ``ip_range``.

    Args:
        ip_range: Subnet the service addresses are derived from.
        cell_name: Host name passed to ``_build_dns_records``.
        domain: Zone to rewrite.

    Returns:
        ``{'restarted': [...], 'warnings': [...]}``. ``restarted`` names
        the reloaded DNS service on success; ``warnings`` describes a
        failed update or a raised error.
    """
    restarted: List[str] = []
    warnings: List[str] = []
    try:
        fresh_records = self._build_dns_records(cell_name, ip_range)
        zone_written = self.update_dns_zone(domain, fresh_records)
    except Exception as e:
        warnings.append(f'apply_ip_range failed: {e}')
    else:
        if zone_written:
            restarted.append('cell-dns (reloaded)')
        else:
            warnings.append('DNS zone update failed')
    return {'restarted': restarted, 'warnings': warnings}
|
|
|
|
def _build_dns_records(self, cell_name: str, ip_range: str) -> List[Dict]:
    """Return the default A records for the cell's services.

    Addresses are looked up in the mapping returned by
    ``ip_utils.get_service_ips(ip_range)``.

    Args:
        cell_name: Host name used for the record pointing at ``caddy``.
        ip_range: Subnet passed to ``ip_utils.get_service_ips``.

    Returns:
        A list of ``{'name', 'type', 'value'}`` dicts, all of type ``'A'``.
    """
    import ip_utils

    service_ips = ip_utils.get_service_ips(ip_range)

    # (record name, key in the service-IP mapping); mail and webmail
    # deliberately share the same address.
    host_to_key = (
        (cell_name, 'caddy'),
        ('api', 'api'),
        ('webui', 'webui'),
        ('calendar', 'vip_calendar'),
        ('files', 'vip_files'),
        ('mail', 'vip_mail'),
        ('webmail', 'vip_mail'),
        ('webdav', 'vip_webdav'),
    )
    return [
        {'name': host, 'type': 'A', 'value': service_ips[key]}
        for host, key in host_to_key
    ]
|
|
|
|
def get_dns_records(self, zone: str = 'cell') -> List[Dict]:
    """Return the records of every ``*.zone`` file in the zones directory.

    Args:
        zone: Accepted but not used; records from all zones are returned.

    Returns:
        Record dicts as produced by ``_load_dns_records``, each with an
        added ``'zone'`` key holding the zone name. If listing or loading
        fails, the error is logged and whatever was collected so far is
        returned.
    """
    suffix = '.zone'
    collected = []
    try:
        zone_names = [
            entry[:-len(suffix)]
            for entry in os.listdir(self.dns_zones_dir)
            if entry.endswith(suffix)
        ]
        for zone_name in zone_names:
            for record in self._load_dns_records(zone_name):
                record['zone'] = zone_name
                collected.append(record)
    except Exception as e:
        logger.error(f"Failed to list DNS records: {e}")
    return collected
|
|
|
|
def _load_dns_records(self, zone: str) -> List[Dict]:
|
|
"""Load DNS records from zone file"""
|
|
zone_file = os.path.join(self.dns_zones_dir, f'{zone}.zone')
|
|
|
|
if not os.path.exists(zone_file):
|
|
return []
|
|
|
|
records = []
|
|
try:
|
|
with open(zone_file, 'r') as f:
|
|
lines = f.readlines()
|
|
|
|
for line in lines:
|
|
line = line.strip().split(';')[0].strip() # strip inline comments
|
|
if not line or line.startswith('$'):
|
|
continue
|
|
parts = line.split()
|
|
# Support both: name IN type value (4 parts)
|
|
# and name TTL IN type value (5 parts)
|
|
if len(parts) == 4 and parts[1] in ('IN',) and parts[2] in ('A', 'CNAME', 'MX', 'TXT'):
|
|
records.append({'name': parts[0], 'ttl': '300', 'type': parts[2], 'value': parts[3]})
|
|
elif len(parts) >= 5:
|
|
record_type = parts[3]
|
|
if record_type in ('A', 'CNAME'):
|
|
records.append({
|
|
'name': parts[0],
|
|
'ttl': parts[1],
|
|
'type': record_type,
|
|
'value': parts[4]
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Failed to load DNS records: {e}")
|
|
|
|
return records
|
|
|
|
def get_dhcp_leases(self) -> List[Dict]:
    """Parse the DHCP lease file into a list of lease dicts.

    Blank lines, lines starting with ``#`` and lines with fewer than four
    whitespace-separated fields are ignored. The first four fields are
    read as timestamp, MAC, IP and hostname, in that order.

    Returns:
        A list of ``{'mac', 'ip', 'hostname', 'timestamp'}`` dicts; empty
        when the lease file is missing. On a read error the error is
        logged and the leases parsed so far are returned.
    """
    leases = []

    try:
        if not os.path.exists(self.dhcp_leases_file):
            return leases

        with open(self.dhcp_leases_file, 'r') as lease_fh:
            for raw in lease_fh:
                entry = raw.strip()
                if not entry or entry.startswith('#'):
                    continue
                fields = entry.split()
                if len(fields) < 4:
                    continue
                timestamp, mac, ip, hostname = fields[:4]
                leases.append({
                    'mac': mac,
                    'ip': ip,
                    'hostname': hostname,
                    'timestamp': timestamp,
                })
    except Exception as e:
        logger.error(f"Failed to load DHCP leases: {e}")

    return leases
|
|
|
|
def add_dhcp_reservation(self, mac: str, ip: str, hostname: str = '') -> bool:
    """Append a ``dhcp-host`` reservation and reload the DHCP service.

    Args:
        mac: Client MAC address.
        ip: Address to reserve for that client.
        hostname: Optional host name; written as an empty field when
            omitted.

    Returns:
        True on success, False if any step raised (the error is logged).
    """
    try:
        reservation_file = os.path.join(self.config_dir, 'dhcp', 'reservations.conf')
        self.safe_makedirs(os.path.dirname(reservation_file))

        entry = f"dhcp-host={mac},{ip},{hostname}\n"
        with open(reservation_file, 'a') as out:
            out.write(entry)

        # Make the running DHCP server re-read its configuration.
        self._reload_dhcp_service()
        logger.info(f"Added DHCP reservation: {mac} -> {ip}")
    except Exception as e:
        logger.error(f"Failed to add DHCP reservation: {e}")
        return False
    else:
        return True
|
|
|
|
def remove_dhcp_reservation(self, mac: str) -> bool:
    """Drop every ``dhcp-host`` line for ``mac`` and reload the DHCP service.

    Args:
        mac: MAC address whose reservation lines are removed. The match is
            a literal prefix comparison against ``dhcp-host=<mac>,``.

    Returns:
        True when the reservations file is absent (nothing to do) or was
        rewritten; False if any step raised (the error is logged).
    """
    try:
        reservation_file = os.path.join(self.config_dir, 'dhcp', 'reservations.conf')

        if os.path.exists(reservation_file):
            unwanted_prefix = f"dhcp-host={mac},"

            with open(reservation_file, 'r') as src:
                kept = [entry for entry in src.readlines()
                        if not entry.startswith(unwanted_prefix)]

            with open(reservation_file, 'w') as dst:
                dst.writelines(kept)

            # Make the running DHCP server re-read its configuration.
            self._reload_dhcp_service()
            logger.info(f"Removed DHCP reservation: {mac}")

        return True
    except Exception as e:
        logger.error(f"Failed to remove DHCP reservation: {e}")
        return False
|
|
|
|
def get_ntp_status(self) -> Dict:
    """Get NTP service status.

    Checks via ``docker ps`` whether a container matching ``cell-ntp`` is
    listed and, if so, collects the output of ``chronyc tracking`` and
    ``chronyc sources`` from inside it.

    Every docker invocation is bounded by a 10 second timeout (matching
    the other docker calls in this class) so a hung docker CLI cannot
    block the caller indefinitely; a timeout is handled like any other
    failure.

    Returns:
        ``{'running': bool, 'stats': dict}``. ``stats`` may contain
        ``'tracking'`` and ``'sources'`` with the raw command output for
        each command that exited with status 0. On failure of the status
        check itself, ``{'running': False, 'stats': {}}``.
    """
    try:
        # Check if NTP service is running
        listing = subprocess.run(
            ['docker', 'ps', '--filter', 'name=cell-ntp', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=10)

        is_running = len(listing.stdout.strip()) > 0

        # Get NTP statistics if running
        stats = {}
        if is_running:
            try:
                # The stats key doubles as the chronyc sub-command name.
                for report in ('tracking', 'sources'):
                    result = subprocess.run(
                        ['docker', 'exec', 'cell-ntp', 'chronyc', report],
                        capture_output=True, text=True, timeout=10)
                    if result.returncode == 0:
                        stats[report] = result.stdout
            except Exception as e:
                # Best effort: keep whatever statistics were gathered.
                logger.error(f"Failed to get NTP stats: {e}")

        return {
            'running': is_running,
            'stats': stats
        }

    except Exception as e:
        logger.error(f"Failed to get NTP status: {e}")
        return {'running': False, 'stats': {}}
|
|
|
|
def _reload_dns_service(self):
    """Signal the ``cell-dns`` container to reload its configuration.

    Runs ``kill -HUP 1`` inside the container via ``docker exec`` with a
    10 second timeout. Failures are logged and never raised.
    """
    hup_command = ['docker', 'exec', 'cell-dns', 'kill', '-HUP', '1']
    try:
        subprocess.run(hup_command, capture_output=True, timeout=10)
    except Exception as e:
        logger.error(f"Failed to reload DNS service: {e}")
|
|
|
|
def _reload_dhcp_service(self):
    """Signal the ``cell-dhcp`` container to reload its configuration.

    Runs ``kill -HUP 1`` inside the container via ``docker exec`` with a
    10 second timeout. Failures are logged and never raised.
    """
    hup_command = ['docker', 'exec', 'cell-dhcp', 'kill', '-HUP', '1']
    try:
        subprocess.run(hup_command, capture_output=True, timeout=10)
    except Exception as e:
        logger.error(f"Failed to reload DHCP service: {e}")
|
|
|
|
def apply_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
    """Push settings into the service config files and reload/restart services.

    Recognised keys:

    * ``dhcp_range`` - replaces the value of every ``dhcp-range=`` line in
      ``<config_dir>/dhcp/dnsmasq.conf`` and reloads the DHCP service.
    * ``ntp_servers`` - when non-empty, replaces all ``server ...`` lines
      in ``<config_dir>/ntp/chrony.conf`` with one ``server <s> iburst``
      line per entry (placed at the top of the file) and restarts
      ``cell-ntp``.

    A config file that does not exist is left alone.

    Args:
        config: Mapping of setting names to new values.

    Returns:
        ``{'restarted': [...], 'warnings': [...]}`` listing the services
        touched and any write failures.
    """
    restarted = []
    warnings = []
    dnsmasq_changed = False

    # --- DHCP range -----------------------------------------------------
    if 'dhcp_range' in config:
        try:
            dhcp_conf = os.path.join(self.config_dir, 'dhcp', 'dnsmasq.conf')
            if os.path.exists(dhcp_conf):
                with open(dhcp_conf) as f:
                    current = f.readlines()
                rewritten = []
                for entry in current:
                    if entry.startswith('dhcp-range='):
                        rewritten.append(f"dhcp-range={config['dhcp_range']}\n")
                    else:
                        rewritten.append(entry)
                with open(dhcp_conf, 'w') as f:
                    f.writelines(rewritten)
                dnsmasq_changed = True
        except Exception as e:
            warnings.append(f"dhcp_range write failed: {e}")

    # --- NTP servers ----------------------------------------------------
    if 'ntp_servers' in config and config['ntp_servers']:
        try:
            ntp_conf = os.path.join(self.config_dir, 'ntp', 'chrony.conf')
            if os.path.exists(ntp_conf):
                with open(ntp_conf) as f:
                    current = f.readlines()
                # New server lines go first, followed by everything that
                # was not a server line before.
                rewritten = [f"server {s} iburst\n" for s in config['ntp_servers']]
                rewritten.extend(
                    entry for entry in current if not entry.startswith('server ')
                )
                with open(ntp_conf, 'w') as f:
                    f.writelines(rewritten)
                self._restart_container('cell-ntp')
                restarted.append('cell-ntp')
        except Exception as e:
            warnings.append(f"ntp_servers write failed: {e}")

    # The DHCP reload is deferred until all files have been written.
    if dnsmasq_changed:
        self._reload_dhcp_service()
        restarted.append('cell-dhcp (reloaded)')

    return {'restarted': restarted, 'warnings': warnings}
|
|
|
|
def apply_domain(self, domain: str) -> Dict[str, Any]:
    """Update domain across dnsmasq, Corefile, and zone file; reload DNS + DHCP.

    Steps, each isolated so that one failure only adds a warning:

    1. Rewrite the ``domain=`` line in ``<config_dir>/dhcp/dnsmasq.conf``
       and reload the DHCP service.
    2. In ``<config_dir>/dns/Corefile``, rename the first named server
       block to ``domain`` and point its ``file /data/*.zone`` reference
       at ``/data/<domain>.zone``. The catch-all block (``.`` or
       ``.:port``), blocks tagged with a ``# cell:`` marker line (peer
       forwards) and any later blocks are left untouched.
    3. Take the first ``*.zone`` file in ``<data_dir>/dns`` whose name
       does not contain ``local``; if it declares a different
       ``$ORIGIN``, replace the old origin with ``domain``; then store it
       as ``<domain>.zone`` and remove the old file.
    4. Reload the DNS service.

    Args:
        domain: The new primary domain.

    Returns:
        ``{'restarted': [...], 'warnings': [...]}``.
    """
    # Imported at function scope, before any step runs: steps 2 and 3
    # both need it, and step 3 must work even when no Corefile exists.
    import re

    restarted = []
    warnings = []

    # 1. Update dnsmasq.conf domain= line
    try:
        dhcp_conf = os.path.join(self.config_dir, 'dhcp', 'dnsmasq.conf')
        if os.path.exists(dhcp_conf):
            with open(dhcp_conf) as f:
                lines = f.readlines()
            lines = [
                f"domain={domain}\n" if line.startswith('domain=') else line
                for line in lines
            ]
            with open(dhcp_conf, 'w') as f:
                f.writelines(lines)
            self._reload_dhcp_service()
            restarted.append('cell-dhcp (reloaded)')
    except Exception as e:
        warnings.append(f"dnsmasq domain update failed: {e}")

    # 2. Update Corefile: replace old primary zone block with new domain
    try:
        corefile = os.path.join(self.config_dir, 'dns', 'Corefile')
        if os.path.exists(corefile):
            with open(corefile) as f:
                content = f.read()

            primary_replaced = False

            def replace_zone(m):
                """Rename the first primary zone block; keep all others."""
                nonlocal primary_replaced
                zone = m.group(1)
                if primary_replaced:
                    return m.group(0)
                if zone == '.' or zone.startswith('.:'):
                    return m.group(0)  # keep catch-all
                # A block directly preceded by a "# cell:" marker line is
                # a peer forward, not the primary zone.
                preceding = content[:m.start()].rstrip().rsplit('\n', 1)[-1]
                if preceding.strip().startswith('# cell:'):
                    return m.group(0)
                primary_replaced = True
                # Replace zone name with new domain; update file path
                # reference. A callable replacement keeps the domain text
                # literal.
                body = re.sub(r'file\s+/data/\S+\.zone',
                              lambda _m: f'file /data/{domain}.zone',
                              m.group(2))
                return f'{domain} {{{body}}}'

            # Matches: <word> { ... } blocks (zone names like "cell", "oldname")
            new_content = re.sub(
                r'(\S+)\s*\{([^}]*)\}',
                replace_zone, content, flags=re.DOTALL
            )
            with open(corefile, 'w') as f:
                f.write(new_content)
    except Exception as e:
        warnings.append(f"Corefile domain update failed: {e}")

    # 3. Update zone file: rename and rewrite $ORIGIN / SOA
    try:
        dns_data = os.path.join(self.data_dir, 'dns')
        if os.path.isdir(dns_data):
            # Find existing primary zone file (anything not named 'local')
            for fname in os.listdir(dns_data):
                if fname.endswith('.zone') and 'local' not in fname:
                    src = os.path.join(dns_data, fname)
                    with open(src) as f:
                        zone_content = f.read()
                    # Detect old domain from $ORIGIN line
                    m = re.search(r'^\$ORIGIN\s+(\S+)', zone_content, re.MULTILINE)
                    old_origin = m.group(1).rstrip('.') if m else None
                    if old_origin and old_origin != domain:
                        zone_content = zone_content.replace(
                            f'{old_origin}.', f'{domain}.')
                        zone_content = re.sub(
                            r'^\$ORIGIN\s+\S+', f'$ORIGIN {domain}.',
                            zone_content, flags=re.MULTILINE)
                    dst = os.path.join(dns_data, f'{domain}.zone')
                    with open(dst, 'w') as f:
                        f.write(zone_content)
                    if src != dst:
                        os.remove(src)
                    break  # only the first matching zone file is handled
    except Exception as e:
        warnings.append(f"zone file domain update failed: {e}")

    # 4. Reload CoreDNS
    try:
        self._reload_dns_service()
        restarted.append('cell-dns (reloaded)')
    except Exception as e:
        warnings.append(f"CoreDNS reload failed: {e}")

    return {'restarted': restarted, 'warnings': warnings}
|
|
|
|
def apply_cell_name(self, old_name: str, new_name: str) -> Dict[str, Any]:
    """Update the cell hostname record in the primary DNS zone file.

    Renames the owner of every A record named ``old_name`` in the first
    ``*.zone`` file under ``<data_dir>/dns`` whose file name does not
    contain ``local``, then reloads the DNS service. Both record layouts
    are handled: ``name IN A addr`` and ``name TTL IN A addr`` (the latter
    is what ``_generate_zone_content`` writes).

    Args:
        old_name: Current host name; nothing happens if empty.
        new_name: Replacement host name; nothing happens if empty or equal
            to ``old_name``.

    Returns:
        ``{'restarted': [...], 'warnings': [...]}``.
    """
    import re

    restarted = []
    warnings = []
    if not old_name or not new_name or old_name == new_name:
        return {'restarted': restarted, 'warnings': warnings}
    try:
        dns_data = os.path.join(self.data_dir, 'dns')
        if os.path.isdir(dns_data):
            # Owner name at line start, optional numeric TTL, then "IN A".
            # The trailing \s+ keeps AAAA records from matching.
            owner_pattern = re.compile(
                rf'^{re.escape(old_name)}(\s+(?:\d+\s+)?IN\s+A\s+)',
                re.MULTILINE,
            )
            for fname in os.listdir(dns_data):
                if fname.endswith('.zone') and 'local' not in fname:
                    zone_file = os.path.join(dns_data, fname)
                    with open(zone_file) as f:
                        content = f.read()
                    # A callable replacement inserts new_name literally,
                    # with no backslash/group-reference interpretation.
                    content = owner_pattern.sub(
                        lambda m: f'{new_name}{m.group(1)}', content)
                    with open(zone_file, 'w') as f:
                        f.write(content)
                    break  # only the first matching zone file is handled
        self._reload_dns_service()
        restarted.append('cell-dns (reloaded)')
    except Exception as e:
        warnings.append(f"cell_name DNS update failed: {e}")
    return {'restarted': restarted, 'warnings': warnings}
|
|
|
|
def add_cell_dns_forward(self, domain: str, dns_ip: str) -> Dict[str, Any]:
    """Append a CoreDNS forwarding block for a remote cell's domain.

    The block is tagged with a ``# cell:<domain>`` marker line. If a
    marker line for exactly this domain already exists the Corefile is
    left untouched. The marker is compared as a whole line, so an existing
    ``# cell:home.lan`` does not prevent adding ``home``.

    Args:
        domain: Remote cell domain to forward.
        dns_ip: Address of the DNS server queries are forwarded to.

    Returns:
        ``{'restarted': [...], 'warnings': [...]}``. ``warnings`` contains
        ``'Corefile not found'`` when the Corefile is missing.
    """
    restarted = []
    warnings = []
    try:
        corefile = os.path.join(self.config_dir, 'dns', 'Corefile')
        if not os.path.exists(corefile):
            warnings.append('Corefile not found')
            return {'restarted': restarted, 'warnings': warnings}
        with open(corefile) as f:
            content = f.read()
        marker = f'# cell:{domain}'
        if any(line.strip() == marker for line in content.splitlines()):
            return {'restarted': restarted, 'warnings': warnings}  # already present
        # The leading newline separates the block from existing content
        # and is what remove_cell_dns_forward's pattern anchors on.
        forward_block = (
            f'\n{marker}\n'
            f'{domain} {{\n'
            f' forward . {dns_ip}\n'
            f' log\n'
            f'}}\n'
        )
        with open(corefile, 'a') as f:
            f.write(forward_block)
        self._reload_dns_service()
        restarted.append('cell-dns (reloaded)')
    except Exception as e:
        warnings.append(f'add_cell_dns_forward failed: {e}')
    return {'restarted': restarted, 'warnings': warnings}
|
|
|
|
def remove_cell_dns_forward(self, domain: str) -> Dict[str, Any]:
    """Remove a CoreDNS forwarding block for a remote cell's domain.

    Only a block introduced by a ``# cell:<domain>`` marker line for
    exactly this domain is removed. The marker is compared as a whole
    line, so asking to remove ``home`` does not rewrite the Corefile or
    reload DNS merely because ``# cell:home.lan`` exists.

    Args:
        domain: Remote cell domain whose forward block is removed.

    Returns:
        ``{'restarted': [...], 'warnings': [...]}``; both lists are empty
        when the Corefile or the marker is absent.
    """
    import re
    restarted = []
    warnings = []
    try:
        corefile = os.path.join(self.config_dir, 'dns', 'Corefile')
        if not os.path.exists(corefile):
            return {'restarted': restarted, 'warnings': warnings}
        with open(corefile) as f:
            content = f.read()
        marker = f'# cell:{domain}'
        if not any(line.strip() == marker for line in content.splitlines()):
            return {'restarted': restarted, 'warnings': warnings}
        # Mirrors the layout written by add_cell_dns_forward: newline,
        # marker line, then "<domain> { ... }" up to the first "}".
        new_content = re.sub(
            rf'\n# cell:{re.escape(domain)}\n{re.escape(domain)}\s*\{{[^}}]*\}}\n',
            '',
            content,
            flags=re.DOTALL,
        )
        with open(corefile, 'w') as f:
            f.write(new_content)
        self._reload_dns_service()
        restarted.append('cell-dns (reloaded)')
    except Exception as e:
        warnings.append(f'remove_cell_dns_forward failed: {e}')
    return {'restarted': restarted, 'warnings': warnings}
|
|
|
|
def test_dns_resolution(self, domain: str) -> Dict:
    """Test DNS resolution for a domain using Python socket.

    ``socket.getaddrinfo`` can return the same address several times (one
    entry per socket type/protocol), so addresses are de-duplicated while
    keeping their first-seen order.

    Args:
        domain: Host name to resolve.

    Returns:
        ``{'success': True, 'output': 'Resolved: <addr>, ...', 'error': ''}``
        on success, otherwise ``{'success': False, 'output': '', 'error':
        <message>}``.
    """
    import socket
    try:
        results = socket.getaddrinfo(domain, None)
        # Each entry is (family, type, proto, canonname, sockaddr);
        # sockaddr[0] is the address string.
        addrs = list(dict.fromkeys(entry[4][0] for entry in results))
        return {'success': True, 'output': f"Resolved: {', '.join(addrs)}", 'error': ''}
    except Exception as e:
        return {'success': False, 'output': '', 'error': str(e)}
|
|
|
|
def test_dhcp_functionality(self) -> Dict:
    """Test DHCP functionality.

    Checks via ``docker ps`` whether a container matching ``cell-dhcp`` is
    listed and reads the current leases. The docker call is bounded by a
    10 second timeout (matching the other docker calls in this class) so a
    hung docker CLI cannot block the caller; a timeout is reported like
    any other failure.

    Returns:
        ``{'success', 'running', 'leases_count', 'leases'}`` where
        ``success`` equals ``running``. On error all values are
        falsy/empty.
    """
    try:
        # Check if DHCP service is running
        result = subprocess.run(
            ['docker', 'ps', '--filter', 'name=cell-dhcp', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=10)

        is_running = len(result.stdout.strip()) > 0

        # Get DHCP leases
        leases = self.get_dhcp_leases()

        return {
            'success': is_running,
            'running': is_running,
            'leases_count': len(leases),
            'leases': leases
        }

    except Exception as e:
        logger.error(f"Failed to test DHCP functionality: {e}")
        return {'success': False, 'running': False, 'leases_count': 0, 'leases': []}
|
|
|
|
def test_ntp_functionality(self) -> Dict:
    """Test NTP functionality.

    Checks via ``docker ps`` whether a container matching ``cell-ntp`` is
    listed and, if so, runs ``chronyc tracking`` inside it. Both docker
    calls are bounded by a 10 second timeout.

    Returns:
        ``{'success', 'running', 'ntp_test'}`` where ``success`` equals
        ``running``. ``ntp_test`` holds ``'tracking'`` (bool) plus either
        ``'output'`` or ``'error'`` when the container is running, and is
        empty otherwise.
    """
    try:
        # Check if NTP service is running
        result = subprocess.run(
            ['docker', 'ps', '--filter', 'name=cell-ntp', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=10)

        is_running = len(result.stdout.strip()) > 0

        # Test NTP query
        ntp_test = {}
        if is_running:
            try:
                result = subprocess.run(
                    ['docker', 'exec', 'cell-ntp', 'chronyc', 'tracking'],
                    capture_output=True, text=True, timeout=10)
                ntp_test['tracking'] = result.returncode == 0
                ntp_test['output'] = result.stdout
            except Exception as e:
                # The query failing does not change the running verdict.
                ntp_test['tracking'] = False
                ntp_test['error'] = str(e)

        return {
            'success': is_running,
            'running': is_running,
            'ntp_test': ntp_test
        }

    except Exception as e:
        logger.error(f"Failed to test NTP functionality: {e}")
        return {'success': False, 'running': False, 'ntp_test': {}}
|
|
|
|
def get_network_info(self) -> dict:
    """Return general network info: interfaces, default gateway, DNS servers.

    Returns:
        A dict with:

        * ``'interfaces'`` - parsed JSON output of ``ip -j addr`` (empty
          list if the command exits non-zero),
        * ``'default_gateway'`` - stripped output of
          ``ip route show default`` (empty string if the command exits
          non-zero),
        * ``'dns_servers'`` - addresses from the ``nameserver`` lines of
          ``/etc/resolv.conf`` (empty if the file cannot be read).

        On any other failure, ``{'error': <message>}``.
    """
    try:
        info = {}

        # Get network interfaces
        result = subprocess.run(['ip', '-j', 'addr'], capture_output=True, text=True)
        if result.returncode == 0:
            import json as _json
            info['interfaces'] = _json.loads(result.stdout)
        else:
            info['interfaces'] = []

        # Get default gateway
        result = subprocess.run(['ip', 'route', 'show', 'default'], capture_output=True, text=True)
        info['default_gateway'] = result.stdout.strip() if result.returncode == 0 else ''

        # Get DNS servers
        resolv_conf = '/etc/resolv.conf'
        dns_servers = []
        try:
            with open(resolv_conf, 'r') as f:
                for line in f:
                    if not line.startswith('nameserver'):
                        continue
                    fields = line.split()
                    # Skip a malformed "nameserver" line with no address
                    # instead of abandoning the rest of the file.
                    if len(fields) > 1:
                        dns_servers.append(fields[1])
        except (OSError, ValueError):
            # Best effort: a missing or unreadable resolv.conf just means
            # no DNS servers are reported.
            pass
        info['dns_servers'] = dns_servers
        return info
    except Exception as e:
        logger.error(f"Failed to get network info: {e}")
        return {'error': str(e)}
|
|
|
|
def get_dns_status(self) -> dict:
    """Return DNS service status and summary info.

    Checks via ``docker ps`` whether a container matching ``cell-dns`` is
    listed (bounded by a 10 second timeout so a hung docker CLI cannot
    block the caller) and counts the A/CNAME records across all zone
    files.

    Returns:
        ``{'running': bool, 'records_count': int}``; on failure of the
        status check an ``'error'`` message is added and the other values
        are ``False`` / ``0``.
    """
    try:
        # Check if DNS service is running
        result = subprocess.run(
            ['docker', 'ps', '--filter', 'name=cell-dns', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=10)
        is_running = len(result.stdout.strip()) > 0

        # Get DNS records count (for all zones)
        records_count = 0
        try:
            for fname in os.listdir(self.dns_zones_dir):
                if not fname.endswith('.zone'):
                    continue
                with open(os.path.join(self.dns_zones_dir, fname), 'r') as f:
                    for line in f:
                        if line.strip() and not line.startswith(';') and not line.startswith('$'):
                            parts = line.split()
                            # Only the "name TTL IN type value" layout is counted.
                            if len(parts) >= 5 and parts[3] in ('A', 'CNAME'):
                                records_count += 1
        except Exception:
            # Best effort: an unreadable zone directory/file leaves the
            # count at whatever was tallied so far.
            pass
        return {'running': is_running, 'records_count': records_count}
    except Exception as e:
        logger.error(f"Failed to get DNS status: {e}")
        return {'running': False, 'records_count': 0, 'error': str(e)}
|
|
|
|
def get_status(self) -> Dict[str, Any]:
    """Report whether the DNS, DHCP and NTP services are running.

    When ``/.dockerenv`` exists or ``DOCKER_CONTAINER`` is ``'true'`` the
    ``_check_*_container_status`` probes are used; otherwise the
    ``_check_*_status`` probes are used.

    Returns:
        A dict with ``dns_running``, ``dhcp_running``, ``ntp_running``,
        ``running`` (all three up), ``status`` (``'online'`` or
        ``'offline'``), a ``network`` sub-dict repeating those five
        values, and a UTC ``timestamp``. On error, the result of
        ``self.handle_error``.
    """
    try:
        import os
        in_docker = os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER') == 'true'

        if in_docker:
            # Check if network containers are actually running
            probes = (
                self._check_dns_container_status,
                self._check_dhcp_container_status,
                self._check_ntp_container_status,
            )
        else:
            # Check actual service status in production
            probes = (
                self._check_dns_status,
                self._check_dhcp_status,
                self._check_ntp_status,
            )

        dns_running, dhcp_running, ntp_running = (probe() for probe in probes)
        all_running = dns_running and dhcp_running and ntp_running

        summary = {
            'dns_running': dns_running,
            'dhcp_running': dhcp_running,
            'ntp_running': ntp_running,
            'running': all_running,
            'status': 'online' if all_running else 'offline',
        }

        # The same summary appears at top level and under 'network'.
        status = dict(summary)
        status['network'] = dict(summary)
        status['timestamp'] = datetime.utcnow().isoformat()
        return status
    except Exception as e:
        return self.handle_error(e, "get_status")
|
|
|
|
def _check_dns_container_status(self) -> bool:
    """Check if DNS Docker container is running.

    Asks the Docker SDK for containers whose name matches ``cell-dns``.
    The client is closed afterwards so each status check does not leave
    an open connection to the Docker daemon behind.

    Returns:
        True if at least one matching container is listed; False
        otherwise, including when the SDK is unavailable or the query
        fails.
    """
    try:
        import docker
        client = docker.from_env()
        try:
            containers = client.containers.list(filters={'name': 'cell-dns'})
            return len(containers) > 0
        finally:
            client.close()
    except Exception:
        return False
|
|
|
|
def _check_dhcp_container_status(self) -> bool:
    """Check if DHCP Docker container is running.

    Asks the Docker SDK for containers whose name matches ``cell-dhcp``.
    The client is closed afterwards so each status check does not leave
    an open connection to the Docker daemon behind.

    Returns:
        True if at least one matching container is listed; False
        otherwise, including when the SDK is unavailable or the query
        fails.
    """
    try:
        import docker
        client = docker.from_env()
        try:
            containers = client.containers.list(filters={'name': 'cell-dhcp'})
            return len(containers) > 0
        finally:
            client.close()
    except Exception:
        return False
|
|
|
|
def _check_ntp_container_status(self) -> bool:
    """Check if NTP Docker container is running.

    Asks the Docker SDK for containers whose name matches ``cell-ntp``.
    The client is closed afterwards so each status check does not leave
    an open connection to the Docker daemon behind.

    Returns:
        True if at least one matching container is listed; False
        otherwise, including when the SDK is unavailable or the query
        fails.
    """
    try:
        import docker
        client = docker.from_env()
        try:
            containers = client.containers.list(filters={'name': 'cell-ntp'})
            return len(containers) > 0
        finally:
            client.close()
    except Exception:
        return False
|
|
|
|
def test_connectivity(self) -> Dict[str, Any]:
    """Run the DNS, DHCP and NTP checks and combine their results.

    DNS is tested by resolving ``google.com``.

    Returns:
        A dict with ``dns_test``, ``dhcp_test``, ``ntp_test``, a UTC
        ``timestamp``, an overall ``success`` flag (true only when every
        check reports success) and a ``network`` sub-dict repeating the
        three check results and the flag. On error, the result of
        ``self.handle_error``.
    """
    try:
        checks = {
            'dns_test': self.test_dns_resolution('google.com'),
            'dhcp_test': self.test_dhcp_functionality(),
            'ntp_test': self.test_ntp_functionality(),
        }

        results = dict(checks)
        results['timestamp'] = datetime.utcnow().isoformat()

        # Overall verdict: every individual check must have succeeded.
        success = all(outcome.get('success', False) for outcome in checks.values())
        results['success'] = success

        # Same data nested under 'network' for compatibility.
        results['network'] = {**checks, 'success': success}

        return results
    except Exception as e:
        return self.handle_error(e, "test_connectivity")
|
|
|
|
def _check_dns_status(self) -> bool:
    """Tell whether the host's DNS service is up.

    Asks ``systemctl is-active coredns`` (5 second timeout). If that call
    raises, falls back to looking for ``':53 '`` in the output of
    ``netstat -tuln``.

    Returns:
        True when systemctl reports ``active`` with exit status 0, or
        when the fallback finds the port; False otherwise.
    """
    try:
        probe = subprocess.run(['systemctl', 'is-active', 'coredns'],
                               capture_output=True, text=True, timeout=5)
        if probe.returncode != 0:
            return False
        return probe.stdout.strip() == 'active'
    except Exception:
        # Fallback: check if port 53 is listening
        try:
            sockets = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True)
            return ':53 ' in sockets.stdout
        except Exception:
            return False
|
|
|
|
def _check_dhcp_status(self) -> bool:
    """Tell whether the host's DHCP service is up.

    Asks ``systemctl is-active dnsmasq`` (5 second timeout). If that call
    raises, falls back to looking for ``':67 '`` in the output of
    ``netstat -tuln``.

    Returns:
        True when systemctl reports ``active`` with exit status 0, or
        when the fallback finds the port; False otherwise.
    """
    try:
        probe = subprocess.run(['systemctl', 'is-active', 'dnsmasq'],
                               capture_output=True, text=True, timeout=5)
        if probe.returncode != 0:
            return False
        return probe.stdout.strip() == 'active'
    except Exception:
        # Fallback: check if port 67 is listening
        try:
            sockets = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True)
            return ':67 ' in sockets.stdout
        except Exception:
            return False
|
|
|
|
def _check_ntp_status(self) -> bool:
    """Tell whether the host's NTP service is up.

    Asks ``systemctl is-active chronyd`` (5 second timeout). If that call
    raises, falls back to looking for ``':123 '`` in the output of
    ``netstat -tuln``.

    Returns:
        True when systemctl reports ``active`` with exit status 0, or
        when the fallback finds the port; False otherwise.
    """
    try:
        probe = subprocess.run(['systemctl', 'is-active', 'chronyd'],
                               capture_output=True, text=True, timeout=5)
        if probe.returncode != 0:
            return False
        return probe.stdout.strip() == 'active'
    except Exception:
        # Fallback: check if port 123 is listening
        try:
            sockets = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True)
            return ':123 ' in sockets.stdout
        except Exception:
            return False