feat: fix cross-cell service access — DNS DNAT, service DNAT, Caddy routing

DNS A records now return the WireGuard server IP (10.0.0.1) instead of
Docker bridge VIPs so cross-cell peers resolve service names correctly
regardless of their bridge subnet. DNAT rules (wg0:53→cell-dns:53 and
wg0:80→cell-caddy:80) are applied at startup. Caddy routes by Host header,
eliminating the Docker bridge subnet conflict. Firewall cell rules allow
DNS and service (Caddy) traffic from linked cell subnets. Split-tunnel
AllowedIPs now dynamically includes connected-cell VPN subnets and drops
the 172.20.0.0/16 range. Peers with route_via set now receive full-tunnel
config (0.0.0.0/0) so all their traffic exits via the remote cell.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-02 03:12:09 -04:00
parent f2f15eb17e
commit 9a800e3b6b
11 changed files with 325 additions and 146 deletions
+4 -1
View File
@@ -305,6 +305,8 @@ def _apply_startup_enforcement():
firewall_manager.apply_all_peer_rules(peers) firewall_manager.apply_all_peer_rules(peers)
firewall_manager.apply_all_cell_rules(cell_links) firewall_manager.apply_all_cell_rules(cell_links)
firewall_manager.ensure_cell_api_dnat() firewall_manager.ensure_cell_api_dnat()
firewall_manager.ensure_dns_dnat()
firewall_manager.ensure_service_dnat()
# Restore any cell link WireGuard peers that were lost from wg0.conf # Restore any cell link WireGuard peers that were lost from wg0.conf
# (happens if the container was rebuilt, wg0.conf was reset, etc.) # (happens if the container was rebuilt, wg0.conf was reset, etc.)
_restore_cell_wg_peers(cell_links) _restore_cell_wg_peers(cell_links)
@@ -337,7 +339,8 @@ def _bootstrap_dns():
cell_name = identity.get('cell_name', os.environ.get('CELL_NAME', 'mycell')) cell_name = identity.get('cell_name', os.environ.get('CELL_NAME', 'mycell'))
domain = identity.get('domain', os.environ.get('CELL_DOMAIN', 'cell')) domain = identity.get('domain', os.environ.get('CELL_DOMAIN', 'cell'))
ip_range = identity.get('ip_range', os.environ.get('CELL_IP_RANGE', '172.20.0.0/16')) ip_range = identity.get('ip_range', os.environ.get('CELL_IP_RANGE', '172.20.0.0/16'))
network_manager.bootstrap_dns_records(cell_name, domain, ip_range) # Bootstrap on first start; then always regenerate to ensure A records use WG server IP.
network_manager.apply_ip_range(ip_range, cell_name, domain)
except Exception as e: except Exception as e:
logger.warning(f"DNS bootstrap failed (non-fatal): {e}") logger.warning(f"DNS bootstrap failed (non-fatal): {e}")
+119 -17
View File
@@ -193,10 +193,14 @@ def apply_peer_rules(peer_ip: str, settings: Dict[str, Any]) -> bool:
'-m', 'comment', '--comment', comment, '-j', target]) '-m', 'comment', '--comment', comment, '-j', target])
# --- Step 3 (inserted last → ends up at TOP of chain) --- # --- Step 3 (inserted last → ends up at TOP of chain) ---
# Per-service rules — inserted in reverse dict order so first service ends up at top # Service access via Caddy: DNS returns WG server IP for all services;
for service, svc_ip in reversed(list(SERVICE_IPS.items())): # ensure_service_dnat() routes wg0:80 to Caddy. One ACCEPT/DROP rule
target = 'ACCEPT' if service in service_access else 'DROP' # controls service access; CoreDNS ACL enforces per-name granularity.
_iptables(['-I', 'FORWARD', '-s', peer_ip, '-d', svc_ip, caddy_ip = _get_caddy_container_ip()
if caddy_ip:
target = 'ACCEPT' if service_access else 'DROP'
_iptables(['-I', 'FORWARD', '-s', peer_ip, '-d', caddy_ip,
'-p', 'tcp', '--dport', '80',
'-m', 'comment', '--comment', comment, '-j', target]) '-m', 'comment', '--comment', comment, '-j', target])
logger.info(f"Applied rules for {peer_ip}: internet={internet_access} " logger.info(f"Applied rules for {peer_ip}: internet={internet_access} "
@@ -298,24 +302,50 @@ def _get_cell_api_ip() -> Optional[str]:
return r.stdout.strip() return r.stdout.strip()
def _get_dns_container_ip() -> str:
"""Return cell-dns container's Docker bridge IP. Falls back to 172.20.0.3."""
try:
r = _run(['docker', 'inspect', '--format',
'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}',
'cell-dns'], check=False)
return r.stdout.strip() or '172.20.0.3'
except Exception:
return '172.20.0.3'
def _get_caddy_container_ip() -> str:
"""Return cell-caddy container's Docker bridge IP. Falls back to 172.20.0.2."""
try:
r = _run(['docker', 'inspect', '--format',
'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}',
'cell-caddy'], check=False)
return r.stdout.strip() or '172.20.0.2'
except Exception:
return '172.20.0.2'
def apply_cell_rules(cell_name: str, vpn_subnet: str, inbound_services: List[str], def apply_cell_rules(cell_name: str, vpn_subnet: str, inbound_services: List[str],
exit_relay: bool = False) -> bool: exit_relay: bool = False) -> bool:
"""Apply FORWARD rules for a cell-to-cell peer. """Apply FORWARD rules for a cell-to-cell peer.
Traffic from vpn_subnet is allowed only to service VIPs listed in Traffic from vpn_subnet is allowed only to service VIPs listed in
inbound_services; all other cell traffic is DROPped. Cells get no inbound_services; all other cell traffic is DROPped. Cells get no
internet or peer access — only explicit service VIPs, plus the internet or peer access — only explicit service access via Caddy on
cell-api port (3000) for permission-sync pushes arriving via DNAT. port 80, plus the cell-api port (3000) for permission-sync pushes.
DNS (port 53) is always allowed so cell peers can resolve service names.
Service names resolve to the WG server IP; ensure_service_dnat() routes
wg0:80 to Caddy, which routes by Host header.
When exit_relay=True, the remote cell's peers can route internet When exit_relay=True, the remote cell's peers can route internet
traffic through this cell (Phase 3). A broad ACCEPT for traffic traffic through this cell (Phase 3).
going out eth0 is added below per-service rules but above catch-all.
Rule insertion order (first inserted = bottom, last inserted = top): Rule insertion order (first inserted = bottom, last inserted = top):
1. Catch-all DROP for the subnet (inserted first → bottom) 1. Catch-all DROP for the subnet (inserted first → bottom)
2. Exit relay ACCEPT (-o eth0) (if exit_relay, above catch-all) 2. Exit relay ACCEPT (-o eth0) (if exit_relay, above catch-all)
3. Per-service ACCEPT/DROP (inserted in reversed() order) 3. Service ACCEPT to Caddy port 80 (if any inbound_services)
4. API-sync ACCEPT (inserted last → top) 4. DNS ACCEPT to cell-dns port 53 (UDP + TCP)
5. API-sync ACCEPT (inserted last → top)
""" """
try: try:
tag = _cell_tag(cell_name) tag = _cell_tag(cell_name)
@@ -326,17 +356,27 @@ def apply_cell_rules(cell_name: str, vpn_subnet: str, inbound_services: List[str
'-m', 'comment', '--comment', tag, '-j', 'DROP']) '-m', 'comment', '--comment', tag, '-j', 'DROP'])
# Exit relay ACCEPT — allow internet-bound traffic from this cell's peers. # Exit relay ACCEPT — allow internet-bound traffic from this cell's peers.
# Inserted ABOVE catch-all but BELOW per-service rules so service-level
# DROP rules still take effect for specific service VIPs.
if exit_relay: if exit_relay:
_iptables(['-I', 'FORWARD', '-s', vpn_subnet, '-o', 'eth0', _iptables(['-I', 'FORWARD', '-s', vpn_subnet, '-o', 'eth0',
'-m', 'comment', '--comment', tag, '-j', 'ACCEPT']) '-m', 'comment', '--comment', tag, '-j', 'ACCEPT'])
# Per-service rules — inserted in reverse dict order, highest-priority last # Service access via Caddy — DNAT wg0:80 → Caddy; Host header routes to service.
for service, svc_ip in reversed(list(SERVICE_IPS.items())): # Only add ACCEPT if this cell has any inbound services granted.
target = 'ACCEPT' if service in inbound_services else 'DROP' if inbound_services:
_iptables(['-I', 'FORWARD', '-s', vpn_subnet, '-d', svc_ip, caddy_ip = _get_caddy_container_ip()
'-m', 'comment', '--comment', tag, '-j', target]) if caddy_ip:
_iptables(['-I', 'FORWARD', '-s', vpn_subnet, '-d', caddy_ip,
'-p', 'tcp', '--dport', '80',
'-m', 'comment', '--comment', tag, '-j', 'ACCEPT'])
# DNS ACCEPT — allow cross-cell peers to query CoreDNS via the WG server IP.
# ensure_dns_dnat() routes wg0:53 to cell-dns; FORWARD must allow it.
dns_ip = _get_dns_container_ip()
if dns_ip:
for proto in ('udp', 'tcp'):
_iptables(['-I', 'FORWARD', '-s', vpn_subnet, '-d', dns_ip,
'-p', proto, '--dport', '53',
'-m', 'comment', '--comment', tag, '-j', 'ACCEPT'])
# API permission-sync ACCEPT — inserted LAST so it goes to position 1 (above # API permission-sync ACCEPT — inserted LAST so it goes to position 1 (above
# the catch-all DROP). Remote cells push permissions to our cell-api via the # the catch-all DROP). Remote cells push permissions to our cell-api via the
@@ -415,6 +455,68 @@ def ensure_cell_api_dnat() -> bool:
return False return False
def ensure_dns_dnat() -> bool:
"""DNAT wg0:53 (UDP+TCP) → cell-dns:53 so VPN peers use the WG server IP for DNS.
Peers are configured with DNS = <wg_server_ip>. Their DNS queries arrive on
wg0:53 and must be forwarded to cell-dns inside the Docker bridge.
"""
try:
dns_ip = _get_dns_container_ip()
if not dns_ip:
logger.warning('ensure_dns_dnat: cell-dns not found')
return False
for proto in ('udp', 'tcp'):
dnat_check = ['-t', 'nat', '-C', 'PREROUTING', '-i', 'wg0', '-p', proto,
'--dport', '53', '-j', 'DNAT', '--to-destination', f'{dns_ip}:53']
dnat_add = ['-t', 'nat', '-A', 'PREROUTING', '-i', 'wg0', '-p', proto,
'--dport', '53', '-j', 'DNAT', '--to-destination', f'{dns_ip}:53']
if _wg_exec(['iptables'] + dnat_check).returncode != 0:
_wg_exec(['iptables'] + dnat_add)
for proto in ('udp', 'tcp'):
fwd_check = ['-C', 'FORWARD', '-i', 'wg0', '-o', 'eth0',
'-p', proto, '--dport', '53', '-j', 'ACCEPT']
fwd_add = ['-I', 'FORWARD', '-i', 'wg0', '-o', 'eth0',
'-p', proto, '--dport', '53', '-j', 'ACCEPT']
if _wg_exec(['iptables'] + fwd_check).returncode != 0:
_wg_exec(['iptables'] + fwd_add)
logger.info(f'ensure_dns_dnat: wg0:53 → {dns_ip}:53')
return True
except Exception as e:
logger.error(f'ensure_dns_dnat: {e}')
return False
def ensure_service_dnat() -> bool:
"""DNAT wg0:80 → cell-caddy:80 so VPN peers reach services via Host-header routing.
All service DNS names resolve to the WG server IP. Traffic to wg0:80 is
forwarded to Caddy, which routes to the correct backend by Host header.
"""
try:
caddy_ip = _get_caddy_container_ip()
if not caddy_ip:
logger.warning('ensure_service_dnat: cell-caddy not found')
return False
dnat_check = ['-t', 'nat', '-C', 'PREROUTING', '-i', 'wg0', '-p', 'tcp',
'--dport', '80', '-j', 'DNAT', '--to-destination', f'{caddy_ip}:80']
dnat_add = ['-t', 'nat', '-A', 'PREROUTING', '-i', 'wg0', '-p', 'tcp',
'--dport', '80', '-j', 'DNAT', '--to-destination', f'{caddy_ip}:80']
if _wg_exec(['iptables'] + dnat_check).returncode != 0:
_wg_exec(['iptables'] + dnat_add)
fwd_check = ['-C', 'FORWARD', '-i', 'wg0', '-o', 'eth0',
'-p', 'tcp', '--dport', '80', '-j', 'ACCEPT']
fwd_add = ['-I', 'FORWARD', '-i', 'wg0', '-o', 'eth0',
'-p', 'tcp', '--dport', '80', '-j', 'ACCEPT']
if _wg_exec(['iptables'] + fwd_check).returncode != 0:
_wg_exec(['iptables'] + fwd_add)
logger.info(f'ensure_service_dnat: wg0:80 → {caddy_ip}:80')
return True
except Exception as e:
logger.error(f'ensure_service_dnat: {e}')
return False
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# DNS ACL (CoreDNS Corefile generation) # DNS ACL (CoreDNS Corefile generation)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
+30 -18
View File
@@ -179,27 +179,39 @@ class NetworkManager(BaseServiceManager):
warnings.append(f'apply_ip_range failed: {e}') warnings.append(f'apply_ip_range failed: {e}')
return {'restarted': restarted, 'warnings': warnings} return {'restarted': restarted, 'warnings': warnings}
def _build_dns_records(self, cell_name: str, ip_range: str) -> List[Dict]: def _get_wg_server_ip(self) -> str:
"""Build the standard set of DNS A records for the given subnet. """Return the WireGuard server IP by reading wg0.conf. Falls back to 10.0.0.1."""
try:
import ipaddress
conf = os.path.join(self.config_dir, 'wireguard', 'wg_confs', 'wg0.conf')
with open(conf) as f:
for line in f:
stripped = line.strip()
if stripped.lower().startswith('address'):
addr = stripped.split('=', 1)[1].strip()
return str(ipaddress.ip_interface(addr).ip)
except Exception:
pass
return '10.0.0.1'
All user-facing names resolve to the Caddy reverse proxy (caddy IP) so def _build_dns_records(self, cell_name: str, ip_range: str) -> List[Dict]:
the Host header is passed through and Caddy routes based on it. """Build the standard set of DNS A records.
Exception: calendar/files/mail/webdav use dedicated virtual IPs so that
iptables per-service firewall rules can target them by destination IP. All service names resolve to the WG server IP so they are reachable
api and webui also go through Caddy they don't have their own VIPs and from both local WG peers and cross-cell peers without Docker bridge
their containers don't serve HTTP on port 80. subnet conflicts. ensure_service_dnat() routes wg0:80 to Caddy, which
routes requests to the correct backend by Host header.
""" """
import ip_utils wg_ip = self._get_wg_server_ip()
ips = ip_utils.get_service_ips(ip_range)
return [ return [
{'name': cell_name, 'type': 'A', 'value': ips['caddy']}, {'name': cell_name, 'type': 'A', 'value': wg_ip},
{'name': 'api', 'type': 'A', 'value': ips['caddy']}, {'name': 'api', 'type': 'A', 'value': wg_ip},
{'name': 'webui', 'type': 'A', 'value': ips['caddy']}, {'name': 'webui', 'type': 'A', 'value': wg_ip},
{'name': 'calendar', 'type': 'A', 'value': ips['vip_calendar']}, {'name': 'calendar', 'type': 'A', 'value': wg_ip},
{'name': 'files', 'type': 'A', 'value': ips['vip_files']}, {'name': 'files', 'type': 'A', 'value': wg_ip},
{'name': 'mail', 'type': 'A', 'value': ips['vip_mail']}, {'name': 'mail', 'type': 'A', 'value': wg_ip},
{'name': 'webmail', 'type': 'A', 'value': ips['vip_mail']}, {'name': 'webmail', 'type': 'A', 'value': wg_ip},
{'name': 'webdav', 'type': 'A', 'value': ips['vip_webdav']}, {'name': 'webdav', 'type': 'A', 'value': wg_ip},
] ]
def get_dns_records(self, zone: str = 'cell') -> List[Dict]: def get_dns_records(self, zone: str = 'cell') -> List[Dict]:
+3 -1
View File
@@ -77,7 +77,9 @@ def peer_services():
if peer_private_key: if peer_private_key:
try: try:
internet_access = peer.get('internet_access', True) internet_access = peer.get('internet_access', True)
allowed_ips = wireguard_manager.FULL_TUNNEL_IPS if internet_access else wireguard_manager.get_split_tunnel_ips() route_via = peer.get('route_via')
use_full = internet_access or bool(route_via)
allowed_ips = wireguard_manager.FULL_TUNNEL_IPS if use_full else wireguard_manager.get_split_tunnel_ips()
wg_config = wireguard_manager.get_peer_config( wg_config = wireguard_manager.get_peer_config(
peer_name=peer_name, peer_name=peer_name,
peer_ip=peer_ip, peer_ip=peer_ip,
+5 -1
View File
@@ -176,7 +176,11 @@ def get_peer_config():
allowed_ips = data.get('allowed_ips') or None allowed_ips = data.get('allowed_ips') or None
if not allowed_ips and registered: if not allowed_ips and registered:
internet_access = registered.get('internet_access', True) internet_access = registered.get('internet_access', True)
allowed_ips = wireguard_manager.FULL_TUNNEL_IPS if internet_access else wireguard_manager.get_split_tunnel_ips() route_via = registered.get('route_via')
# Full tunnel when internet is allowed OR when route_via is set
# (route_via exits via a remote cell — all traffic must go through the tunnel)
use_full = internet_access or bool(route_via)
allowed_ips = wireguard_manager.FULL_TUNNEL_IPS if use_full else wireguard_manager.get_split_tunnel_ips()
result = wireguard_manager.get_peer_config( result = wireguard_manager.get_peer_config(
peer_name=peer_name, peer_name=peer_name,
+27 -3
View File
@@ -203,8 +203,25 @@ class WireGuardManager(BaseServiceManager):
return SERVER_NETWORK return SERVER_NETWORK
def get_split_tunnel_ips(self) -> str: def get_split_tunnel_ips(self) -> str:
"""Return split-tunnel AllowedIPs: VPN subnet + Docker bridge.""" """Return split-tunnel AllowedIPs: local VPN subnet + all connected cell VPN subnets.
return f'{self._get_configured_network()}, 172.20.0.0/16'
172.20.0.0/16 is intentionally excluded all services are accessed via the
WG server IP (ensure_service_dnat routes wg0:80 to Caddy). Including the
Docker bridge subnet would cause routing conflicts when cells share the same range.
"""
local_net = self._get_configured_network()
cell_links_file = os.path.join(self.data_dir, 'api', 'cell_links.json')
cell_nets = []
try:
with open(cell_links_file) as f:
links = json.load(f)
for link in links:
subnet = link.get('vpn_subnet', '')
if subnet and subnet != local_net:
cell_nets.append(subnet)
except Exception:
pass
return ', '.join([local_net] + cell_nets)
def _load_registered_peers(self) -> list: def _load_registered_peers(self) -> list:
"""Read active peers from peers.json for wg0.conf reconstruction after bootstrap.""" """Read active peers from peers.json for wg0.conf reconstruction after bootstrap."""
@@ -733,7 +750,14 @@ class WireGuardManager(BaseServiceManager):
if allowed_ips is None: if allowed_ips is None:
allowed_ips = self.FULL_TUNNEL_IPS allowed_ips = self.FULL_TUNNEL_IPS
server_keys = self.get_keys() server_keys = self.get_keys()
peer_dns = _resolve_peer_dns() # Use WG server IP for DNS: ensure_dns_dnat() routes wg0:53 → cell-dns.
# This works for both split-tunnel (10.0.x.x in AllowedIPs) and cross-cell peers.
addr_str = self._get_configured_address()
try:
import ipaddress as _ipaddress
peer_dns = str(_ipaddress.ip_interface(addr_str).ip)
except Exception:
peer_dns = _resolve_peer_dns()
port = self._get_configured_port() port = self._get_configured_port()
endpoint = server_endpoint if ':' in server_endpoint else f'{server_endpoint}:{port}' endpoint = server_endpoint if ':' in server_endpoint else f'{server_endpoint}:{port}'
addr = peer_ip if '/' in peer_ip else f'{peer_ip}/32' addr = peer_ip if '/' in peer_ip else f'{peer_ip}/32'
+61 -45
View File
@@ -205,6 +205,8 @@ class TestGenerateCorefileWithCellLinks(unittest.TestCase):
class TestApplyPeerRules(unittest.TestCase): class TestApplyPeerRules(unittest.TestCase):
"""Verify correct iptables calls for full-internet vs split-tunnel peers.""" """Verify correct iptables calls for full-internet vs split-tunnel peers."""
_FAKE_CADDY_IP = '172.20.0.2'
def _run_apply(self, peer_ip, settings): def _run_apply(self, peer_ip, settings):
calls_made = [] calls_made = []
@@ -215,7 +217,9 @@ class TestApplyPeerRules(unittest.TestCase):
m.stdout = '' m.stdout = ''
return m return m
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec): with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
patch.object(firewall_manager, '_get_caddy_container_ip',
return_value=self._FAKE_CADDY_IP):
firewall_manager.apply_peer_rules(peer_ip, settings) firewall_manager.apply_peer_rules(peer_ip, settings)
return calls_made return calls_made
@@ -239,23 +243,23 @@ class TestApplyPeerRules(unittest.TestCase):
self.assertIn('DROP', targets) self.assertIn('DROP', targets)
self.assertIn('ACCEPT', targets) self.assertIn('ACCEPT', targets)
def test_service_access_restriction_generates_drop(self): def test_service_access_restriction_uses_caddy_rule(self):
"""service_access controls access via a single Caddy ACCEPT/DROP rule, not per-VIP rules."""
calls = self._run_apply('10.0.0.4', {'internet_access': False, calls = self._run_apply('10.0.0.4', {'internet_access': False,
'service_access': ['calendar'], 'service_access': ['calendar'],
'peer_access': True}) 'peer_access': True})
iptables_calls = [c for c in calls if 'iptables' in c] iptables_calls = [c for c in calls if 'iptables' in c]
# files/mail/webdav should be DROPped, calendar ACCEPTed # Caddy rule should be ACCEPT (any non-empty service_access)
targets_with_ips = [ caddy_rules = [c for c in iptables_calls
(c[c.index('-d') + 1], c[c.index('-j') + 1]) if '-d' in c and self._FAKE_CADDY_IP in c
for c in iptables_calls and '--dport' in c and '80' in c]
if '-d' in c and '-j' in c self.assertTrue(caddy_rules, "Expected a Caddy port-80 rule for service access")
] target = caddy_rules[-1][caddy_rules[-1].index('-j') + 1]
svc_rules = {ip: t for ip, t in targets_with_ips self.assertEqual(target, 'ACCEPT', "Non-empty service_access should ACCEPT Caddy")
if ip in firewall_manager.SERVICE_IPS.values()} # No per-VIP rules — per-service control is at DNS ACL level
calendar_ip = firewall_manager.SERVICE_IPS['calendar'] for svc_ip in firewall_manager.SERVICE_IPS.values():
files_ip = firewall_manager.SERVICE_IPS['files'] vip_rules = [c for c in iptables_calls if '-d' in c and svc_ip in c]
self.assertEqual(svc_rules.get(calendar_ip), 'ACCEPT') self.assertFalse(vip_rules, f"No per-VIP FORWARD rules expected for {svc_ip}")
self.assertEqual(svc_rules.get(files_ip), 'DROP')
def test_all_rules_tagged_with_peer_comment(self): def test_all_rules_tagged_with_peer_comment(self):
calls = self._run_apply('10.0.0.2', {'internet_access': True, calls = self._run_apply('10.0.0.2', {'internet_access': True,
@@ -380,18 +384,21 @@ class TestUpdateServiceIps(unittest.TestCase):
self.assertEqual(set(firewall_manager.SERVICE_IPS.keys()), self.assertEqual(set(firewall_manager.SERVICE_IPS.keys()),
{'calendar', 'files', 'mail', 'webdav'}) {'calendar', 'files', 'mail', 'webdav'})
def test_apply_peer_rules_uses_updated_ips(self): def test_apply_peer_rules_uses_caddy_not_vips(self):
"""Service access uses Caddy IP for FORWARD rules, not SERVICE_IPS VIPs."""
firewall_manager.update_service_ips('10.0.0.0/24') firewall_manager.update_service_ips('10.0.0.0/24')
called_with = [] called_with = []
_CADDY_IP = '172.20.0.2'
def fake_wg_exec(args): def fake_wg_exec(args):
called_with.append(args) called_with.append(args)
m = MagicMock() m = MagicMock()
m.returncode = 1 # simulate rule-doesn't-exist → _ensure_rule inserts m.returncode = 1
return m return m
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \ with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
patch.object(firewall_manager, 'clear_peer_rules'): patch.object(firewall_manager, 'clear_peer_rules'), \
patch.object(firewall_manager, '_get_caddy_container_ip', return_value=_CADDY_IP):
firewall_manager.apply_peer_rules('10.0.0.5', { firewall_manager.apply_peer_rules('10.0.0.5', {
'internet_access': True, 'internet_access': True,
'service_access': ['calendar'], 'service_access': ['calendar'],
@@ -400,9 +407,10 @@ class TestUpdateServiceIps(unittest.TestCase):
iptables_calls = [c for c in called_with if c and c[0] == 'iptables'] iptables_calls = [c for c in called_with if c and c[0] == 'iptables']
dest_ips = [c[c.index('-d') + 1] for c in iptables_calls if '-d' in c] dest_ips = [c[c.index('-d') + 1] for c in iptables_calls if '-d' in c]
# calendar vIP should now be 10.0.0.21 # Caddy IP should appear for service access
self.assertIn('10.0.0.21', dest_ips) self.assertIn(_CADDY_IP, dest_ips)
# old IP must not appear # VIPs (old or updated) must not appear — service access is via Caddy
self.assertNotIn('10.0.0.21', dest_ips)
self.assertNotIn('172.20.0.21', dest_ips) self.assertNotIn('172.20.0.21', dest_ips)
@@ -416,9 +424,11 @@ class TestCellRules(unittest.TestCase):
# ── helpers ─────────────────────────────────────────────────────────────── # ── helpers ───────────────────────────────────────────────────────────────
_FAKE_API_IP = '172.20.0.10' _FAKE_API_IP = '172.20.0.10'
_FAKE_CADDY_IP = '172.20.0.2'
_FAKE_DNS_IP = '172.20.0.3'
def _capture_apply(self, cell_name, vpn_subnet, inbound_services): def _capture_apply(self, cell_name, vpn_subnet, inbound_services):
"""Run apply_cell_rules with _wg_exec and _get_cell_api_ip mocked.""" """Run apply_cell_rules with _wg_exec and container IP helpers mocked."""
calls_made = [] calls_made = []
def fake_wg_exec(args): def fake_wg_exec(args):
@@ -429,7 +439,9 @@ class TestCellRules(unittest.TestCase):
return m return m
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \ with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
patch.object(firewall_manager, '_get_cell_api_ip', return_value=self._FAKE_API_IP): patch.object(firewall_manager, '_get_cell_api_ip', return_value=self._FAKE_API_IP), \
patch.object(firewall_manager, '_get_caddy_container_ip', return_value=self._FAKE_CADDY_IP), \
patch.object(firewall_manager, '_get_dns_container_ip', return_value=self._FAKE_DNS_IP):
firewall_manager.apply_cell_rules(cell_name, vpn_subnet, inbound_services) firewall_manager.apply_cell_rules(cell_name, vpn_subnet, inbound_services)
return [c for c in calls_made if 'iptables' in c] return [c for c in calls_made if 'iptables' in c]
@@ -480,18 +492,18 @@ class TestCellRules(unittest.TestCase):
self.assertTrue(subnet_drops, "Expected a catch-all DROP rule for the subnet") self.assertTrue(subnet_drops, "Expected a catch-all DROP rule for the subnet")
def test_apply_cell_rules_sends_accept_for_allowed_service(self): def test_apply_cell_rules_sends_accept_for_allowed_service(self):
"""apply_cell_rules inserts ACCEPT for the calendar VIP when calendar is in inbound.""" """apply_cell_rules inserts Caddy ACCEPT when inbound_services is non-empty."""
calls = self._capture_apply('office', '10.0.1.0/24', ['calendar']) calls = self._capture_apply('office', '10.0.1.0/24', ['calendar'])
calendar_ip = firewall_manager.SERVICE_IPS['calendar'] caddy_targets = self._targets_for_dest(calls, self._FAKE_CADDY_IP)
calendar_targets = self._targets_for_dest(calls, calendar_ip) self.assertIn('ACCEPT', caddy_targets,
self.assertIn('ACCEPT', calendar_targets) "Expected ACCEPT to Caddy when inbound_services is non-empty")
def test_apply_cell_rules_sends_drop_for_disallowed_service(self): def test_apply_cell_rules_no_caddy_accept_when_no_inbound(self):
"""apply_cell_rules inserts DROP for a service not in inbound_services.""" """apply_cell_rules does NOT insert Caddy ACCEPT when inbound_services is empty."""
calls = self._capture_apply('office', '10.0.1.0/24', ['calendar']) calls = self._capture_apply('office', '10.0.1.0/24', [])
files_ip = firewall_manager.SERVICE_IPS['files'] caddy_targets = self._targets_for_dest(calls, self._FAKE_CADDY_IP)
files_targets = self._targets_for_dest(calls, files_ip) self.assertNotIn('ACCEPT', caddy_targets,
self.assertIn('DROP', files_targets) "No Caddy ACCEPT expected when inbound_services is empty")
def test_apply_cell_rules_accepts_api_sync_traffic(self): def test_apply_cell_rules_accepts_api_sync_traffic(self):
"""apply_cell_rules inserts ACCEPT for cell-api:3000 so permission-sync pushes pass.""" """apply_cell_rules inserts ACCEPT for cell-api:3000 so permission-sync pushes pass."""
@@ -517,7 +529,9 @@ class TestCellRules(unittest.TestCase):
m = MagicMock(); m.returncode = 0; m.stdout = ''; return m m = MagicMock(); m.returncode = 0; m.stdout = ''; return m
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \ with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec), \
patch.object(firewall_manager, '_get_cell_api_ip', return_value='172.20.0.10'): patch.object(firewall_manager, '_get_cell_api_ip', return_value='172.20.0.10'), \
patch.object(firewall_manager, '_get_caddy_container_ip', return_value='172.20.0.2'), \
patch.object(firewall_manager, '_get_dns_container_ip', return_value='172.20.0.3'):
firewall_manager.apply_cell_rules('office', '10.0.1.0/24', []) firewall_manager.apply_cell_rules('office', '10.0.1.0/24', [])
# The API-sync ACCEPT must be the LAST -I FORWARD insertion so it sits at position 1 # The API-sync ACCEPT must be the LAST -I FORWARD insertion so it sits at position 1
@@ -527,26 +541,28 @@ class TestCellRules(unittest.TestCase):
# ── apply_cell_rules — empty inbound (all-deny) ─────────────────────────── # ── apply_cell_rules — empty inbound (all-deny) ───────────────────────────
def test_apply_cell_rules_empty_inbound_all_drop(self): def test_apply_cell_rules_empty_inbound_no_service_accept(self):
"""With inbound_services=[], all per-service rules are DROP.""" """With inbound_services=[], no service ACCEPT is added; catch-all DROP blocks traffic."""
calls = self._capture_apply('office', '10.0.1.0/24', []) calls = self._capture_apply('office', '10.0.1.0/24', [])
# No ACCEPT to Caddy
caddy_targets = self._targets_for_dest(calls, self._FAKE_CADDY_IP)
self.assertNotIn('ACCEPT', caddy_targets,
"No Caddy ACCEPT expected with empty inbound_services")
# No per-VIP rules at all
for service, svc_ip in firewall_manager.SERVICE_IPS.items(): for service, svc_ip in firewall_manager.SERVICE_IPS.items():
svc_targets = self._targets_for_dest(calls, svc_ip) svc_targets = self._targets_for_dest(calls, svc_ip)
self.assertTrue(svc_targets, self.assertFalse(svc_targets,
f"Expected at least one rule for {service} ({svc_ip})") f"No per-VIP rules expected for {service} ({svc_ip})")
self.assertNotIn('ACCEPT', svc_targets,
f"{service} should be DROP when not in inbound_services")
# ── apply_cell_rules — all inbound (all-accept) ─────────────────────────── # ── apply_cell_rules — all inbound (all-accept) ───────────────────────────
def test_apply_cell_rules_all_inbound_all_accept(self): def test_apply_cell_rules_all_inbound_caddy_accept(self):
"""With all four services in inbound, all per-service rules are ACCEPT.""" """With all four services in inbound, an ACCEPT rule is added for Caddy port 80."""
all_services = list(firewall_manager.SERVICE_IPS.keys()) all_services = list(firewall_manager.SERVICE_IPS.keys())
calls = self._capture_apply('office', '10.0.1.0/24', all_services) calls = self._capture_apply('office', '10.0.1.0/24', all_services)
for service, svc_ip in firewall_manager.SERVICE_IPS.items(): caddy_targets = self._targets_for_dest(calls, self._FAKE_CADDY_IP)
svc_targets = self._targets_for_dest(calls, svc_ip) self.assertIn('ACCEPT', caddy_targets,
self.assertIn('ACCEPT', svc_targets, "Expected ACCEPT to Caddy when all services are in inbound_services")
f"{service} should be ACCEPT when in inbound_services")
# ── apply_cell_rules — all rules tagged ─────────────────────────────────── # ── apply_cell_rules — all rules tagged ───────────────────────────────────
+15 -11
View File
@@ -284,20 +284,24 @@ class TestBootstrapDnsRecords(unittest.TestCase):
self.assertTrue(os.path.exists(zone_file)) self.assertTrue(os.path.exists(zone_file))
@patch('subprocess.run') @patch('subprocess.run')
def test_contains_default_caddy_ip(self, _mock): def test_contains_wg_server_ip(self, _mock):
"""Zone file records now use WG server IP (10.0.0.1) not Docker VIPs."""
self.nm.bootstrap_dns_records('mycell', 'cell') self.nm.bootstrap_dns_records('mycell', 'cell')
zone_file = os.path.join(self.nm.dns_zones_dir, 'cell.zone') zone_file = os.path.join(self.nm.dns_zones_dir, 'cell.zone')
content = open(zone_file).read() content = open(zone_file).read()
self.assertIn('172.20.0.2', content) # caddy self.assertIn('10.0.0.1', content) # WG server IP for all services
self.assertNotIn('172.20.0.2', content) # Caddy VIP no longer in zone
self.assertNotIn('172.20.0.21', content) # Service VIPs no longer in zone
@patch('subprocess.run') @patch('subprocess.run')
def test_custom_ip_range_used(self, _mock): def test_custom_ip_range_does_not_affect_service_ips(self, _mock):
"""ip_range is no longer used for service record IPs; WG server IP is used."""
self.nm.bootstrap_dns_records('mycell', 'cell', ip_range='10.5.0.0/24') self.nm.bootstrap_dns_records('mycell', 'cell', ip_range='10.5.0.0/24')
zone_file = os.path.join(self.nm.dns_zones_dir, 'cell.zone') zone_file = os.path.join(self.nm.dns_zones_dir, 'cell.zone')
content = open(zone_file).read() content = open(zone_file).read()
self.assertIn('10.5.0.2', content) # caddy self.assertIn('10.0.0.1', content) # WG server IP
self.assertIn('10.5.0.21', content) # vip_calendar self.assertNotIn('10.5.0.2', content) # old caddy pattern gone
self.assertNotIn('172.20', content) self.assertNotIn('10.5.0.21', content) # old VIP pattern gone
@patch('subprocess.run') @patch('subprocess.run')
def test_idempotent_skips_existing_zone(self, _mock): def test_idempotent_skips_existing_zone(self, _mock):
@@ -324,15 +328,15 @@ class TestApplyIpRange(unittest.TestCase):
shutil.rmtree(self.test_dir) shutil.rmtree(self.test_dir)
@patch('subprocess.run') @patch('subprocess.run')
def test_zone_file_updated_with_new_ips(self, _mock): def test_zone_file_updated_with_wg_server_ip(self, _mock):
# Bootstrap with default range, then change to 10.0.0.0/24 """apply_ip_range regenerates zone with WG server IP for all service records."""
self.nm.bootstrap_dns_records('mycell', 'cell', '172.20.0.0/16') self.nm.bootstrap_dns_records('mycell', 'cell', '172.20.0.0/16')
result = self.nm.apply_ip_range('10.0.0.0/24', 'mycell', 'cell') result = self.nm.apply_ip_range('10.0.0.0/24', 'mycell', 'cell')
zone_file = os.path.join(self.nm.dns_zones_dir, 'cell.zone') zone_file = os.path.join(self.nm.dns_zones_dir, 'cell.zone')
content = open(zone_file).read() content = open(zone_file).read()
self.assertIn('10.0.0.2', content) # caddy self.assertIn('10.0.0.1', content) # WG server IP for all services
self.assertIn('10.0.0.21', content) # vip_calendar self.assertNotIn('172.20.0.2', content) # old Caddy pattern gone
self.assertNotIn('172.20', content) self.assertNotIn('172.20.0.21', content) # old VIP pattern gone
@patch('subprocess.run') @patch('subprocess.run')
def test_returns_restarted_on_success(self, _mock): def test_returns_restarted_on_success(self, _mock):
+41 -34
View File
@@ -465,10 +465,15 @@ class TestPeerEndpointAccessControl:
class TestDNSZoneRecords: class TestDNSZoneRecords:
""" """
Verify that network_manager._build_dns_records() generates the correct IPs. Verify that network_manager._build_dns_records() generates the correct IPs.
api and webui must point to Caddy (not their container IPs) so Caddy can
reverse-proxy them their containers don't listen on port 80. All service names now resolve to the WG server IP (10.0.0.1) rather than
Docker VIPs. ensure_service_dnat() routes wg0:80 Caddy; Caddy routes to
the correct backend by Host header. This allows cross-cell peers to reach
services without Docker bridge subnet conflicts.
""" """
_WG_SERVER_IP = '10.0.0.1'
def setUp(self): def setUp(self):
pass pass
@@ -477,59 +482,59 @@ class TestDNSZoneRecords:
mgr = nm.NetworkManager.__new__(nm.NetworkManager) mgr = nm.NetworkManager.__new__(nm.NetworkManager)
return mgr._build_dns_records(cell_name, ip_range) return mgr._build_dns_records(cell_name, ip_range)
def test_api_resolves_to_caddy_not_api_container(self): def test_api_resolves_to_wg_server_ip(self):
records = self._records() records = self._records()
api_rec = next((r for r in records if r['name'] == 'api'), None) api_rec = next((r for r in records if r['name'] == 'api'), None)
assert api_rec is not None, "No DNS record for 'api'" assert api_rec is not None, "No DNS record for 'api'"
assert api_rec['value'] == '172.20.0.2', ( assert api_rec['value'] == self._WG_SERVER_IP, (
f"api.dev should resolve to Caddy (172.20.0.2), not the API container " f"api.dev should resolve to WG server IP ({self._WG_SERVER_IP}); "
f"(172.20.0.10); got {api_rec['value']}" f"got {api_rec['value']}"
) )
def test_webui_resolves_to_caddy_not_webui_container(self): def test_webui_resolves_to_wg_server_ip(self):
records = self._records() records = self._records()
rec = next((r for r in records if r['name'] == 'webui'), None) rec = next((r for r in records if r['name'] == 'webui'), None)
assert rec is not None, "No DNS record for 'webui'" assert rec is not None, "No DNS record for 'webui'"
assert rec['value'] == '172.20.0.2', ( assert rec['value'] == self._WG_SERVER_IP, (
f"webui.dev should resolve to Caddy (172.20.0.2), not the WebUI container " f"webui.dev should resolve to WG server IP ({self._WG_SERVER_IP}); "
f"(172.20.0.11); got {rec['value']}" f"got {rec['value']}"
) )
def test_calendar_uses_vip(self): def test_calendar_resolves_to_wg_server_ip(self):
records = self._records() records = self._records()
rec = next((r for r in records if r['name'] == 'calendar'), None) rec = next((r for r in records if r['name'] == 'calendar'), None)
assert rec and rec['value'] == '172.20.0.21', \ assert rec and rec['value'] == self._WG_SERVER_IP, \
f"calendar.dev VIP should be 172.20.0.21; got {rec}" f"calendar.dev should resolve to WG server IP; got {rec}"
def test_files_uses_vip(self): def test_files_resolves_to_wg_server_ip(self):
records = self._records() records = self._records()
rec = next((r for r in records if r['name'] == 'files'), None) rec = next((r for r in records if r['name'] == 'files'), None)
assert rec and rec['value'] == '172.20.0.22', \ assert rec and rec['value'] == self._WG_SERVER_IP, \
f"files.dev VIP should be 172.20.0.22; got {rec}" f"files.dev should resolve to WG server IP; got {rec}"
def test_mail_uses_vip(self): def test_mail_resolves_to_wg_server_ip(self):
records = self._records() records = self._records()
rec = next((r for r in records if r['name'] == 'mail'), None) rec = next((r for r in records if r['name'] == 'mail'), None)
assert rec and rec['value'] == '172.20.0.23', \ assert rec and rec['value'] == self._WG_SERVER_IP, \
f"mail.dev VIP should be 172.20.0.23; got {rec}" f"mail.dev should resolve to WG server IP; got {rec}"
def test_webmail_uses_mail_vip(self): def test_webmail_resolves_to_wg_server_ip(self):
records = self._records() records = self._records()
rec = next((r for r in records if r['name'] == 'webmail'), None) rec = next((r for r in records if r['name'] == 'webmail'), None)
assert rec and rec['value'] == '172.20.0.23', \ assert rec and rec['value'] == self._WG_SERVER_IP, \
f"webmail.dev should share the mail VIP 172.20.0.23; got {rec}" f"webmail.dev should resolve to WG server IP; got {rec}"
def test_webdav_uses_vip(self): def test_webdav_resolves_to_wg_server_ip(self):
records = self._records() records = self._records()
rec = next((r for r in records if r['name'] == 'webdav'), None) rec = next((r for r in records if r['name'] == 'webdav'), None)
assert rec and rec['value'] == '172.20.0.24', \ assert rec and rec['value'] == self._WG_SERVER_IP, \
f"webdav.dev VIP should be 172.20.0.24; got {rec}" f"webdav.dev should resolve to WG server IP; got {rec}"
def test_cell_name_resolves_to_caddy(self): def test_cell_name_resolves_to_wg_server_ip(self):
records = self._records(cell_name='mypic') records = self._records(cell_name='mypic')
rec = next((r for r in records if r['name'] == 'mypic'), None) rec = next((r for r in records if r['name'] == 'mypic'), None)
assert rec and rec['value'] == '172.20.0.2', \ assert rec and rec['value'] == self._WG_SERVER_IP, \
f"mypic.dev should resolve to Caddy (172.20.0.2); got {rec}" f"mypic.dev should resolve to WG server IP; got {rec}"
def test_all_records_are_type_a(self): def test_all_records_are_type_a(self):
records = self._records() records = self._records()
@@ -540,21 +545,23 @@ class TestDNSZoneRecords:
class TestDNSZoneRecordsWithPytest: class TestDNSZoneRecordsWithPytest:
"""Same as above but using pytest-style (no setUp/tearDown).""" """Same as above but using pytest-style (no setUp/tearDown)."""
_WG_SERVER_IP = '10.0.0.1'
@pytest.fixture @pytest.fixture
def records(self): def records(self):
import network_manager as nm import network_manager as nm
mgr = nm.NetworkManager.__new__(nm.NetworkManager) mgr = nm.NetworkManager.__new__(nm.NetworkManager)
return mgr._build_dns_records('pic0', '172.20.0.0/16') return mgr._build_dns_records('pic0', '172.20.0.0/16')
def test_api_resolves_to_caddy(self, records): def test_api_resolves_to_wg_server_ip(self, records):
rec = next((r for r in records if r['name'] == 'api'), None) rec = next((r for r in records if r['name'] == 'api'), None)
assert rec and rec['value'] == '172.20.0.2', \ assert rec and rec['value'] == self._WG_SERVER_IP, \
f"api.dev should point to Caddy (172.20.0.2); got {rec}" f"api.dev should point to WG server IP ({self._WG_SERVER_IP}); got {rec}"
def test_webui_resolves_to_caddy(self, records): def test_webui_resolves_to_wg_server_ip(self, records):
rec = next((r for r in records if r['name'] == 'webui'), None) rec = next((r for r in records if r['name'] == 'webui'), None)
assert rec and rec['value'] == '172.20.0.2', \ assert rec and rec['value'] == self._WG_SERVER_IP, \
f"webui.dev should point to Caddy (172.20.0.2); got {rec}" f"webui.dev should point to WG server IP ({self._WG_SERVER_IP}); got {rec}"
# ─────────────────── Caddyfile generation ───────────────────────────────────── # ─────────────────── Caddyfile generation ─────────────────────────────────────
+3 -2
View File
@@ -244,7 +244,7 @@ class TestWireGuardManager(unittest.TestCase):
self.assertIn('[Peer]', config) self.assertIn('[Peer]', config)
self.assertIn('PrivateKey', config) self.assertIn('PrivateKey', config)
self.assertIn('Address = 10.0.0.2/32', config) self.assertIn('Address = 10.0.0.2/32', config)
self.assertIn('DNS = 172.20.0.3', config) self.assertIn('DNS = 10.0.0.1', config)
self.assertIn(keys['public_key'], config) self.assertIn(keys['public_key'], config)
self.assertIn('AllowedIPs', config) self.assertIn('AllowedIPs', config)
@@ -418,7 +418,8 @@ class TestWireGuardConfigReads(unittest.TestCase):
self._write_wg_conf(address='10.1.0.1/24') self._write_wg_conf(address='10.1.0.1/24')
split = self.wg.get_split_tunnel_ips() split = self.wg.get_split_tunnel_ips()
self.assertIn('10.1.0.0/24', split) self.assertIn('10.1.0.0/24', split)
self.assertIn('172.20.0.0/16', split) # 172.20.0.0/16 is intentionally excluded — services now use WG server IP via DNAT
self.assertNotIn('172.20.0.0/16', split)
self.assertNotIn('10.0.0.0/24', split) self.assertNotIn('10.0.0.0/24', split)
def test_get_server_config_uses_configured_port(self): def test_get_server_config_uses_configured_port(self):
+17 -13
View File
@@ -98,9 +98,12 @@ class TestInternetForwardingRules(unittest.TestCase):
class TestPeerConfigDns(unittest.TestCase): class TestPeerConfigDns(unittest.TestCase):
""" """
Verify that peer client configs include a DNS = <ip> line pointing to the Verify that peer client configs include a DNS = <wg_server_ip> line.
PIC DNS container. Without DNS, the client tunnel has no internet-accessible
domain resolution even though packets are forwarded correctly. DNS is set to the WG server IP (e.g. 10.0.0.1) rather than the Docker
cell-dns container IP. ensure_dns_dnat() routes wg0:53 cell-dns, so
peers reach CoreDNS via the WG server IP works for both split-tunnel
(10.0.x.x in AllowedIPs) and cross-cell peers.
""" """
def setUp(self): def setUp(self):
@@ -123,19 +126,20 @@ class TestPeerConfigDns(unittest.TestCase):
# Must be a parseable IPv4 address # Must be a parseable IPv4 address
ipaddress.IPv4Address(dns_ip) ipaddress.IPv4Address(dns_ip)
def test_peer_config_dns_defaults_to_cell_dns_ip(self): def test_peer_config_dns_uses_wg_server_ip(self):
"""When cell-dns hostname can't be resolved, falls back to 172.20.0.3.""" """DNS in peer config is the WG server IP; ensure_dns_dnat() routes wg0:53 → cell-dns."""
with patch('wireguard_manager.socket.gethostbyname', side_effect=OSError): keys = self.wg.generate_peer_keys('p1')
keys = self.wg.generate_peer_keys('p1') cfg = self.wg.get_peer_config('p1', '10.0.0.5', keys['private_key'])
cfg = self.wg.get_peer_config('p1', '10.0.0.5', keys['private_key']) # Default WG server address is 10.0.0.1/24 when no wg0.conf exists
self.assertIn('DNS = 172.20.0.3', cfg) self.assertIn('DNS = 10.0.0.1', cfg)
def test_peer_config_dns_uses_resolved_hostname(self): def test_peer_config_dns_fallback_to_resolve_on_error(self):
"""When cell-dns resolves, its IP is used as the DNS server.""" """If WG address parsing fails, _resolve_peer_dns() is used as fallback."""
with patch('wireguard_manager.socket.gethostbyname', return_value='172.20.0.3'): with patch.object(self.wg, '_get_configured_address', return_value='invalid'), \
patch('wireguard_manager.socket.gethostbyname', return_value='172.20.0.9'):
keys = self.wg.generate_peer_keys('p2') keys = self.wg.generate_peer_keys('p2')
cfg = self.wg.get_peer_config('p2', '10.0.0.6', keys['private_key']) cfg = self.wg.get_peer_config('p2', '10.0.0.6', keys['private_key'])
self.assertIn('DNS = 172.20.0.3', cfg) self.assertIn('DNS = 172.20.0.9', cfg)
def test_resolve_peer_dns_fallback(self): def test_resolve_peer_dns_fallback(self):
"""_resolve_peer_dns() always returns a string even when DNS lookup fails.""" """_resolve_peer_dns() always returns a string even when DNS lookup fails."""