Files
pic/api/ip_utils.py
T
roof d5018c2b34 fix: architecture audit — security, atomicity, broken endpoints, test coverage
Sprint 1 — Security & correctness:
- Restore all 10 commented-out is_local_request() checks (vault, containers, images, volumes)
- Fix XFF spoofing: only trust the LAST X-Forwarded-For entry (Caddy's append), not all
- Require prefix length in wireguard.address (was accepting bare IPs like 10.0.0.1)
- Validate service_access list in add_peer (valid: calendar/files/mail/webdav)
- Fix dhcp/reservations POST/DELETE: unpack mac/ip/hostname from body (was passing dict as positional arg)
- Fix network/test POST: remove spurious data arg (test_connectivity takes no args)
- Fix remove_peer: clear iptables rules and regenerate DNS ACLs on deletion (was leaving stale rules)
- Fix CoreDNS reload: SIGHUP → SIGUSR1 (SIGHUP kills the process; SIGUSR1 triggers reload plugin)
- Remove local.{domain} block from Corefile template (local.zone doesn't exist, caused log spam)
- Fix routing_manager._remove_nat_rule: targeted -D instead of flushing entire POSTROUTING chain

Sprint 2 — State consistency:
- Atomic config writes in config_manager, ip_utils, firewall_manager, network_manager
  (write to .tmp → fsync → os.replace, prevents truncated files on kill)
- backup_config: now also backs up Caddyfile, Corefile, .env, DNS zone files
- restore_config: restores all of the above so config stays consistent after restore

Sprint 3 — Dead code / documentation:
- Remove CellManager instantiation from app startup (was never called, double-instantiated all managers)
- Document routing_manager scope (targets host, not cell-wireguard; methods not called by any active route)

Sprint 4 — Test infrastructure:
- Add tests/conftest.py with shared tmp_dir, tmp_config_dir, tmp_data_dir, flask_client fixtures
- Add tests/test_config_validation.py: 400 paths for ip_range, port, wireguard.address validation
- Add tests/test_ip_utils_caddyfile.py: 14 tests for write_caddyfile (was completely untested)
- Expand test_app_misc.py: 7 new is_local_request tests covering XFF spoofing and cell-network IPs
- Add --cov-fail-under=70 to make test-coverage
- Add pre-commit hook that runs pytest before every commit

414 tests pass (was 372).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-24 03:27:52 -04:00

245 lines
7.4 KiB
Python

#!/usr/bin/env python3
"""
IP utility functions for PIC — derive all container and virtual IPs from the
docker network subnet so that one ip_range setting drives everything.
The canonical source of IPs is the .env file at the project root.
docker-compose.yml uses ${VAR:-default} substitution to read from it.
"""
import ipaddress
import os
from typing import Dict, List, Optional
# Host-number offset of every service inside the docker subnet. A service's
# address is the subnet's network address plus its offset
# (e.g. 172.20.0.0/16 with offset 2 → 172.20.0.2).
CONTAINER_OFFSETS: Dict[str, int] = {
    # Containers
    'caddy': 2,
    'dns': 3,
    'dhcp': 4,
    'ntp': 5,
    'mail': 6,
    'radicale': 7,
    'webdav': 8,
    'wireguard': 9,
    'api': 10,
    'webui': 11,
    'rainloop': 12,
    'filegator': 13,
    # Caddy virtual IPs: one dedicated address per service so that Caddy can
    # route by destination address.
    'vip_calendar': 21,
    'vip_files': 22,
    'vip_mail': 23,
    'vip_webdav': 24,
}
# Service key → name of the docker-compose environment variable that carries
# that container's IP. Only the static containers appear here; the vip_*
# entries of the offset table have no variable of their own.
ENV_VAR_NAMES: Dict[str, str] = {
    'caddy': 'CADDY_IP',
    'dns': 'DNS_IP',
    'dhcp': 'DHCP_IP',
    'ntp': 'NTP_IP',
    'mail': 'MAIL_IP',
    'radicale': 'RADICALE_IP',
    'webdav': 'WEBDAV_IP',
    'wireguard': 'WG_IP',  # abbreviated, unlike the other variables
    'api': 'API_IP',
    'webui': 'WEBUI_IP',
    'rainloop': 'RAINLOOP_IP',
    'filegator': 'FILEGATOR_IP',
}
# Host-side port each service is bound to unless the caller overrides it.
PORT_DEFAULTS: Dict[str, int] = {
    'dns_port': 53,
    'dhcp_port': 67,
    'ntp_port': 123,
    'mail_smtp_port': 25,
    'mail_submission_port': 587,
    'mail_imap_port': 993,
    'radicale_port': 5232,
    'webdav_port': 8080,
    'wg_port': 51820,
    'api_port': 3000,
    'webui_port': 8081,
    'rainloop_port': 8888,
    'filegator_port': 8082,
}
# Port key → name of the docker-compose environment variable that carries the
# port number.
PORT_ENV_VAR_NAMES: Dict[str, str] = {
    'dns_port': 'DNS_PORT',
    'dhcp_port': 'DHCP_PORT',
    'ntp_port': 'NTP_PORT',
    'mail_smtp_port': 'MAIL_SMTP_PORT',
    'mail_submission_port': 'MAIL_SUBMISSION_PORT',
    'mail_imap_port': 'MAIL_IMAP_PORT',
    'radicale_port': 'RADICALE_PORT',
    'webdav_port': 'WEBDAV_PORT',
    'wg_port': 'WG_PORT',
    'api_port': 'API_PORT',
    'webui_port': 'WEBUI_PORT',
    'rainloop_port': 'RAINLOOP_PORT',
    'filegator_port': 'FILEGATOR_PORT',
}
# Port key → docker-compose service name(s) that have to be restarted when
# that port changes. The three mail ports all belong to the one mail service.
PORT_TO_CONTAINERS: Dict[str, List[str]] = {
    'dns_port': ['dns'],
    'dhcp_port': ['dhcp'],
    'ntp_port': ['ntp'],
    'mail_smtp_port': ['mail'],
    'mail_submission_port': ['mail'],
    'mail_imap_port': ['mail'],
    'radicale_port': ['radicale'],
    'webdav_port': ['webdav'],
    'wg_port': ['wireguard'],
    'api_port': ['api'],
    'webui_port': ['webui'],
    'rainloop_port': ['rainloop'],
    'filegator_port': ['filegator'],
}
def get_service_ips(ip_range: str) -> Dict[str, str]:
    """
    Derive all container and virtual IPs from the docker network subnet.

    Example: '172.20.0.0/16' → {'caddy': '172.20.0.2', 'dns': '172.20.0.3', ...}

    The offset of each service within the subnet is fixed (see CONTAINER_OFFSETS).
    Host bits in ip_range are ignored (the network is parsed with strict=False),
    so '172.20.0.5/16' yields the same result as '172.20.0.0/16'.

    Raises:
        ValueError: if ip_range cannot be parsed as an IPv4 network, or if the
            subnet is too small to hold every offset as a usable host address.
    """
    network = ipaddress.IPv4Network(ip_range, strict=False)

    # Every offset has to land on a usable host address inside the subnet,
    # i.e. strictly below the broadcast address (the subnet's last address).
    # Without this check a small subnet such as a /28 would silently produce
    # addresses that belong to a different network.
    if CONTAINER_OFFSETS:
        highest_offset = max(CONTAINER_OFFSETS.values())
        if highest_offset > network.num_addresses - 2:
            raise ValueError(
                f'ip_range {network} is too small: service offsets go up to '
                f'{highest_offset}, which is outside its usable host range'
            )

    base = int(network.network_address)
    return {
        name: str(ipaddress.IPv4Address(base + offset))
        for name, offset in CONTAINER_OFFSETS.items()
    }
def get_virtual_ips(ip_range: str) -> Dict[str, str]:
    """
    Map each of the four Caddy-fronted services to its virtual IP.

    The result is keyed by the bare service name ('calendar', 'files', 'mail',
    'webdav') and holds the address that get_service_ips() reports under the
    matching 'vip_<service>' key. The original note on this function names
    firewall_manager as the consumer, for its per-service iptables rules.
    """
    all_ips = get_service_ips(ip_range)
    return {
        service: all_ips[f'vip_{service}']
        for service in ('calendar', 'files', 'mail', 'webdav')
    }
def write_caddyfile(ip_range: str, cell_name: str, domain: str, path: str) -> bool:
    """
    Generate the Caddy reverse-proxy config from the current ip_range, cell_name,
    and domain, and write it atomically to path.

    Must be called after any ip_range or domain change so Caddy routes correctly.
    Container-internal ports are fixed by docker-compose and never change.

    The file is written to '<path>.tmp', fsynced, and then moved over path with
    os.replace, so an existing config is never left truncated. The temp file is
    removed again if anything fails.

    Returns True on success, False on any failure (invalid ip_range, unwritable
    path, ...). No exception is propagated to the caller.
    """
    tmp = None  # stays None until the temp path is known, so cleanup can tell
    try:
        ips = get_service_ips(ip_range)
        caddy_ip = ips['caddy']
        vip_calendar = ips['vip_calendar']
        vip_files = ips['vip_files']
        vip_mail = ips['vip_mail']
        vip_webdav = ips['vip_webdav']
        content = f"""\
{{
auto_https off
}}
# Main cell domain — no service-IP restriction needed
http://{cell_name}.{domain}, http://{caddy_ip}:80 {{
handle /api/* {{
reverse_proxy cell-api:3000
}}
handle /calendar* {{
reverse_proxy cell-radicale:5232
}}
handle /files* {{
reverse_proxy cell-filegator:8080
}}
handle /webmail* {{
reverse_proxy cell-rainloop:8888
}}
handle {{
reverse_proxy cell-webui:80
}}
}}
# Per-service virtual IPs — each gets its own IP so iptables can target them
http://calendar.{domain}, http://{vip_calendar}:80 {{
reverse_proxy cell-radicale:5232
}}
http://files.{domain}, http://{vip_files}:80 {{
reverse_proxy cell-filegator:8080
}}
http://mail.{domain}, http://webmail.{domain}, http://{vip_mail}:80 {{
reverse_proxy cell-rainloop:8888
}}
http://webdav.{domain}, http://{vip_webdav}:80 {{
reverse_proxy cell-webdav:80
}}
http://api.{domain} {{
reverse_proxy cell-api:3000
}}
# Catch-all for direct IP / localhost
:80 {{
handle /api/* {{
reverse_proxy cell-api:3000
}}
handle {{
reverse_proxy cell-webui:80
}}
}}
"""
        os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)
        tmp = path + '.tmp'
        # Explicit UTF-8: the generated text contains non-ASCII characters (the
        # em dashes in the comments), and open()'s default encoding depends on
        # the locale, so the write could fail under a non-UTF-8 locale.
        with open(tmp, 'w', encoding='utf-8') as f:
            f.write(content)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)
        return True
    except Exception:
        # Best-effort contract: report failure through the return value, but do
        # not leave a half-written temp file next to the real config.
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                pass
        return False
def write_env_file(ip_range: str, path: str, ports: Optional[Dict[str, int]] = None) -> bool:
    """
    Write (or overwrite) the docker-compose .env file with IPs and ports.

    docker-compose reads this file automatically at startup to substitute
    ${VAR:-default} placeholders in docker-compose.yml. Call this at setup
    time and whenever ip_range or port values change.

    The file holds one VAR=value line each for CELL_NETWORK (ip_range exactly
    as given), every variable in ENV_VAR_NAMES, and every variable in
    PORT_ENV_VAR_NAMES, in that order.

    ports: override specific port defaults (keys from PORT_DEFAULTS). Keys that
        have no entry in PORT_ENV_VAR_NAMES are not written.

    The file is written to '<path>.tmp', fsynced, and then moved over path with
    os.replace, so an existing .env is never left truncated. The temp file is
    removed again if anything fails.

    Returns True on success, False on any failure (invalid ip_range, unwritable
    path, ...). No exception is propagated to the caller.
    """
    tmp = None  # stays None until the temp path is known, so cleanup can tell
    try:
        ips = get_service_ips(ip_range)

        # Start from the defaults and layer the caller's overrides on top.
        merged_ports = dict(PORT_DEFAULTS)
        if ports:
            merged_ports.update(ports)

        lines = [f'CELL_NETWORK={ip_range}\n']
        lines.extend(f'{var}={ips[svc]}\n' for svc, var in ENV_VAR_NAMES.items())
        lines.extend(
            f'{var}={merged_ports[key]}\n'
            for key, var in PORT_ENV_VAR_NAMES.items()
        )

        os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)
        tmp = path + '.tmp'
        # Explicit encoding so the result does not depend on the locale.
        with open(tmp, 'w', encoding='utf-8') as f:
            f.writelines(lines)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)
        return True
    except Exception:
        # Best-effort contract: report failure through the return value, but do
        # not leave a half-written temp file next to the real .env.
        if tmp is not None:
            try:
                os.remove(tmp)
            except OSError:
                pass
        return False