Files
pic/api/network_manager.py
T
roof a43f9fbf0d fix: full security audit remediation — P0/P1/P2/P3 fixes + 1020 passing tests
P0 — Broken functionality:
- Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing)
- Fix email_manager.delete_email_user() missing domain arg
- Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now
  accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file)
- Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR)
- Fix _next_peer_ip exhaustion returning 500 instead of 409
- Fix ConfigManager Caddyfile path (/app/config-caddy/)
- Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx
- Remove hardcoded credentials from Dashboard.jsx

P1 — Security:
- CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern)
- enforce_auth: 503 only when users file readable but empty; never bypass on IOError
- WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes
- DNS add_cell_dns_forward: validate IP and domain; reject injection chars
- DNS zone write: realpath containment + record content validation
- iptables comment /32 suffix prevents substring match deleting wrong peer rules
- is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge)
- POST /api/containers: volume allow-list prevents arbitrary host mounts
- file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user
- email/calendar: stop persisting plaintext passwords in user records
- routing_manager: validate IPs, networks, and interface names
- peer_registry: write peers.json at mode 0o600
- vault_manager: Fernet key file at mode 0o600
- CORS: lock down to explicit origin list
- domain/cell_name validation: reject newline, brace, semicolon injection chars

P2 — Architecture:
- Peer add: rollback registry entry if firewall rules fail post-add
- restart_service(): base class now calls _restart_container(); email and calendar
  managers call cell-mail / cell-radicale respectively
- email/calendar managers sync user list (no passwords) to cell_config.json
- Pending-restart flag cleared only after helper subprocess exits with code 0
- docker-compose.yml: add config-caddy volume to API container

P3 — Tests (854 → 1020):
- Fill test_email_endpoints.py, test_calendar_endpoints.py,
  test_network_endpoints.py, test_routing_endpoints.py
- New: test_peer_management_update.py, test_peer_management_edge_cases.py,
  test_input_validation.py, test_enforce_auth_configured.py,
  test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py,
  test_is_local_request_per_endpoint.py, test_caddy_routing.py
- E2E conftest: skip WireGuard suite when wg-quick absent
- Update existing tests to match fixed signatures and comment formats

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:30:21 -04:00

891 lines
38 KiB
Python

#!/usr/bin/env python3
"""
Network Manager for Personal Internet Cell
Handles DNS, DHCP, and NTP functionality
"""
import os
import re
import json
import subprocess
import logging
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
from base_service_manager import BaseServiceManager
logger = logging.getLogger(__name__)
class NetworkManager(BaseServiceManager):
"""Manages network services (DNS, DHCP, NTP)"""
def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
    """Set up paths for DNS zone files and DHCP leases and create their directories.

    Args:
        data_dir: Root of runtime data. Zone files are kept in ``<data_dir>/dns``
            and the DHCP lease file is ``<data_dir>/dhcp/leases``.
        config_dir: Root of service configuration, forwarded to the base class.
    """
    super().__init__('network', data_dir, config_dir)
    dhcp_dir = os.path.join(data_dir, 'dhcp')
    self.dns_zones_dir = os.path.join(data_dir, 'dns')
    self.dhcp_leases_file = os.path.join(dhcp_dir, 'leases')
    # Create both directories up front so later reads/writes can assume they exist.
    for required_dir in (self.dns_zones_dir, dhcp_dir):
        self.safe_makedirs(required_dir)
def update_dns_zone(self, zone_name: str, records: List[Dict]) -> bool:
    """Validate and atomically rewrite the zone file for ``zone_name``.

    Everything that ends up in the zone file is validated first: the zone
    name, and for each record its name and value, plus type and TTL for every
    record that will actually be emitted (one with both a name and a value).

    Args:
        zone_name: Zone to write; also used as the file stem ``<zone>.zone``.
        records: Dicts with ``name``, ``value`` and optional ``type`` / ``ttl``.

    Returns:
        True when the file was written (a DNS reload is then attempted);
        False on any validation failure or I/O error.
    """
    # fullmatch rather than match + '$': '$' also matches just before a
    # trailing newline, which would let "example.com\n" through.
    if not isinstance(zone_name, str) or not re.fullmatch(r'[a-zA-Z0-9][a-zA-Z0-9.-]{0,252}', zone_name):
        logger.error(f"update_dns_zone: invalid zone_name {zone_name!r}")
        return False
    tmp_file = None
    try:
        zone_file = os.path.join(self.dns_zones_dir, f'{zone_name}.zone')
        # Containment check: resolved zone_file must be inside dns_zones_dir
        real_dir = os.path.realpath(self.dns_zones_dir)
        real_zone = os.path.realpath(zone_file)
        if not (real_zone == real_dir or real_zone.startswith(real_dir + os.sep)):
            logger.error(f"update_dns_zone: path traversal attempt for zone {zone_name!r}")
            return False
        # Validate every field that is interpolated into the zone file
        for rec in records:
            rname = rec.get('name', '')
            rvalue = rec.get('value', '')
            if rname and not re.fullmatch(r'[a-zA-Z0-9_.*-]{1,253}', str(rname)):
                logger.error(f"update_dns_zone: invalid record name {rname!r}")
                return False
            if rvalue and not re.fullmatch(r'[a-zA-Z0-9._: -]{1,512}', str(rvalue)):
                logger.error(f"update_dns_zone: invalid record value {rvalue!r}")
                return False
            if rname and rvalue:
                # Type and TTL are written verbatim next to name/value, so
                # they need the same scrutiny for records that get emitted.
                rtype = rec.get('type', 'A')
                rttl = rec.get('ttl', '3600')
                if not re.fullmatch(r'[A-Za-z][A-Za-z0-9]{0,15}', str(rtype)):
                    logger.error(f"update_dns_zone: invalid record type {rtype!r}")
                    return False
                if not re.fullmatch(r'[0-9]{1,10}[smhdwSMHDW]?', str(rttl)):
                    logger.error(f"update_dns_zone: invalid record ttl {rttl!r}")
                    return False
        # Create zone file content
        content = self._generate_zone_content(zone_name, records)
        # Write to a sibling temp file and rename so readers never see a
        # half-written zone.
        tmp_file = zone_file + '.tmp'
        with open(tmp_file, 'w') as f:
            f.write(content)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_file, zone_file)
        tmp_file = None  # renamed away; nothing left to clean up
        # Reload DNS service
        self._reload_dns_service()
        logger.info(f"Updated DNS zone {zone_name} with {len(records)} records")
        return True
    except Exception as e:
        logger.error(f"Failed to update DNS zone {zone_name}: {e}")
        return False
    finally:
        # Do not leave a stale .tmp behind when writing or renaming failed.
        if tmp_file is not None:
            try:
                os.remove(tmp_file)
            except OSError:
                pass
def _generate_zone_content(self, zone_name: str, records: List[Dict]) -> str:
"""Generate DNS zone file content"""
timestamp = datetime.now().strftime('%Y%m%d%H')
content = f"""$TTL 3600
@ IN SOA {zone_name}. admin.{zone_name}. (
{timestamp} ; Serial
3600 ; Refresh
1800 ; Retry
1209600 ; Expire
3600 ; Minimum TTL
)
; Name servers
@ IN NS {zone_name}.
"""
# Add records
for record in records:
record_type = record.get('type', 'A')
name = record.get('name', '')
value = record.get('value', '')
ttl = record.get('ttl', '3600')
if name and value:
content += f"{name:<20} {ttl:<8} IN {record_type:<6} {value}\n"
return content
def add_dns_record(self, zone: str, name: str, record_type: str, value: str, ttl: int = 3600) -> bool:
    """Add (or replace) one DNS record in a zone and rewrite the zone file.

    An existing record with the same name and type is replaced.

    Args:
        zone: Zone the record belongs to.
        name: Record owner name.
        record_type: Record type mnemonic such as ``'A'`` or ``'CNAME'``.
        value: Record data.
        ttl: Time to live in seconds (an int or a string of digits).

    Returns:
        The result of ``update_dns_zone``; False when any argument fails
        validation or an error occurs.
    """
    # Every argument is interpolated into the zone file (zone also into the
    # file path). fullmatch is used because '$' in re.match would still accept
    # a value that ends in a newline.
    if not isinstance(zone, str) or not re.fullmatch(r'[a-zA-Z0-9][a-zA-Z0-9.-]{0,252}', zone):
        logger.error(f"add_dns_record: invalid zone {zone!r}")
        return False
    if not isinstance(name, str) or not re.fullmatch(r'[a-zA-Z0-9_.*-]{1,253}', name):
        logger.error(f"add_dns_record: invalid name {name!r}")
        return False
    if not isinstance(value, str) or not re.fullmatch(r'[a-zA-Z0-9._: -]{1,512}', value):
        logger.error(f"add_dns_record: invalid value {value!r}")
        return False
    if not isinstance(record_type, str) or not re.fullmatch(r'[A-Za-z][A-Za-z0-9]{0,15}', record_type):
        logger.error(f"add_dns_record: invalid record_type {record_type!r}")
        return False
    if isinstance(ttl, bool) or not re.fullmatch(r'[0-9]{1,10}', str(ttl)):
        logger.error(f"add_dns_record: invalid ttl {ttl!r}")
        return False
    try:
        # Drop any record this one supersedes, then append the new one.
        records = [
            r for r in self._load_dns_records(zone)
            if not (r['name'] == name and r['type'] == record_type)
        ]
        records.append({
            'name': name,
            'type': record_type,
            'value': value,
            'ttl': ttl
        })
        return self.update_dns_zone(zone, records)
    except Exception as e:
        logger.error(f"Failed to add DNS record: {e}")
        return False
def remove_dns_record(self, zone: str, name: str, record_type: str = 'A') -> bool:
    """Delete every record matching ``name`` and ``record_type`` from a zone.

    The zone is reloaded from disk, filtered, and written back through
    ``update_dns_zone``; that call's result is returned. Any exception is
    logged and reported as False.
    """
    try:
        kept = []
        for rec in self._load_dns_records(zone):
            if rec['name'] == name and rec['type'] == record_type:
                continue  # this is the record being removed
            kept.append(rec)
        return self.update_dns_zone(zone, kept)
    except Exception as exc:
        logger.error(f"Failed to remove DNS record: {exc}")
        return False
def bootstrap_dns_records(self, cell_name: str, domain: str,
                          ip_range: str = '172.20.0.0/16') -> None:
    """Write the default service records for ``domain`` unless its zone file exists.

    The presence of ``<domain>.zone`` in the zones directory is the only
    idempotency guard: if it is there, nothing is done.
    """
    if os.path.exists(os.path.join(self.dns_zones_dir, f'{domain}.zone')):
        return
    logger.info(f"Bootstrapping DNS records for zone '{domain}'")
    defaults = self._build_dns_records(cell_name, ip_range)
    self.update_dns_zone(domain, defaults)
    logger.info(f"Created {len(defaults)} default DNS records for zone '{domain}'")
def apply_ip_range(self, ip_range: str, cell_name: str, domain: str) -> Dict[str, Any]:
    """Rebuild the standard records for a new subnet and rewrite the zone for ``domain``.

    Returns:
        Dict with ``restarted`` (``'cell-dns (reloaded)'`` on success) and
        ``warnings`` (a message when the zone update failed or raised).
    """
    outcome: Dict[str, Any] = {'restarted': [], 'warnings': []}
    try:
        zone_ok = self.update_dns_zone(domain, self._build_dns_records(cell_name, ip_range))
    except Exception as exc:
        outcome['warnings'].append(f'apply_ip_range failed: {exc}')
        return outcome
    if zone_ok:
        outcome['restarted'].append('cell-dns (reloaded)')
    else:
        outcome['warnings'].append('DNS zone update failed')
    return outcome
def _build_dns_records(self, cell_name: str, ip_range: str) -> List[Dict]:
    """Return the standard A records for the given subnet.

    The cell name, ``api`` and ``webui`` resolve to the address stored under
    ``'caddy'``; ``calendar``, ``files``, ``mail``/``webmail`` and ``webdav``
    resolve to their own ``vip_*`` addresses. All addresses come from
    ``ip_utils.get_service_ips(ip_range)``.
    """
    import ip_utils
    ips = ip_utils.get_service_ips(ip_range)
    # (record name, key into the service-IP table), in output order
    layout = (
        (cell_name, 'caddy'),
        ('api', 'caddy'),
        ('webui', 'caddy'),
        ('calendar', 'vip_calendar'),
        ('files', 'vip_files'),
        ('mail', 'vip_mail'),
        ('webmail', 'vip_mail'),
        ('webdav', 'vip_webdav'),
    )
    return [{'name': host, 'type': 'A', 'value': ips[key]} for host, key in layout]
def get_dns_records(self, zone: str = 'cell') -> List[Dict]:
    """Return the records of every ``*.zone`` file, each tagged with its ``zone``.

    The ``zone`` argument is accepted but not used: all zones are always
    returned. If listing or loading fails, the error is logged and whatever
    was collected so far is returned.
    """
    collected: List[Dict] = []
    suffix = '.zone'
    try:
        for entry in os.listdir(self.dns_zones_dir):
            if not entry.endswith(suffix):
                continue
            zone_label = entry[:-len(suffix)]
            for record in self._load_dns_records(zone_label):
                record['zone'] = zone_label
                collected.append(record)
    except Exception as exc:
        logger.error(f"Failed to list DNS records: {exc}")
    return collected
def _load_dns_records(self, zone: str) -> List[Dict]:
"""Load DNS records from zone file"""
zone_file = os.path.join(self.dns_zones_dir, f'{zone}.zone')
if not os.path.exists(zone_file):
return []
records = []
try:
with open(zone_file, 'r') as f:
lines = f.readlines()
for line in lines:
line = line.strip().split(';')[0].strip() # strip inline comments
if not line or line.startswith('$'):
continue
parts = line.split()
# Support both: name IN type value (4 parts)
# and name TTL IN type value (5 parts)
if len(parts) == 4 and parts[1] in ('IN',) and parts[2] in ('A', 'CNAME', 'MX', 'TXT'):
records.append({'name': parts[0], 'ttl': '300', 'type': parts[2], 'value': parts[3]})
elif len(parts) >= 5:
record_type = parts[3]
if record_type in ('A', 'CNAME'):
records.append({
'name': parts[0],
'ttl': parts[1],
'type': record_type,
'value': parts[4]
})
except Exception as e:
logger.error(f"Failed to load DNS records: {e}")
return records
def get_dhcp_leases(self) -> List[Dict]:
    """Read the DHCP lease file and return one dict per lease line.

    Blank lines, lines starting with ``#`` and lines with fewer than four
    whitespace-separated fields are ignored. The first four fields map to
    ``timestamp``, ``mac``, ``ip`` and ``hostname``. A missing file yields an
    empty list; read errors are logged and the leases parsed so far returned.
    """
    leases: List[Dict] = []
    try:
        if not os.path.exists(self.dhcp_leases_file):
            return leases
        with open(self.dhcp_leases_file, 'r') as lease_file:
            for raw in lease_file:
                entry = raw.strip()
                if not entry or entry.startswith('#'):
                    continue
                fields = entry.split()
                if len(fields) < 4:
                    continue
                stamp, mac, ip, hostname = fields[:4]
                leases.append({
                    'mac': mac,
                    'ip': ip,
                    'hostname': hostname,
                    'timestamp': stamp
                })
    except Exception as exc:
        logger.error(f"Failed to load DHCP leases: {exc}")
    return leases
def add_dhcp_reservation(self, mac: str, ip: str, hostname: str = '') -> bool:
    """Append a ``dhcp-host=`` reservation to ``dhcp/reservations.conf`` and reload DHCP.

    All three values are validated before being written, because they are
    interpolated into a line-oriented, comma-separated config file: a newline
    or comma in any of them would inject extra options or fields.

    Args:
        mac: Hardware address as six colon-separated hex octets.
        ip: IPv4 or IPv6 address to reserve.
        hostname: Optional host name (letters, digits, ``_``, ``.``, ``-``).

    Returns:
        True when the reservation was written; False on invalid input or error.
    """
    import ipaddress
    if not isinstance(mac, str) or not re.fullmatch(r'[0-9A-Fa-f]{2}(?::[0-9A-Fa-f]{2}){5}', mac):
        logger.error(f"add_dhcp_reservation: invalid mac {mac!r}")
        return False
    try:
        if not isinstance(ip, str):
            raise ValueError('ip must be a string')
        ipaddress.ip_address(ip)
    except ValueError:
        logger.error(f"add_dhcp_reservation: invalid ip {ip!r}")
        return False
    if not isinstance(hostname, str) or not re.fullmatch(r'[A-Za-z0-9_.-]{0,253}', hostname):
        logger.error(f"add_dhcp_reservation: invalid hostname {hostname!r}")
        return False
    try:
        reservation_file = os.path.join(self.config_dir, 'dhcp', 'reservations.conf')
        # Ensure directory exists
        self.safe_makedirs(os.path.dirname(reservation_file))
        # Add reservation
        with open(reservation_file, 'a') as f:
            f.write(f"dhcp-host={mac},{ip},{hostname}\n")
        # Reload DHCP service
        self._reload_dhcp_service()
        logger.info(f"Added DHCP reservation: {mac} -> {ip}")
        return True
    except Exception as e:
        logger.error(f"Failed to add DHCP reservation: {e}")
        return False
def remove_dhcp_reservation(self, mac: str) -> bool:
    """Drop every ``dhcp-host=<mac>,...`` line from the reservations file and reload DHCP.

    A missing reservations file counts as success (nothing to remove).
    Returns False only when reading, writing or reloading raises.
    """
    try:
        conf_path = os.path.join(self.config_dir, 'dhcp', 'reservations.conf')
        if not os.path.exists(conf_path):
            return True
        prefix = f"dhcp-host={mac},"
        with open(conf_path, 'r') as src:
            kept = [entry for entry in src.readlines() if not entry.startswith(prefix)]
        with open(conf_path, 'w') as dst:
            dst.writelines(kept)
        self._reload_dhcp_service()
        logger.info(f"Removed DHCP reservation: {mac}")
        return True
    except Exception as exc:
        logger.error(f"Failed to remove DHCP reservation: {exc}")
        return False
def get_ntp_status(self) -> Dict:
    """Report whether the ``cell-ntp`` container is listed by ``docker ps`` and, if so, its chrony stats.

    Every docker invocation has a timeout so that an unresponsive Docker
    daemon cannot block the caller indefinitely; a timeout is handled like
    any other failure.

    Returns:
        ``{'running': bool, 'stats': {...}}`` where ``stats`` may contain the
        raw output of ``chronyc tracking`` and ``chronyc sources``.
    """
    try:
        # Check if NTP service is running
        result = subprocess.run(
            ['docker', 'ps', '--filter', 'name=cell-ntp', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=10)
        is_running = len(result.stdout.strip()) > 0
        # Get NTP statistics if running
        stats = {}
        if is_running:
            try:
                for key in ('tracking', 'sources'):
                    result = subprocess.run(
                        ['docker', 'exec', 'cell-ntp', 'chronyc', key],
                        capture_output=True, text=True, timeout=10)
                    if result.returncode == 0:
                        stats[key] = result.stdout
            except Exception as e:
                # Stats are best-effort; keep whatever was collected.
                logger.error(f"Failed to get NTP stats: {e}")
        return {
            'running': is_running,
            'stats': stats
        }
    except Exception as e:
        logger.error(f"Failed to get NTP status: {e}")
        return {'running': False, 'stats': {}}
def _reload_dns_service(self):
"""Reload DNS service"""
try:
subprocess.run(['docker', 'exec', 'cell-dns', 'kill', '-HUP', '1'],
capture_output=True, timeout=10)
except Exception as e:
logger.error(f"Failed to reload DNS service: {e}")
def _reload_dhcp_service(self):
"""Reload DHCP service"""
try:
subprocess.run(['docker', 'exec', 'cell-dhcp', 'kill', '-HUP', '1'],
capture_output=True, timeout=10)
except Exception as e:
logger.error(f"Failed to reload DHCP service: {e}")
def apply_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
    """Write DHCP range / NTP server settings to the service config files and reload.

    Recognised keys:

    * ``dhcp_range`` - replaces the ``dhcp-range=`` line in ``dhcp/dnsmasq.conf``.
    * ``ntp_servers`` - replaces all ``server`` lines in ``ntp/chrony.conf``
      (a single string is treated as a one-element list).

    Values are validated before being written, since they are interpolated
    into line-oriented config files; an invalid value is reported in
    ``warnings`` and the corresponding file is left untouched.

    Returns:
        Dict with ``restarted`` (services restarted/reloaded) and ``warnings``.
    """
    restarted = []
    warnings = []
    dnsmasq_changed = False
    # DHCP range
    if 'dhcp_range' in config:
        try:
            dhcp_range = config['dhcp_range']
            # Addresses, tags, netmask and lease time only; no whitespace or
            # newlines, which would start a new dnsmasq option.
            if not isinstance(dhcp_range, str) or not re.fullmatch(r'[A-Za-z0-9.:,_/-]{1,255}', dhcp_range):
                raise ValueError(f"invalid dhcp_range {dhcp_range!r}")
            dhcp_conf = os.path.join(self.config_dir, 'dhcp', 'dnsmasq.conf')
            if os.path.exists(dhcp_conf):
                with open(dhcp_conf) as f:
                    lines = f.readlines()
                lines = [
                    f"dhcp-range={dhcp_range}\n" if line.startswith('dhcp-range=') else line
                    for line in lines
                ]
                with open(dhcp_conf, 'w') as f:
                    f.writelines(lines)
                dnsmasq_changed = True
        except Exception as e:
            warnings.append(f"dhcp_range write failed: {e}")
    # NTP servers
    if 'ntp_servers' in config and config['ntp_servers']:
        try:
            servers = config['ntp_servers']
            if isinstance(servers, str):
                # Iterating a bare string would yield one "server" per character.
                servers = [servers]
            for server in servers:
                if not isinstance(server, str) or not re.fullmatch(r'[A-Za-z0-9_.:-]{1,253}', server):
                    raise ValueError(f"invalid NTP server {server!r}")
            ntp_conf = os.path.join(self.config_dir, 'ntp', 'chrony.conf')
            if os.path.exists(ntp_conf):
                with open(ntp_conf) as f:
                    lines = f.readlines()
                # Remove existing server lines, add new ones
                lines = [line for line in lines if not line.startswith('server ')]
                new_servers = [f"server {server} iburst\n" for server in servers]
                lines = new_servers + lines
                with open(ntp_conf, 'w') as f:
                    f.writelines(lines)
                self._restart_container('cell-ntp')
                restarted.append('cell-ntp')
        except Exception as e:
            warnings.append(f"ntp_servers write failed: {e}")
    if dnsmasq_changed:
        self._reload_dhcp_service()
        restarted.append('cell-dhcp (reloaded)')
    return {'restarted': restarted, 'warnings': warnings}
def apply_domain(self, domain: str, reload: bool = True) -> Dict[str, Any]:
"""Update domain across dnsmasq, Corefile, and zone file; reload DNS + DHCP.
reload=False writes config files only — use when deferring container restart.

Each numbered step below runs in its own try/except: a failure is recorded
as a message in ``warnings`` rather than raised, and later steps still run.

Args:
domain: New domain name, written into dnsmasq.conf, the Corefile and the
zone file name/contents.
reload: When true, the DHCP and DNS reload helpers are called.

Returns:
Dict with ``restarted`` (labels of reloaded services) and ``warnings``
(one message per failed step).
"""
# NOTE(review): ``domain`` is interpolated into config text and a file name
# without validation in this method — presumably validated by the caller; confirm.
restarted = []
warnings = []
# 1. Update dnsmasq.conf domain= line
try:
dhcp_conf = os.path.join(self.config_dir, 'dhcp', 'dnsmasq.conf')
if os.path.exists(dhcp_conf):
with open(dhcp_conf) as f:
lines = f.readlines()
# Lines starting with 'domain=' are replaced; all other lines pass through unchanged.
lines = [
f"domain={domain}\n" if l.startswith('domain=') else l
for l in lines
]
with open(dhcp_conf, 'w') as f:
f.writelines(lines)
if reload:
self._reload_dhcp_service()
restarted.append('cell-dhcp (reloaded)')
except Exception as e:
warnings.append(f"dnsmasq domain update failed: {e}")
# 2. Regenerate Corefile using generate_corefile so it always stays consistent
try:
import firewall_manager as _fm
corefile = os.path.join(self.config_dir, 'dns', 'Corefile')
peers_file = os.path.join(self.data_dir, 'peers.json')
# Peers are read from peers.json when it exists; any read/parse error
# falls back to an empty peer list.
try:
import json as _json
# NOTE(review): the file object returned by open() is not explicitly closed here.
peers = _json.loads(open(peers_file).read()) if os.path.exists(peers_file) else []
except Exception:
peers = []
_fm.generate_corefile(peers, corefile, domain)
except Exception as e:
warnings.append(f"Corefile domain update failed: {e}")
# 3. Update zone file: rename and rewrite $ORIGIN / SOA, remove stale zones
try:
dns_data = os.path.join(self.data_dir, 'dns')
if os.path.isdir(dns_data):
dst = os.path.join(dns_data, f'{domain}.zone')
# NOTE(review): "'local' not in f" is a substring test, so any zone whose
# file name merely contains "local" is excluded from this list.
zone_files = [
os.path.join(dns_data, f)
for f in os.listdir(dns_data)
if f.endswith('.zone') and 'local' not in f
]
# Source is the first listed zone file other than the destination,
# or the destination itself when there is no other candidate.
src = next((p for p in zone_files if p != dst), dst)
if os.path.exists(src):
with open(src) as f:
zone_content = f.read()
# The previous domain is taken from the file's $ORIGIN directive, if any.
m = re.search(r'^\$ORIGIN\s+(\S+)', zone_content, re.MULTILINE)
old_origin = m.group(1).rstrip('.') if m else None
if old_origin and old_origin != domain:
# Plain string replacement of every "<old_origin>." occurrence in the file.
zone_content = zone_content.replace(f'{old_origin}.', f'{domain}.')
zone_content = re.sub(
r'^\$ORIGIN\s+\S+', f'$ORIGIN {domain}.', zone_content, flags=re.MULTILINE)
with open(dst, 'w') as f:
f.write(zone_content)
# Delete the other candidate zone files; removal errors are ignored.
for zone_path in zone_files:
if zone_path != dst:
try:
os.remove(zone_path)
except OSError:
pass
except Exception as e:
warnings.append(f"zone file domain update failed: {e}")
# 4. Reload CoreDNS (only when not deferring to Apply)
if reload:
try:
self._reload_dns_service()
restarted.append('cell-dns (reloaded)')
except Exception as e:
warnings.append(f"CoreDNS reload failed: {e}")
return {'restarted': restarted, 'warnings': warnings}
def apply_cell_name(self, old_name: str, new_name: str, reload: bool = True) -> Dict[str, Any]:
    """Rename the cell's A record in the primary DNS zone file.

    The first ``*.zone`` file whose name does not contain ``local`` is
    rewritten: every line that starts with ``old_name`` followed by an
    optional TTL, ``IN`` and ``A`` gets ``new_name`` as its owner instead.
    The optional TTL matters because ``_generate_zone_content`` writes
    ``name TTL IN A value``; without it the rename never matched those lines.

    reload=False writes the zone file only — use when deferring container restart.

    Returns:
        Dict with ``restarted`` and ``warnings``. Nothing is done when either
        name is empty or both are equal.
    """
    restarted = []
    warnings = []
    if not old_name or not new_name or old_name == new_name:
        return {'restarted': restarted, 'warnings': warnings}
    try:
        dns_data = os.path.join(self.data_dir, 'dns')
        if os.path.isdir(dns_data):
            record_start = re.compile(
                rf'^{re.escape(old_name)}(\s+(?:\d+\s+)?IN\s+A\s+)', re.MULTILINE)
            for fname in os.listdir(dns_data):
                if fname.endswith('.zone') and 'local' not in fname:
                    zone_file = os.path.join(dns_data, fname)
                    with open(zone_file) as f:
                        content = f.read()
                    # A callable replacement inserts new_name literally, so a
                    # backslash in it is not read as a group reference.
                    content = record_start.sub(lambda m: new_name + m.group(1), content)
                    with open(zone_file, 'w') as f:
                        f.write(content)
                    break  # only the first matching (primary) zone is touched
        if reload:
            self._reload_dns_service()
            restarted.append('cell-dns (reloaded)')
    except Exception as e:
        warnings.append(f"cell_name DNS update failed: {e}")
    return {'restarted': restarted, 'warnings': warnings}
def _load_cell_links(self) -> List[Dict[str, Any]]:
"""Load cell_links.json from the data directory (written by CellLinkManager)."""
links_file = os.path.join(self.data_dir, 'cell_links.json')
if os.path.exists(links_file):
try:
with open(links_file) as f:
return json.load(f)
except Exception:
return []
return []
def add_cell_dns_forward(self, domain: str, dns_ip: str) -> Dict[str, Any]:
    """Register a DNS forwarding entry for a remote cell's domain.

    Both inputs are validated first. The entry is then merged into the links
    loaded from ``cell_links.json`` (replacing any entry for the same domain)
    and the merged list is handed to ``firewall_manager.apply_all_dns_rules``;
    this method does not write the Corefile itself.

    Returns:
        Dict with ``restarted`` and ``warnings``; invalid input or a failure
        is reported as a warning message.
    """
    import ipaddress
    import firewall_manager as fm
    result: Dict[str, Any] = {'restarted': [], 'warnings': []}
    # dns_ip must parse as an IP address: anything else could smuggle extra
    # directives into the generated config.
    try:
        ipaddress.ip_address(dns_ip)
    except (ValueError, TypeError):
        result['warnings'].append(f'add_cell_dns_forward: invalid dns_ip {dns_ip!r}')
        return result
    # domain must look like a DNS name and contain no whitespace or braces.
    forbidden = ('\n', '\r', '{', '}', ' ', '\t')
    domain_ok = (
        isinstance(domain, str)
        and re.match(r'^[a-zA-Z0-9][a-zA-Z0-9.-]{0,252}$', domain)
        and not any(ch in domain for ch in forbidden)
    )
    if not domain_ok:
        result['warnings'].append(f'add_cell_dns_forward: invalid domain {domain!r}')
        return result
    try:
        # The new entry may not be in cell_links.json yet, so merge it in
        # here, de-duplicated by domain.
        merged = [link for link in self._load_cell_links() if link.get('domain') != domain]
        merged.append({'domain': domain, 'dns_ip': dns_ip})
        corefile_path = os.path.join(self.config_dir, 'dns', 'Corefile')
        # An empty peer list is passed on purpose.
        # NOTE(review): how apply_all_dns_rules treats existing peer ACLs when
        # given no peers is not visible from here — confirm.
        fm.apply_all_dns_rules([], corefile_path=corefile_path, cell_links=merged)
        result['restarted'].append('cell-dns (reloaded)')
    except Exception as exc:
        result['warnings'].append(f'add_cell_dns_forward failed: {exc}')
    return result
def remove_cell_dns_forward(self, domain: str) -> Dict[str, Any]:
    """Unregister the DNS forwarding entry for a remote cell's domain.

    The links from ``cell_links.json`` minus the given domain are handed to
    ``firewall_manager.apply_all_dns_rules``; this method does not write the
    Corefile itself.

    Returns:
        Dict with ``restarted`` and ``warnings``; a failure is reported as a
        warning message.
    """
    import firewall_manager as fm
    result: Dict[str, Any] = {'restarted': [], 'warnings': []}
    try:
        # Filter the domain out here rather than relying on cell_links.json
        # having been updated already.
        remaining = [link for link in self._load_cell_links() if link.get('domain') != domain]
        corefile_path = os.path.join(self.config_dir, 'dns', 'Corefile')
        fm.apply_all_dns_rules([], corefile_path=corefile_path, cell_links=remaining)
        result['restarted'].append('cell-dns (reloaded)')
    except Exception as exc:
        result['warnings'].append(f'remove_cell_dns_forward failed: {exc}')
    return result
def test_dns_resolution(self, domain: str) -> Dict:
    """Resolve ``domain`` with the system resolver and report the addresses found.

    ``socket.getaddrinfo`` returns one entry per address family / socket type
    combination, so the same address normally appears several times; the
    list is de-duplicated (first-seen order kept) before being reported.

    Returns:
        ``{'success': True, 'output': 'Resolved: <addr>, ...', 'error': ''}``
        on success, otherwise ``success`` False with the error text.
    """
    import socket
    try:
        results = socket.getaddrinfo(domain, None)
        addrs = list(dict.fromkeys(r[4][0] for r in results))
        return {'success': True, 'output': f"Resolved: {', '.join(addrs)}", 'error': ''}
    except Exception as e:
        return {'success': False, 'output': '', 'error': str(e)}
def test_dhcp_functionality(self) -> Dict:
    """Check that the ``cell-dhcp`` container is listed by ``docker ps`` and collect current leases.

    The docker call has a timeout so an unresponsive Docker daemon cannot
    block the caller; a timeout is reported like any other failure.

    Returns:
        Dict with ``success`` / ``running`` (both the container check result),
        ``leases_count`` and ``leases``.
    """
    try:
        # Check if DHCP service is running
        result = subprocess.run(
            ['docker', 'ps', '--filter', 'name=cell-dhcp', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=10)
        is_running = len(result.stdout.strip()) > 0
        # Get DHCP leases
        leases = self.get_dhcp_leases()
        return {
            'success': is_running,
            'running': is_running,
            'leases_count': len(leases),
            'leases': leases
        }
    except Exception as e:
        logger.error(f"Failed to test DHCP functionality: {e}")
        return {'success': False, 'running': False, 'leases_count': 0, 'leases': []}
def test_ntp_functionality(self) -> Dict:
    """Check that the ``cell-ntp`` container is listed by ``docker ps`` and that ``chronyc tracking`` works.

    Both docker calls have a timeout (the container listing previously had
    none) so an unresponsive Docker daemon cannot block the caller.

    Returns:
        Dict with ``success`` / ``running`` (the container check result) and
        ``ntp_test`` holding ``tracking`` (bool) plus ``output`` or ``error``.
    """
    try:
        # Check if NTP service is running
        result = subprocess.run(
            ['docker', 'ps', '--filter', 'name=cell-ntp', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=10)
        is_running = len(result.stdout.strip()) > 0
        # Test NTP query
        ntp_test = {}
        if is_running:
            try:
                result = subprocess.run(['docker', 'exec', 'cell-ntp', 'chronyc', 'tracking'],
                                        capture_output=True, text=True, timeout=10)
                ntp_test['tracking'] = result.returncode == 0
                ntp_test['output'] = result.stdout
            except Exception as e:
                ntp_test['tracking'] = False
                ntp_test['error'] = str(e)
        return {
            'success': is_running,
            'running': is_running,
            'ntp_test': ntp_test
        }
    except Exception as e:
        logger.error(f"Failed to test NTP functionality: {e}")
        return {'success': False, 'running': False, 'ntp_test': {}}
def get_network_info(self) -> dict:
    """Return interfaces, default route and resolver addresses of the local host.

    Returns:
        Dict with ``interfaces`` (parsed ``ip -j addr`` output, or ``[]`` when
        the command fails), ``default_gateway`` (raw ``ip route show default``
        output, or ``''``) and ``dns_servers`` (addresses from
        ``/etc/resolv.conf``). On an unexpected error: ``{'error': <message>}``.
    """
    try:
        info = {}
        # Get network interfaces (timeouts keep a wedged command from hanging the caller)
        result = subprocess.run(['ip', '-j', 'addr'], capture_output=True, text=True, timeout=10)
        info['interfaces'] = json.loads(result.stdout) if result.returncode == 0 else []
        # Get default gateway
        result = subprocess.run(['ip', 'route', 'show', 'default'],
                                capture_output=True, text=True, timeout=10)
        info['default_gateway'] = result.stdout.strip() if result.returncode == 0 else ''
        # Get DNS servers
        dns_servers = []
        try:
            with open('/etc/resolv.conf', 'r') as f:
                for line in f:
                    fields = line.split()
                    # Skip malformed lines instead of aborting the whole parse
                    # on a bare "nameserver" with no address.
                    if len(fields) >= 2 and fields[0] == 'nameserver':
                        dns_servers.append(fields[1])
        except Exception as e:
            # resolv.conf is optional information; report what was parsed.
            logger.debug(f"Could not read /etc/resolv.conf: {e}")
        info['dns_servers'] = dns_servers
        return info
    except Exception as e:
        logger.error(f"Failed to get network info: {e}")
        return {'error': str(e)}
def get_dns_status(self) -> dict:
    """Report whether the ``cell-dns`` container is listed by ``docker ps`` and count zone records.

    The count covers lines in every ``*.zone`` file that have at least five
    fields with ``A`` or ``CNAME`` as the fourth one. The docker call has a
    timeout so an unresponsive Docker daemon cannot block the caller.

    Returns:
        ``{'running': bool, 'records_count': int}``; on an unexpected error
        an ``error`` message is added and the other values are False / 0.
    """
    try:
        # Check if DNS service is running
        result = subprocess.run(
            ['docker', 'ps', '--filter', 'name=cell-dns', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=10)
        is_running = len(result.stdout.strip()) > 0
        # Get DNS records count (for all zones)
        records_count = 0
        try:
            for fname in os.listdir(self.dns_zones_dir):
                if fname.endswith('.zone'):
                    with open(os.path.join(self.dns_zones_dir, fname), 'r') as f:
                        for line in f:
                            if line.strip() and not line.startswith(';') and not line.startswith('$'):
                                parts = line.split()
                                if len(parts) >= 5 and parts[3] in ('A', 'CNAME'):
                                    records_count += 1
        except Exception as e:
            # Counting is best-effort; keep the partial count but leave a trace.
            logger.warning(f"Failed to count DNS records: {e}")
        return {'running': is_running, 'records_count': records_count}
    except Exception as e:
        logger.error(f"Failed to get DNS status: {e}")
        return {'running': False, 'records_count': 0, 'error': str(e)}
def get_status(self) -> Dict[str, Any]:
    """Return the running state of the DNS, DHCP and NTP services.

    Inside a Docker environment (``/.dockerenv`` exists or
    ``DOCKER_CONTAINER=true``) the container checks are used, otherwise the
    host service checks. The flags and overall state are reported both at
    the top level and again under the ``network`` key, followed by a UTC
    ``timestamp``. Any exception is passed to ``handle_error``.
    """
    try:
        in_docker = os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER') == 'true'
        if in_docker:
            # Check if network containers are actually running
            dns_running = self._check_dns_container_status()
            dhcp_running = self._check_dhcp_container_status()
            ntp_running = self._check_ntp_container_status()
        else:
            # Check actual service status on the host
            dns_running = self._check_dns_status()
            dhcp_running = self._check_dhcp_status()
            ntp_running = self._check_ntp_status()
        all_running = dns_running and dhcp_running and ntp_running
        summary = {
            'dns_running': dns_running,
            'dhcp_running': dhcp_running,
            'ntp_running': ntp_running,
            'running': all_running,
            'status': 'online' if all_running else 'offline',
        }
        status = dict(summary)
        status['network'] = dict(summary)
        status['timestamp'] = datetime.utcnow().isoformat()
        return status
    except Exception as exc:
        return self.handle_error(exc, "get_status")
def _check_dns_container_status(self) -> bool:
"""Check if DNS Docker container is running"""
try:
import docker
client = docker.from_env()
containers = client.containers.list(filters={'name': 'cell-dns'})
return len(containers) > 0
except Exception:
return False
def _check_dhcp_container_status(self) -> bool:
"""Check if DHCP Docker container is running"""
try:
import docker
client = docker.from_env()
containers = client.containers.list(filters={'name': 'cell-dhcp'})
return len(containers) > 0
except Exception:
return False
def _check_ntp_container_status(self) -> bool:
"""Check if NTP Docker container is running"""
try:
import docker
client = docker.from_env()
containers = client.containers.list(filters={'name': 'cell-ntp'})
return len(containers) > 0
except Exception:
return False
def test_connectivity(self) -> Dict[str, Any]:
    """Run the DNS, DHCP and NTP self-tests and combine their results.

    DNS is tested by resolving ``google.com``. ``success`` is true only when
    every individual result reports ``success``. The individual results and
    the overall flag are also repeated under the ``network`` key. Any
    exception is passed to ``handle_error``.
    """
    try:
        checks = {
            'dns_test': self.test_dns_resolution('google.com'),
            'dhcp_test': self.test_dhcp_functionality(),
            'ntp_test': self.test_ntp_functionality(),
        }
        results = dict(checks)
        results['timestamp'] = datetime.utcnow().isoformat()
        # Determine overall success
        overall = all(outcome.get('success', False) for outcome in checks.values())
        results['success'] = overall
        # Add network key for compatibility
        results['network'] = dict(checks, success=overall)
        return results
    except Exception as exc:
        return self.handle_error(exc, "test_connectivity")
def _check_dns_status(self) -> bool:
"""Check if DNS service is running"""
try:
result = subprocess.run(['systemctl', 'is-active', 'coredns'],
capture_output=True, text=True, timeout=5)
return result.returncode == 0 and result.stdout.strip() == 'active'
except Exception:
# Fallback: check if port 53 is listening
try:
result = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True)
return ':53 ' in result.stdout
except Exception:
return False
def _check_dhcp_status(self) -> bool:
"""Check if DHCP service is running"""
try:
result = subprocess.run(['systemctl', 'is-active', 'dnsmasq'],
capture_output=True, text=True, timeout=5)
return result.returncode == 0 and result.stdout.strip() == 'active'
except Exception:
# Fallback: check if port 67 is listening
try:
result = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True)
return ':67 ' in result.stdout
except Exception:
return False
def _check_ntp_status(self) -> bool:
"""Check if NTP service is running"""
try:
result = subprocess.run(['systemctl', 'is-active', 'chronyd'],
capture_output=True, text=True, timeout=5)
return result.returncode == 0 and result.stdout.strip() == 'active'
except Exception:
# Fallback: check if port 123 is listening
try:
result = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True)
return ':123 ' in result.stdout
except Exception:
return False