a43f9fbf0d
P0 — Broken functionality:
- Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing)
- Fix email_manager.delete_email_user() missing domain arg
- Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file)
- Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR)
- Fix _next_peer_ip exhaustion returning 500 instead of 409
- Fix ConfigManager Caddyfile path (/app/config-caddy/)
- Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx
- Remove hardcoded credentials from Dashboard.jsx

P1 — Security:
- CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern)
- enforce_auth: 503 only when users file readable but empty; never bypass on IOError
- WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes
- DNS add_cell_dns_forward: validate IP and domain; reject injection chars
- DNS zone write: realpath containment + record content validation
- iptables comment /32 suffix prevents substring match deleting wrong peer rules
- is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge)
- POST /api/containers: volume allow-list prevents arbitrary host mounts
- file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user
- email/calendar: stop persisting plaintext passwords in user records
- routing_manager: validate IPs, networks, and interface names
- peer_registry: write peers.json at mode 0o600
- vault_manager: Fernet key file at mode 0o600
- CORS: lock down to explicit origin list
- domain/cell_name validation: reject newline, brace, semicolon injection chars

P2 — Architecture:
- Peer add: rollback registry entry if firewall rules fail post-add
- restart_service(): base class now calls _restart_container(); email and calendar managers call cell-mail / cell-radicale respectively
- email/calendar managers sync user list (no passwords) to cell_config.json
- Pending-restart flag cleared only after helper subprocess exits with code 0
- docker-compose.yml: add config-caddy volume to API container

P3 — Tests (854 → 1020):
- Fill test_email_endpoints.py, test_calendar_endpoints.py, test_network_endpoints.py, test_routing_endpoints.py
- New: test_peer_management_update.py, test_peer_management_edge_cases.py, test_input_validation.py, test_enforce_auth_configured.py, test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py, test_is_local_request_per_endpoint.py, test_caddy_routing.py
- E2E conftest: skip WireGuard suite when wg-quick absent
- Update existing tests to match fixed signatures and comment formats

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
412 lines
17 KiB
Python
412 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Unit tests for NetworkManager class
|
|
"""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Add api directory to path
|
|
api_dir = Path(__file__).parent.parent / 'api'
|
|
sys.path.insert(0, str(api_dir))
|
|
import unittest
|
|
import tempfile
|
|
import os
|
|
import json
|
|
import shutil
|
|
from unittest.mock import patch, MagicMock
|
|
from datetime import datetime
|
|
|
|
# Add parent directory to path for imports
|
|
import sys
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from network_manager import NetworkManager
|
|
|
|
class TestNetworkManager(unittest.TestCase):
    """Test cases for NetworkManager class.

    Each test gets its own temporary ``data`` and ``config`` directories and
    a fresh NetworkManager instance built on top of them.
    """

    def setUp(self):
        """Set up test environment."""
        self.test_dir = tempfile.mkdtemp()
        # Register the removal right away: unittest does not call tearDown()
        # when setUp() raises, but cleanups added with addCleanup() still run,
        # so the temporary tree is not leaked if NetworkManager() fails.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)

        # Create NetworkManager instance
        self.network_manager = NetworkManager(self.data_dir, self.config_dir)

    def test_initialization(self):
        """Test NetworkManager initialization."""
        self.assertEqual(self.network_manager.data_dir, self.data_dir)
        self.assertEqual(self.network_manager.config_dir, self.config_dir)
        self.assertTrue(os.path.exists(self.network_manager.dns_zones_dir))
        self.assertTrue(
            os.path.exists(os.path.dirname(self.network_manager.dhcp_leases_file))
        )

    def test_generate_zone_content(self):
        """Test DNS zone content generation."""
        records = [
            {'name': 'test1', 'type': 'A', 'value': '192.168.1.10', 'ttl': 3600},
            {'name': 'test2', 'type': 'CNAME', 'value': 'test1', 'ttl': 1800},
        ]

        content = self.network_manager._generate_zone_content('test.cell', records)

        self.assertIn('test.cell', content)
        self.assertIn('SOA', content)
        self.assertIn('192.168.1.10', content)
        self.assertIn('test1', content)
        self.assertIn('CNAME', content)

    def test_add_dns_record(self):
        """Test adding DNS record."""
        success = self.network_manager.add_dns_record(
            'test.cell', 'test', 'A', '192.168.1.100'
        )
        self.assertTrue(success)

        # Check if zone file was created
        zone_file = os.path.join(self.network_manager.dns_zones_dir, 'test.cell.zone')
        self.assertTrue(os.path.exists(zone_file))

        # Check content
        with open(zone_file, 'r') as f:
            content = f.read()
        # NOTE(review): 'test' is also a substring of the zone name
        # 'test.cell', so this check is presumably satisfied by the zone
        # header alone; the IP assertion below is the meaningful one.
        self.assertIn('test', content)
        self.assertIn('192.168.1.100', content)

    def test_remove_dns_record(self):
        """Test removing DNS record."""
        # Add a record first
        self.network_manager.add_dns_record('test.cell', 'test', 'A', '192.168.1.100')

        # Remove it
        success = self.network_manager.remove_dns_record('test.cell', 'test', 'A')
        self.assertTrue(success)

        # Check if record was removed
        zone_file = os.path.join(self.network_manager.dns_zones_dir, 'test.cell.zone')
        with open(zone_file, 'r') as f:
            content = f.read()
        self.assertNotIn('192.168.1.100', content)

    def test_load_dns_records(self):
        """Test loading DNS records from zone file."""
        # Create a test zone file
        zone_file = os.path.join(self.network_manager.dns_zones_dir, 'test.cell.zone')
        # NOTE(review): the zone text is reproduced exactly as it appears in
        # the extracted source, which carries no leading whitespace on the
        # SOA parameter lines - confirm against the repository copy.
        content = (
            "$TTL 3600\n"
            "@ IN SOA test.cell. admin.test.cell. (\n"
            "2024010101 ; Serial\n"
            "3600 ; Refresh\n"
            "1800 ; Retry\n"
            "1209600 ; Expire\n"
            "3600 ; Minimum TTL\n"
            ")\n"
            "\n"
            "; Name servers\n"
            "@ IN NS test.cell.\n"
            "\n"
            "test1 3600 IN A 192.168.1.10\n"
            "test2 1800 IN CNAME test1\n"
        )

        with open(zone_file, 'w') as f:
            f.write(content)

        records = self.network_manager._load_dns_records('test.cell')

        self.assertEqual(len(records), 2)
        self.assertEqual(records[0]['name'], 'test1')
        self.assertEqual(records[0]['value'], '192.168.1.10')
        self.assertEqual(records[1]['name'], 'test2')
        self.assertEqual(records[1]['type'], 'CNAME')

    def test_get_dhcp_leases(self):
        """Test getting DHCP leases."""
        # Create a test leases file
        leases_file = self.network_manager.dhcp_leases_file
        content = (
            "1234567890 aa:bb:cc:dd:ee:ff 192.168.1.100 testhost *\n"
            "1234567891 11:22:33:44:55:66 192.168.1.101 anotherhost *\n"
        )

        with open(leases_file, 'w') as f:
            f.write(content)

        leases = self.network_manager.get_dhcp_leases()

        self.assertEqual(len(leases), 2)
        self.assertEqual(leases[0]['mac'], 'aa:bb:cc:dd:ee:ff')
        self.assertEqual(leases[0]['ip'], '192.168.1.100')
        self.assertEqual(leases[0]['hostname'], 'testhost')
        self.assertEqual(leases[1]['mac'], '11:22:33:44:55:66')
        self.assertEqual(leases[1]['ip'], '192.168.1.101')

    def test_add_dhcp_reservation(self):
        """Test adding DHCP reservation."""
        success = self.network_manager.add_dhcp_reservation(
            'aa:bb:cc:dd:ee:ff', '192.168.1.100', 'testhost'
        )
        self.assertTrue(success)

        # Check if reservation file was created
        reservation_file = os.path.join(self.config_dir, 'dhcp', 'reservations.conf')
        self.assertTrue(os.path.exists(reservation_file))

        # Check content
        with open(reservation_file, 'r') as f:
            content = f.read()
        self.assertIn('aa:bb:cc:dd:ee:ff', content)
        self.assertIn('192.168.1.100', content)
        self.assertIn('testhost', content)

    def test_remove_dhcp_reservation(self):
        """Test removing DHCP reservation."""
        # Add a reservation first
        self.network_manager.add_dhcp_reservation(
            'aa:bb:cc:dd:ee:ff', '192.168.1.100', 'testhost'
        )

        # Remove it
        success = self.network_manager.remove_dhcp_reservation('aa:bb:cc:dd:ee:ff')
        self.assertTrue(success)

        # Check if reservation was removed
        reservation_file = os.path.join(self.config_dir, 'dhcp', 'reservations.conf')
        with open(reservation_file, 'r') as f:
            content = f.read()
        self.assertNotIn('aa:bb:cc:dd:ee:ff', content)

    @patch('subprocess.run')
    def test_get_ntp_status(self, mock_run):
        """Test getting NTP status."""
        # Mock NTP service running
        mock_run.return_value.stdout = 'cell-ntp\n'
        mock_run.return_value.returncode = 0

        status = self.network_manager.get_ntp_status()

        self.assertTrue(status['running'])
        self.assertIn('stats', status)

    @patch('subprocess.run')
    def test_get_ntp_status_not_running(self, mock_run):
        """Test getting NTP status when service is not running."""
        # Mock NTP service not running
        mock_run.return_value.stdout = ''
        mock_run.return_value.returncode = 0

        status = self.network_manager.get_ntp_status()

        self.assertFalse(status['running'])
        self.assertIn('stats', status)

    @patch('socket.getaddrinfo')
    def test_test_dns_resolution(self, mock_getaddrinfo):
        """Test DNS resolution testing."""
        mock_getaddrinfo.return_value = [
            (None, None, None, None, ('192.168.1.100', 0))
        ]
        result = self.network_manager.test_dns_resolution('test.cell')
        self.assertTrue(result['success'])
        self.assertIn('192.168.1.100', result['output'])

    @patch('socket.getaddrinfo')
    def test_test_dns_resolution_failure(self, mock_getaddrinfo):
        """Test DNS resolution testing with failure."""
        import socket

        mock_getaddrinfo.side_effect = socket.gaierror('NXDOMAIN')
        result = self.network_manager.test_dns_resolution('nonexistent.cell')
        self.assertFalse(result['success'])
        self.assertIn('NXDOMAIN', result['error'])

    @patch('subprocess.run')
    def test_test_dhcp_functionality(self, mock_run):
        """Test DHCP functionality testing."""
        # Mock DHCP service running
        mock_run.return_value.stdout = 'cell-dhcp\n'
        mock_run.return_value.returncode = 0

        result = self.network_manager.test_dhcp_functionality()

        self.assertTrue(result['running'])
        self.assertIn('leases_count', result)
        self.assertIn('leases', result)

    @patch('subprocess.run')
    def test_test_ntp_functionality(self, mock_run):
        """Test NTP functionality testing."""
        # Mock NTP service running with tracking
        mock_run.return_value.stdout = 'cell-ntp\n'
        mock_run.return_value.returncode = 0

        result = self.network_manager.test_ntp_functionality()

        self.assertTrue(result['running'])
        self.assertIn('ntp_test', result)

    def test_update_dns_zone(self):
        """Test updating DNS zone."""
        records = [
            {'name': 'test1', 'type': 'A', 'value': '192.168.1.10', 'ttl': 3600},
            {'name': 'test2', 'type': 'A', 'value': '192.168.1.11', 'ttl': 3600},
        ]

        success = self.network_manager.update_dns_zone('test.cell', records)
        self.assertTrue(success)

        # Check if zone file was created
        zone_file = os.path.join(self.network_manager.dns_zones_dir, 'test.cell.zone')
        self.assertTrue(os.path.exists(zone_file))

        # Check content
        with open(zone_file, 'r') as f:
            content = f.read()
        self.assertIn('test1', content)
        self.assertIn('test2', content)
        self.assertIn('192.168.1.10', content)
        self.assertIn('192.168.1.11', content)
|
|
|
|
class TestBootstrapDnsRecords(unittest.TestCase):
    """Test bootstrap_dns_records with dynamic IP derivation."""

    def setUp(self):
        """Create temporary data/config directories and a NetworkManager."""
        self.test_dir = tempfile.mkdtemp()
        # Cleanups run even when setUp() raises (tearDown() does not), so the
        # temporary tree is removed if NetworkManager() fails to construct.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)
        self.nm = NetworkManager(self.data_dir, self.config_dir)

    def _zone_path(self):
        """Return the path of the 'cell' zone file inside the zones dir."""
        return os.path.join(self.nm.dns_zones_dir, 'cell.zone')

    def _read_zone(self):
        """Return the text of the 'cell' zone file, closing the handle."""
        with open(self._zone_path()) as f:
            return f.read()

    @patch('subprocess.run')
    def test_creates_zone_file(self, _mock):
        """Bootstrapping creates the zone file for the given domain."""
        self.nm.bootstrap_dns_records('mycell', 'cell')
        self.assertTrue(os.path.exists(self._zone_path()))

    @patch('subprocess.run')
    def test_contains_default_caddy_ip(self, _mock):
        """Without an explicit ip_range the zone contains 172.20.0.2."""
        self.nm.bootstrap_dns_records('mycell', 'cell')
        content = self._read_zone()
        self.assertIn('172.20.0.2', content)  # caddy

    @patch('subprocess.run')
    def test_custom_ip_range_used(self, _mock):
        """An explicit ip_range replaces the default 172.20 addresses."""
        self.nm.bootstrap_dns_records('mycell', 'cell', ip_range='10.5.0.0/24')
        content = self._read_zone()
        self.assertIn('10.5.0.2', content)  # caddy
        self.assertIn('10.5.0.21', content)  # vip_calendar
        self.assertNotIn('172.20', content)

    @patch('subprocess.run')
    def test_idempotent_skips_existing_zone(self, _mock):
        """A second bootstrap leaves an already existing zone file untouched."""
        self.nm.bootstrap_dns_records('mycell', 'cell')
        zone_file = self._zone_path()
        # Back-date the file before the second call. Two mtimes taken moments
        # apart can compare equal on a filesystem with coarse timestamp
        # resolution even if the file was rewritten; with an old timestamp
        # any rewrite moves mtime forward and is detected.
        backdated = 1000000000
        os.utime(zone_file, (backdated, backdated))
        mtime1 = os.path.getmtime(zone_file)
        self.nm.bootstrap_dns_records('mycell', 'cell')
        mtime2 = os.path.getmtime(zone_file)
        self.assertEqual(mtime1, mtime2)
|
|
|
|
|
|
class TestApplyIpRange(unittest.TestCase):
    """Test apply_ip_range rewrites DNS zone records."""

    def setUp(self):
        """Create temporary data/config directories and a NetworkManager."""
        self.test_dir = tempfile.mkdtemp()
        # Cleanups run even when setUp() raises (tearDown() does not), so the
        # temporary tree is removed if NetworkManager() fails to construct.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)
        self.nm = NetworkManager(self.data_dir, self.config_dir)

    def _read_zone(self, domain):
        """Return the text of ``<domain>.zone``, closing the file handle."""
        zone_file = os.path.join(self.nm.dns_zones_dir, domain + '.zone')
        with open(zone_file) as f:
            return f.read()

    @patch('subprocess.run')
    def test_zone_file_updated_with_new_ips(self, _mock):
        """Applying a new range replaces the old addresses in the zone."""
        # Bootstrap with default range, then change to 10.0.0.0/24
        self.nm.bootstrap_dns_records('mycell', 'cell', '172.20.0.0/16')
        self.nm.apply_ip_range('10.0.0.0/24', 'mycell', 'cell')
        content = self._read_zone('cell')
        self.assertIn('10.0.0.2', content)  # caddy
        self.assertIn('10.0.0.21', content)  # vip_calendar
        self.assertNotIn('172.20', content)

    @patch('subprocess.run')
    def test_returns_restarted_on_success(self, _mock):
        """The result's 'restarted' entry reports the DNS reload."""
        self.nm.bootstrap_dns_records('mycell', 'cell', '172.20.0.0/16')
        result = self.nm.apply_ip_range('10.0.0.0/24', 'mycell', 'cell')
        self.assertIn('cell-dns (reloaded)', result['restarted'])

    @patch('subprocess.run')
    def test_all_standard_records_present(self, _mock):
        """Every standard host name appears in the generated zone."""
        self.nm.apply_ip_range('10.1.2.0/24', 'pictest', 'mycell')
        content = self._read_zone('mycell')
        hosts = ('pictest', 'api', 'webui', 'calendar', 'files', 'mail',
                 'webmail', 'webdav')
        for host in hosts:
            # subTest reports every missing host instead of stopping at the
            # first failing assertion.
            with self.subTest(host=host):
                self.assertIn(host, content)

    @patch('subprocess.run')
    def test_same_range_updates_zone_without_error(self, _mock):
        """Re-applying the current range yields no warnings."""
        self.nm.bootstrap_dns_records('mycell', 'cell', '172.20.0.0/16')
        result = self.nm.apply_ip_range('172.20.0.0/16', 'mycell', 'cell')
        self.assertEqual(result['warnings'], [])
|
|
|
|
|
|
class TestCellDnsForwarding(unittest.TestCase):
    """Test add/remove cell DNS forwarding in Corefile."""

    def setUp(self):
        """Create a temp tree, a NetworkManager and a seed Corefile."""
        self.test_dir = tempfile.mkdtemp()
        # Cleanups run even when setUp() raises (tearDown() does not), so the
        # temporary tree is removed if any later setup step fails.
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.data_dir = os.path.join(self.test_dir, 'data')
        self.config_dir = os.path.join(self.test_dir, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(os.path.join(self.config_dir, 'dns'), exist_ok=True)
        self.nm = NetworkManager(self.data_dir, self.config_dir)
        self.corefile = os.path.join(self.config_dir, 'dns', 'Corefile')
        # Seed Corefile: one local zone block plus the catch-all forwarder.
        with open(self.corefile, 'w') as f:
            f.write('home.cell {\n file /data/home.cell.zone\n log\n}\n\n. {\n forward . 8.8.8.8\n log\n}\n')

    def _read_corefile(self):
        """Return the current Corefile text, closing the file handle."""
        with open(self.corefile) as f:
            return f.read()

    @patch('subprocess.run')
    def test_add_cell_dns_forward_appends_block(self, _mock):
        """Adding a forward writes the domain and its forward line."""
        self.nm.add_cell_dns_forward('remote.cell', '10.1.0.1')
        content = self._read_corefile()
        self.assertIn('remote.cell', content)
        self.assertIn('10.1.0.1', content)
        self.assertIn('forward . 10.1.0.1', content)

    @patch('subprocess.run')
    def test_add_cell_dns_forward_idempotent(self, _mock):
        """Adding the same forward twice leaves exactly one forward line."""
        self.nm.add_cell_dns_forward('remote.cell', '10.1.0.1')
        self.nm.add_cell_dns_forward('remote.cell', '10.1.0.1')
        content = self._read_corefile()
        self.assertEqual(content.count('forward . 10.1.0.1'), 1)

    @patch('subprocess.run')
    def test_remove_cell_dns_forward_cleans_block(self, _mock):
        """Removing a forward drops both the domain and its address."""
        self.nm.add_cell_dns_forward('remote.cell', '10.1.0.1')
        self.nm.remove_cell_dns_forward('remote.cell')
        content = self._read_corefile()
        self.assertNotIn('remote.cell', content)
        self.assertNotIn('10.1.0.1', content)

    @patch('subprocess.run')
    def test_remove_nonexistent_forward_does_not_error(self, _mock):
        """Removing an unknown domain neither raises nor adds that domain."""
        # Removing a domain that was never added must not raise and must not
        # leave the nonexistent domain in the regenerated Corefile.
        self.nm.remove_cell_dns_forward('nonexistent.cell')
        after = self._read_corefile()
        self.assertNotIn('nonexistent.cell', after)
        # The Corefile is regenerated (new canonical format) — that's correct.
|
|
|
|
|
|
# Run the suite only when this file is executed directly; importing the
# module (e.g. during test discovery) must not start a test run.
if __name__ == '__main__':
    unittest.main()