pic/api/app.py
roof 5a4e292440 fix: allow reply traffic from connected cells through FORWARD chain
apply_cell_rules drops all traffic from a cell's subnet except specific
service ports. This also drops ICMP replies and TCP ACKs for connections
initiated by local peers to the connected cell, breaking cross-cell
routing (ping to 10.0.0.1 silently dropped by the test cell's DROP rule).

Fix: ensure_forward_stateful() inserts a stateful ESTABLISHED,RELATED
ACCEPT at the top of FORWARD. Called from apply_cell_rules (every cell
add/update) and from _apply_startup_enforcement. Idempotent.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-04 15:13:59 -04:00
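
A minimal sketch of the check-then-insert pattern the fix describes (illustrative
only; the real rule set and signature of ensure_forward_stateful live in
firewall_manager, which is not part of this file):

    import subprocess
    RULE = ['-m', 'conntrack', '--ctstate', 'ESTABLISHED,RELATED', '-j', 'ACCEPT']
    # `iptables -C` exits non-zero when the rule is absent; only then insert it
    # at position 1, so repeated calls (every cell add/update, plus startup
    # enforcement) never duplicate the rule.
    if subprocess.run(['iptables', '-C', 'FORWARD', *RULE]).returncode != 0:
        subprocess.run(['iptables', '-I', 'FORWARD', '1', *RULE], check=True)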

671 lines · 27 KiB · Python

#!/usr/bin/env python3
"""
Personal Internet Cell API Server
Provides REST API endpoints for managing:
- Cell status and configuration
- Network services (DNS, DHCP, NTP)
- WireGuard VPN and peer management
- Email, Calendar, and File services
- Routing and VPN gateway
- Vault and trust management (Phase 6)
"""
import os
import io
import json
import stat
import zipfile
import shutil
import logging
import secrets
from datetime import datetime
from flask import Flask, request, jsonify, current_app, send_file, session
from flask_cors import CORS
import threading
import time
from collections import deque
import json as pyjson
from logging.handlers import RotatingFileHandler
import uuid
import contextvars
# Track API start time for uptime calculation
API_START_TIME = time.time()
# Manager singletons — all instantiated in managers.py; imported here so routes can
# reference them by module-level name and test patches (`patch('app.X', mock)`) work.
from managers import (
config_manager, service_bus, log_manager,
network_manager, wireguard_manager, peer_registry,
email_manager, calendar_manager, file_manager,
routing_manager, vault_manager, container_manager,
cell_link_manager, auth_manager,
firewall_manager, EventType,
)
# Re-exports: tests do `from app import CellManager` and `from app import _resolve_peer_dns`
from cell_manager import CellManager
from wireguard_manager import _resolve_peer_dns
from port_registry import PORT_FIELDS, detect_conflicts
import auth_routes
# Context variable for request info
request_context = contextvars.ContextVar('request_context', default={})
# Set default log level and log file if not already defined
LOG_LEVEL = globals().get('LOG_LEVEL', 'INFO')
LOG_FILE = globals().get('LOG_FILE', 'picell.log')
class ContextFilter(logging.Filter):
def filter(self, record):
ctx = request_context.get({})
for k, v in ctx.items():
setattr(record, k, v)
return True
class JsonFormatter(logging.Formatter):
def format(self, record):
log_record = {
'timestamp': self.formatTime(record, self.datefmt),
'level': record.levelname,
'name': record.name,
'message': record.getMessage(),
'request_id': getattr(record, 'request_id', None),
'client_ip': getattr(record, 'client_ip', None),
'method': getattr(record, 'method', None),
'path': getattr(record, 'path', None),
'status': getattr(record, 'status', None),
'user': getattr(record, 'user', None),
}
if record.exc_info:
log_record['exception'] = self.formatException(record.exc_info)
return pyjson.dumps({k: v for k, v in log_record.items() if v is not None})
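# Example line emitted by JsonFormatter (illustrative values; keys whose value
# is None are dropped by the comprehension above):
#   {"timestamp": "2026-05-04 15:13:59,123", "level": "INFO", "name": "picell",
#    "message": "GET /health 200", "request_id": "d3b0...", "client_ip": "172.18.0.2",
#    "method": "GET", "path": "/health", "status": 200}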
json_formatter = JsonFormatter()
context_filter = ContextFilter()
handlers = [logging.StreamHandler()]
try:
file_handler = RotatingFileHandler(LOG_FILE, maxBytes=5_000_000, backupCount=5, encoding='utf-8')
file_handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
file_handler.setFormatter(json_formatter)
file_handler.addFilter(context_filter)
handlers.append(file_handler)
except Exception as e:
print(f"Warning: Could not create rotating log file handler: {e}")
for h in handlers:
h.setFormatter(json_formatter)
h.addFilter(context_filter)
logging.basicConfig(
level=getattr(logging, LOG_LEVEL, logging.INFO),
handlers=handlers
)
logger = logging.getLogger('picell')
# Flask app setup
app = Flask(__name__)
CORS(app,
supports_credentials=True,
origins=['http://localhost', 'http://localhost:5173', 'http://localhost:8081',
'http://127.0.0.1', 'http://127.0.0.1:5173', 'http://127.0.0.1:8081'])
# Development mode flag
app.config['DEVELOPMENT_MODE'] = True # Set to True for development, False for production
# Persist SECRET_KEY so sessions survive API restarts
SECRET_KEY_FILE = os.path.join(os.environ.get('DATA_DIR', '/app/data'), '.flask_secret_key')
if os.environ.get('SECRET_KEY'):
    _flask_secret = os.environ['SECRET_KEY'].encode()  # environ values are always str
elif os.path.exists(SECRET_KEY_FILE) and os.path.getsize(SECRET_KEY_FILE) > 0:
with open(SECRET_KEY_FILE, 'rb') as _skf:
_flask_secret = _skf.read()
else:
_flask_secret = os.urandom(32)
try:
os.makedirs(os.path.dirname(SECRET_KEY_FILE), exist_ok=True)
_skf_fd = os.open(SECRET_KEY_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(_skf_fd, 'wb') as _skf:
_skf.write(_flask_secret)
except OSError as _e:
logger.warning(f"Could not persist SECRET_KEY to disk: {_e}")
app.config['SECRET_KEY'] = _flask_secret
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
# Each PIC instance has a unique secret key — derive a short suffix from it so
# multiple instances accessed via the same hostname (e.g. localhost:portA vs
# localhost:portB) don't share session cookies and log each other out.
import hashlib as _hl
_cookie_suffix = _hl.sha256(_flask_secret).hexdigest()[:8]
app.config['SESSION_COOKIE_NAME'] = f'pic_sess_{_cookie_suffix}'
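# Illustration (made-up digests): two instances with different secrets derive
# e.g. 'pic_sess_3f9a1c2e' vs 'pic_sess_b4e1d2aa', so a browser pointed at
# localhost:8081 and localhost:8082 keeps both sessions side by side.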
# config_manager, service_bus, log_manager and all other managers are imported
# from managers.py above — no re-instantiation needed here.
@app.before_request
def enrich_log_context():
req_id = str(uuid.uuid4())
client_ip = request.remote_addr
method = request.method
path = request.path
user = getattr(getattr(request, 'user', None), 'id', None) or 'anonymous'
request_context.set({
'request_id': req_id,
'client_ip': client_ip,
'method': method,
'path': path,
'user': user
})
@app.before_request
def enforce_auth():
"""Enforce session-based authentication and role-based access control.
Rules:
- /api/auth/* is always public (login, logout, me, change-password)
- Non-/api/ paths (e.g. /health) are always public
- /api/peer/* is accessible to peer role only (admin gets 403)
- All other /api/* routes require admin role
Enforcement is active when auth_manager is a real AuthManager instance
with at least one registered user. Tests that do not seed the auth
store will see an empty user list and bypass enforcement, preserving
backward-compatibility with pre-auth test suites.
"""
path = request.path
# Always allow non-API paths and auth namespace
if not path.startswith('/api/') or path.startswith('/api/auth/'):
return None
# Cell peer-sync endpoints authenticate via source IP + WG pubkey — not session
if path.startswith('/api/cells/peer-sync/'):
return None
# Only enforce when auth_manager has been properly initialised and seeded.
# When the user store is empty (file missing or unreadable — typical in
# unit tests and fresh installs), bypass enforcement so pre-auth test
# suites continue to work. 503 is only returned when the users file
# exists and is readable but contains no accounts (explicit misconfiguration).
try:
from auth_manager import AuthManager as _AuthManager
if not isinstance(auth_manager, _AuthManager):
return None
users = auth_manager.list_users()
if not users:
# Only fail closed when the auth file is readable but empty —
# that's an explicit misconfiguration. If the file is missing or
# unreadable (test env, wrong host path, permission denied), bypass
# so pre-auth test suites continue to work.
users_file = getattr(auth_manager, '_users_file', None)
if users_file:
try:
with open(users_file, 'r') as _f:
_f.read(1)
return jsonify({'error': 'Authentication not configured. Set admin password first.'}), 503
except (PermissionError, FileNotFoundError, OSError):
return None
return None
except Exception:
return None
username = session.get('username')
if not username:
return jsonify({'error': 'Not authenticated'}), 401
role = session.get('role')
if path.startswith('/api/peer/'):
if role != 'peer':
return jsonify({'error': 'Forbidden'}), 403
else:
if role != 'admin':
return jsonify({'error': 'Forbidden'}), 403
return None
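# Illustrative outcomes under enforce_auth (seeded auth store; the peer path is
# an example, not an endpoint inventory):
#   GET  /health            -> allowed (not under /api/)
#   POST /api/auth/login    -> allowed (auth namespace is always public)
#   GET  /api/status        -> 401 with no session; 403 for role 'peer'
#   GET  /api/peer/...      -> 403 for role 'admin'; allowed for role 'peer'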
@app.before_request
def check_csrf():
"""Double-submit CSRF protection for state-changing API requests.
Applies to POST/PUT/DELETE/PATCH on /api/* paths, excluding /api/auth/*.
Skipped entirely when app.config['TESTING'] is True so unit tests remain
unaffected without needing to set CSRF headers.
"""
if app.config.get('TESTING'):
return None
if request.method not in ('POST', 'PUT', 'DELETE', 'PATCH'):
return None
path = request.path
if not path.startswith('/api/') or path.startswith('/api/auth/'):
return None
# peer-sync uses IP+pubkey auth — no session, no CSRF token possible
if path.startswith('/api/cells/peer-sync/'):
return None
token_session = session.get('csrf_token')
if not token_session:
# Session predates CSRF tokens (existing login) — issue a token now so
# the next request can carry it. Don't block this request; the client
# couldn't have known the token yet.
session['csrf_token'] = secrets.token_hex(32)
return None
token_header = request.headers.get('X-CSRF-Token')
if not token_header or token_header != token_session:
return jsonify({'error': 'CSRF token missing or invalid'}), 403
return None
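# Sketch of the double-submit round trip from a client's point of view
# (hypothetical: how the token reaches the client is up to the auth layer in
# auth_routes, which is not shown here):
#   import requests
#   s = requests.Session()
#   s.post(base + '/api/auth/login', json=creds)   # session cookie is set
#   tok = ...                                      # token surfaced by the auth layer
#   s.post(base + '/api/peers', headers={'X-CSRF-Token': tok}, json=payload)
# A missing or mismatched header on a session that already holds csrf_token
# is rejected with 403.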
@app.after_request
def log_request(response):
ctx = request_context.get({})
ctx['status'] = response.status_code
logger.info(f"{ctx.get('method')} {ctx.get('path')} {ctx.get('status')}")
return response
@app.teardown_request
def clear_log_context(exc):
request_context.set({})
# Wire vault_manager into Flask app context (vault routes use current_app.vault_manager)
app.vault_manager = vault_manager
auth_routes.auth_manager = auth_manager
# Apply firewall + DNS rules from stored peer settings (survives API restarts)
def _configured_domain() -> str:
return config_manager.configs.get('_identity', {}).get('domain', 'cell')
def _restore_cell_wg_peers(cell_links):
"""Re-add any cell link [Peer] blocks that are missing from wg0.conf.
WireGuard peer blocks can be lost if the container is rebuilt or wg0.conf
is regenerated from scratch. Cell link data in cell_links.json is the
authoritative source — this function reconciles the conf file against it.
"""
try:
conf_content = wireguard_manager._read_config()
restored = 0
for link in cell_links:
pk = link.get('public_key', '')
if not pk or pk in conf_content:
continue
name = link.get('cell_name', '')
endpoint = link.get('endpoint', '')
vpn_subnet = link.get('vpn_subnet', '')
ok = wireguard_manager.add_cell_peer(
name=name, public_key=pk,
endpoint=endpoint, vpn_subnet=vpn_subnet,
)
if ok:
logger.info(f"Restored missing WG peer block for cell '{name}'")
restored += 1
else:
logger.warning(f"Failed to restore WG peer block for cell '{name}'")
if restored:
# Reload conf into the running WireGuard interface
wireguard_manager._syncconf()
except Exception as e:
logger.warning(f"_restore_cell_wg_peers failed (non-fatal): {e}")
def _apply_startup_enforcement():
try:
peers = peer_registry.list_peers()
cell_links = cell_link_manager.list_connections()
firewall_manager.reconcile_stale_peer_rules(peers)
import ipaddress as _ipa
try:
_wg_addr = wireguard_manager._get_configured_address()
_wg_subnet = str(_ipa.ip_network(_wg_addr, strict=False)) if _wg_addr else '10.0.0.0/24'
except Exception:
_wg_subnet = '10.0.0.0/24'
_cell_subnets = [l['vpn_subnet'] for l in cell_links if l.get('vpn_subnet')]
firewall_manager.apply_all_peer_rules(peers, wg_subnet=_wg_subnet, cell_subnets=_cell_subnets)
firewall_manager.apply_all_cell_rules(cell_links)
firewall_manager.ensure_forward_stateful()
firewall_manager.ensure_cell_api_dnat()
# Embed DNAT rules in PostUp so they survive WireGuard interface restarts,
# then also apply them immediately for the current session.
wireguard_manager.ensure_postup_dnat()
firewall_manager.ensure_dns_dnat()
firewall_manager.ensure_service_dnat()
# Restore any cell link WireGuard peers that were lost from wg0.conf
# (happens if the container was rebuilt, wg0.conf was reset, etc.)
_restore_cell_wg_peers(cell_links)
wireguard_manager.sync_cell_routes()
firewall_manager.apply_all_dns_rules(peers, COREFILE_PATH, _configured_domain(),
cell_links=cell_links)
logger.info(f"Applied enforcement rules for {len(peers)} peers, {len(cell_links)} cells on startup")
# Phase 3: reapply policy routing rules for peers whose internet traffic is
# routed through an exit cell (ip rule entries don't survive container restart)
cell_links_map = {l['cell_name']: l for l in cell_links}
for peer in peers:
via_cell = peer.get('route_via')
if not via_cell:
continue
link = cell_links_map.get(via_cell)
if not link:
continue
peer_ip = peer.get('ip', '').split('/')[0]
if peer_ip:
wireguard_manager.apply_peer_route_via(peer_ip, via_wg_ip=link['dns_ip'])
sync_summary = cell_link_manager.replay_pending_pushes()
if sync_summary.get('attempted'):
logger.info(f"Startup permission sync: {sync_summary}")
except Exception as e:
logger.warning(f"Startup enforcement failed (non-fatal): {e}")
def _bootstrap_dns():
try:
identity = config_manager.configs.get('_identity', {})
cell_name = identity.get('cell_name', os.environ.get('CELL_NAME', 'mycell'))
domain = identity.get('domain', os.environ.get('CELL_DOMAIN', 'cell'))
ip_range = identity.get('ip_range', os.environ.get('CELL_IP_RANGE', '172.20.0.0/16'))
# Bootstrap on first start; then always regenerate to ensure A records use WG server IP.
network_manager.apply_ip_range(ip_range, cell_name, domain)
except Exception as e:
logger.warning(f"DNS bootstrap failed (non-fatal): {e}")
COREFILE_PATH = '/app/config/dns/Corefile'
def _recover_pending_apply():
"""If the previous all-services apply was interrupted (helper died mid-run),
clear the 'applying' flag so the UI shows the changes as still pending (not stuck
in an 'applying' spinner) and the user can retry."""
try:
pending = config_manager.configs.get('_pending_restart', {})
if pending.get('applying') and pending.get('needs_restart'):
config_manager.configs['_pending_restart']['applying'] = False
config_manager._save_all_configs()
logger.warning("Previous config apply did not complete — pending changes restored for retry")
except Exception as e:
logger.warning(f"Pending apply recovery check failed: {e}")
_recover_pending_apply()
def _sync_wg_keys():
try:
wireguard_manager._sync_keys_from_conf()
except Exception as e:
logger.warning(f"WireGuard key sync failed (non-fatal): {e}")
# Run in background so startup isn't blocked waiting on docker exec
threading.Thread(target=_sync_wg_keys, daemon=True).start()
threading.Thread(target=_apply_startup_enforcement, daemon=True).start()
threading.Thread(target=_bootstrap_dns, daemon=True).start()
# Register services with service bus
service_bus.register_service('network', network_manager)
service_bus.register_service('wireguard', wireguard_manager)
service_bus.register_service('email', email_manager)
service_bus.register_service('calendar', calendar_manager)
service_bus.register_service('files', file_manager)
service_bus.register_service('routing', routing_manager)
service_bus.register_service('vault', app.vault_manager)
service_bus.register_service('container', container_manager)
# Register auth blueprint
app.register_blueprint(auth_routes.auth_bp)
# Register service blueprints (routes extracted from this file)
from routes.email import bp as _email_bp
from routes.calendar import bp as _calendar_bp
from routes.files import bp as _files_bp
from routes.network import bp as _network_bp
from routes.wireguard import bp as _wireguard_bp
from routes.cells import bp as _cells_bp
from routes.peers import bp as _peers_bp
from routes.routing import bp as _routing_bp
from routes.vault import bp as _vault_bp
from routes.containers import bp as _containers_bp
from routes.services import bp as _services_bp
from routes.peer_dashboard import bp as _peer_dashboard_bp
from routes.config import bp as _config_bp
app.register_blueprint(_email_bp)
app.register_blueprint(_calendar_bp)
app.register_blueprint(_files_bp)
app.register_blueprint(_network_bp)
app.register_blueprint(_wireguard_bp)
app.register_blueprint(_cells_bp)
app.register_blueprint(_peers_bp)
app.register_blueprint(_routing_bp)
app.register_blueprint(_vault_bp)
app.register_blueprint(_containers_bp)
app.register_blueprint(_services_bp)
app.register_blueprint(_peer_dashboard_bp)
app.register_blueprint(_config_bp)
# Re-export config helpers so existing test imports/patches keep working
from routes.config import (
_set_pending_restart, _clear_pending_restart,
_collect_service_ports, _dedup_changes,
)
# Unified health monitoring
HEALTH_HISTORY_SIZE = 100
health_history = deque(maxlen=HEALTH_HISTORY_SIZE)
health_monitor_running = True
# Health alerting configuration
HEALTH_ALERT_THRESHOLD = 3 # Number of consecutive failures before alert
service_alert_counters = {}
def perform_health_check():
"""Perform a unified health check of all services, with alerting."""
try:
# Use service bus to get health from all services
result = {
'timestamp': datetime.utcnow().isoformat(),
'alerts': []
}
# Get health from each service
for service_name in service_bus.list_services():
try:
service = service_bus.get_service(service_name)
if hasattr(service, 'health_check'):
health = service.health_check()
else:
health = service.get_status()
result[service_name] = health
except Exception as e:
result[service_name] = {'error': str(e), 'status': 'offline'}
# Health alerting logic — alert only when a service container is not running
global service_alert_counters
for service_name in service_bus.list_services():
if service_name in result:
status = result[service_name]
healthy = True
if isinstance(status, dict):
# Prefer status.running (container actually up) over healthy (connectivity tests)
inner = status.get('status', {})
if isinstance(inner, dict):
if 'running' in inner:
healthy = inner['running']
elif 'status' in inner:
healthy = str(inner['status']).lower() in ('ok', 'healthy', 'online', 'active')
elif 'running' in status:
healthy = status['running']
elif 'error' in status:
healthy = False
else:
healthy = bool(status)
# Only count as unhealthy if we're certain it's down
if not healthy:
service_alert_counters[service_name] = service_alert_counters.get(service_name, 0) + 1
if service_alert_counters[service_name] >= HEALTH_ALERT_THRESHOLD:
alert_msg = f"ALERT: {service_name} unhealthy for {service_alert_counters[service_name]} consecutive checks."
logger.warning(alert_msg)
result['alerts'].append(alert_msg)
# Publish alert event
service_bus.publish_event(EventType.ERROR_OCCURRED, service_name, {
'error': alert_msg,
'service': service_name,
'consecutive_failures': service_alert_counters[service_name]
})
else:
# Reset counter if service is healthy
if service_alert_counters.get(service_name, 0) > 0:
logger.info(f"Service {service_name} recovered, resetting alert counter")
service_alert_counters[service_name] = 0
logger.info(f"Unified health check: {result}")
return result
except Exception as e:
logger.error(f"Unified health check failed: {e}")
return {'error': str(e), 'timestamp': datetime.utcnow().isoformat()}
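# Status shapes the health parser above treats as definitive (illustrative):
#   {'status': {'running': True}}    -> healthy (container reported up)
#   {'status': {'running': False}}   -> unhealthy; alerts after HEALTH_ALERT_THRESHOLD
#   {'status': {'status': 'ok'}}     -> healthy ('ok'/'healthy'/'online'/'active')
# Anything that is not a dict falls back to plain truthiness via bool(status).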
def health_monitor_loop():
while health_monitor_running:
with app.app_context():
health_result = perform_health_check()
health_history.appendleft(health_result)
# Publish health check event
service_bus.publish_event(EventType.HEALTH_CHECK, 'api', health_result)
time.sleep(60) # Check every 60 seconds
# Start health monitor thread
health_monitor_thread = threading.Thread(target=health_monitor_loop, daemon=True)
health_monitor_thread.start()
def _local_subnets():
"""Return all subnets the container is directly connected to (from routing table)."""
import ipaddress as _ipa, socket as _sock, struct as _struct
nets = []
try:
with open('/proc/net/route') as _f:
for _line in _f.readlines()[1:]:
_parts = _line.strip().split()
if len(_parts) < 8 or _parts[0] == 'lo':
continue
_dest = _sock.inet_ntoa(_struct.pack('<I', int(_parts[1], 16)))
_mask = _sock.inet_ntoa(_struct.pack('<I', int(_parts[7], 16)))
if _dest == '0.0.0.0':
continue
nets.append(_ipa.ip_network(f'{_dest}/{_mask}', strict=False))
except Exception:
pass
return nets
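# A /proc/net/route row, for reference (Destination and Mask are little-endian
# hex, matching the '<I' unpack above):
#   Iface  Destination  Gateway   Flags  RefCnt  Use  Metric  Mask
#   eth0   0014A8C0     00000000  0001   0       0    0       00FFFFFF
# decodes to 192.168.20.0/24; the all-zero default route is skipped.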
def is_local_request():
# Trust the direct TCP peer (request.remote_addr) first — it is always
# the container or process making the connection and cannot be spoofed.
# In production Flask is behind Caddy inside Docker, so remote_addr is
# always Caddy's Docker IP (RFC-1918) and this check is sufficient.
#
# Additionally, when a trusted reverse-proxy (Caddy) is in the path, it
# appends the real client IP as the LAST entry of X-Forwarded-For.
# Trusting only the LAST XFF entry (not the first, which a client could
# set to anything) is safe: a spoofed first entry such as
# "XFF: 127.0.0.1, <real-ip>" still passes because the last entry is the
# real IP appended by Caddy. An attacker directly hitting Flask on :3000
# could craft any XFF they like, but in the Docker topology port 3000 is
# not exposed to the internet.
remote_addr = request.remote_addr
def _allowed(addr):
if not addr:
return False
if addr in ('127.0.0.1', '::1', 'localhost'):
return True
try:
import ipaddress as _ipa
ip = _ipa.ip_address(addr.strip())
if ip.is_loopback:
return True
# Only trust loopback and Docker bridge (172.16.0.0/12).
# Deliberately excludes 10.0.0.0/8 (WireGuard peer subnet) and
# 192.168.0.0/16 (LAN) — VPN peers must not access local-only endpoints.
if ip in _ipa.ip_network('172.16.0.0/12'):
return True
# Any subnet the container is directly attached to (handles non-RFC-1918
# Docker bridge networks such as 172.0.0.0/24).
for _net in _local_subnets():
if ip in _net:
return True
except Exception:
pass
return False
if _allowed(remote_addr):
return True
# Check the last X-Forwarded-For entry (appended by the trusted proxy).
# Never trust any entry other than the last one.
try:
xff = request.headers.get('X-Forwarded-For', '')
if xff:
last_ip = xff.split(',')[-1].strip()
if last_ip and _allowed(last_ip):
return True
except Exception:
pass
return False
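# e.g. when remote_addr itself is not local, 'X-Forwarded-For: 8.8.8.8, 172.18.0.2'
# is accepted (the trailing hop, appended by the proxy, is in 172.16.0.0/12),
# while 'X-Forwarded-For: 127.0.0.1, 203.0.113.7' is rejected: only the
# trailing 203.0.113.7 is evaluated, and it is neither loopback nor local.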
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint."""
try:
return jsonify({
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"version": "1.0.0"
})
except Exception as e:
logger.error(f"Health check failed: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/status', methods=['GET'])
def get_cell_status():
"""Get overall cell status."""
try:
# Use service bus to get status from all services
services_status = {}
for service_name in service_bus.list_services():
try:
service = service_bus.get_service(service_name)
services_status[service_name] = service.get_status()
except Exception as e:
services_status[service_name] = {'error': str(e)}
peers = peer_registry.list_peers()
# Calculate actual uptime
current_time = time.time()
uptime_seconds = int(current_time - API_START_TIME)
identity = config_manager.configs.get('_identity', {})
return jsonify({
"cell_name": identity.get('cell_name', os.environ.get('CELL_NAME', 'mycell')),
"domain": identity.get('domain', os.environ.get('CELL_DOMAIN', 'cell')),
"uptime": uptime_seconds,
"peers_count": len(peers),
"services": services_status,
"timestamp": datetime.utcnow().isoformat()
})
except Exception as e:
logger.error(f"Error getting cell status: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/health/history', methods=['GET'])
def get_health_history():
"""Get recent unified health check results."""
return jsonify(list(health_history))
@app.route('/api/health/history/clear', methods=['POST'])
def clear_health_history():
"""Clear health history and reset alert counters."""
global service_alert_counters
health_history.clear()
service_alert_counters = {}
return jsonify({'message': 'Health history cleared'})
if __name__ == '__main__':
debug = os.environ.get('FLASK_DEBUG', '0') == '1'
app.run(host='0.0.0.0', port=3000, debug=debug)