#!/usr/bin/env python3
"""
Personal Internet Cell API Server

Provides REST API endpoints for managing:
- Cell status and configuration
- Network services (DNS, DHCP, NTP)
- WireGuard VPN and peer management
- Email, Calendar, and File services
- Routing and VPN gateway
- Vault and trust management (Phase 6)
"""

import os
import io
import json
import stat
import zipfile
import shutil
import logging
import secrets
from datetime import datetime
from flask import Flask, request, jsonify, current_app, send_file, session
from flask_cors import CORS
import threading
import time
from collections import deque
import json as pyjson
from logging.handlers import RotatingFileHandler
import uuid
import contextvars

# Track API start time for uptime calculation
API_START_TIME = time.time()

# Manager singletons — all instantiated in managers.py; imported here so routes can
# reference them by module-level name and test patches (`patch('app.X', mock)`) work.
from managers import (
    config_manager, service_bus, log_manager,
    network_manager, wireguard_manager, peer_registry,
    email_manager, calendar_manager, file_manager,
    routing_manager, vault_manager, container_manager,
    cell_link_manager, auth_manager, firewall_manager,
    EventType,
)
# Re-exports: tests do `from app import CellManager` and `from app import _resolve_peer_dns`
from cell_manager import CellManager
from wireguard_manager import _resolve_peer_dns
from port_registry import PORT_FIELDS, detect_conflicts
import auth_routes

# Context variable for request info
request_context = contextvars.ContextVar('request_context', default={})

# Set default log level and log file if not already defined
LOG_LEVEL = globals().get('LOG_LEVEL', 'INFO')
LOG_FILE = globals().get('LOG_FILE', 'picell.log')


class ContextFilter(logging.Filter):
    """Logging filter that copies the current request context onto each record."""

    def filter(self, record):
        """Attach every key of the request context to *record*; never drops a record."""
        ctx = request_context.get({})
        for k, v in ctx.items():
            setattr(record, k, v)
        return True


class JsonFormatter(logging.Formatter):
    """Formatter that renders a log record as a single JSON object."""

    def format(self, record):
        """Return *record* serialised as JSON, omitting fields whose value is None."""
        log_record = {
            'timestamp': self.formatTime(record, self.datefmt),
            'level': record.levelname,
            'name': record.name,
            'message': record.getMessage(),
            # The fields below are only present when ContextFilter attached them.
            'request_id': getattr(record, 'request_id', None),
            'client_ip': getattr(record, 'client_ip', None),
            'method': getattr(record, 'method', None),
            'path': getattr(record, 'path', None),
            'status': getattr(record, 'status', None),
            'user': getattr(record, 'user', None),
        }
        if record.exc_info:
            log_record['exception'] = self.formatException(record.exc_info)
        return pyjson.dumps({k: v for k, v in log_record.items() if v is not None})


json_formatter = JsonFormatter()
context_filter = ContextFilter()
handlers = [logging.StreamHandler()]
try:
    file_handler = RotatingFileHandler(LOG_FILE, maxBytes=5_000_000, backupCount=5, encoding='utf-8')
    file_handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
    file_handler.setFormatter(json_formatter)
    file_handler.addFilter(context_filter)
    handlers.append(file_handler)
except Exception as e:
    # Logging is not configured yet, so fall back to stdout for this warning.
    print(f"Warning: Could not create rotating log file handler: {e}")
for h in handlers:
    h.setFormatter(json_formatter)
    h.addFilter(context_filter)
logging.basicConfig(
    level=getattr(logging, LOG_LEVEL, logging.INFO),
    handlers=handlers
)
logger = logging.getLogger('picell')

# Flask app setup
app = Flask(__name__)
CORS(app, supports_credentials=True,
     origins=['http://localhost', 'http://localhost:5173', 'http://localhost:8081',
              'http://127.0.0.1', 'http://127.0.0.1:5173', 'http://127.0.0.1:8081'])

# Development mode flag
app.config['DEVELOPMENT_MODE'] = True  # Set to True for development, False for production

# Persist SECRET_KEY so sessions survive API restarts
SECRET_KEY_FILE = os.path.join(os.environ.get('DATA_DIR', '/app/data'), '.flask_secret_key')
if os.environ.get('SECRET_KEY'):
    _flask_secret = os.environ['SECRET_KEY'].encode() if isinstance(os.environ['SECRET_KEY'], str) else os.environ['SECRET_KEY']
elif os.path.exists(SECRET_KEY_FILE) and os.path.getsize(SECRET_KEY_FILE) > 0:
    with open(SECRET_KEY_FILE, 'rb') as _skf:
        _flask_secret = _skf.read()
else:
    _flask_secret = os.urandom(32)
    try:
        os.makedirs(os.path.dirname(SECRET_KEY_FILE), exist_ok=True)
        # Create the file with mode 0o600 so the key is never group/world readable.
        _skf_fd = os.open(SECRET_KEY_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(_skf_fd, 'wb') as _skf:
            _skf.write(_flask_secret)
    except OSError as _e:
        logger.warning(f"Could not persist SECRET_KEY to disk: {_e}")
app.config['SECRET_KEY'] = _flask_secret
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'

# config_manager, service_bus, log_manager and all other managers are imported
# from managers.py above — no re-instantiation needed here.


@app.before_request
def enrich_log_context():
    """Store per-request metadata in ``request_context`` for the log filter."""
    req_id = str(uuid.uuid4())
    client_ip = request.remote_addr
    method = request.method
    path = request.path
    # Falls back to 'anonymous' when the request carries no ``user.id`` attribute.
    user = getattr(getattr(request, 'user', None), 'id', None) or 'anonymous'
    request_context.set({
        'request_id': req_id,
        'client_ip': client_ip,
        'method': method,
        'path': path,
        'user': user
    })


@app.before_request
def enforce_auth():
    """Enforce session-based authentication and role-based access control.

    Rules:
    - /api/auth/* is always public (login, logout, me, change-password)
    - Non-/api/ paths (e.g. /health) are always public
    - /api/peer/* is accessible to peer role only (admin gets 403)
    - All other /api/* routes require admin role

    Enforcement is active when auth_manager is a real AuthManager instance
    with at least one registered user.  Tests that do not seed the auth store
    will see an empty user list and bypass enforcement, preserving
    backward-compatibility with pre-auth test suites.

    Returns:
        None to let the request proceed, or a ``(response, status)`` tuple
        (401, 403 or 503) to reject it.
    """
    path = request.path

    # Always allow non-API paths and auth namespace
    if not path.startswith('/api/') or path.startswith('/api/auth/'):
        return None

    # Only enforce when auth_manager has been properly initialised and seeded.
    # When the user store is empty (file missing or unreadable — typical in
    # unit tests and fresh installs), bypass enforcement so pre-auth test
    # suites continue to work.  503 is only returned when the users file
    # exists and is readable but contains no accounts (explicit misconfiguration).
    try:
        from auth_manager import AuthManager as _AuthManager
        if not isinstance(auth_manager, _AuthManager):
            return None
        users = auth_manager.list_users()
        if not users:
            # Only fail closed when the auth file is readable but empty —
            # that's an explicit misconfiguration.  If the file is missing or
            # unreadable (test env, wrong host path, permission denied), bypass
            # so pre-auth test suites continue to work.
            users_file = getattr(auth_manager, '_users_file', None)
            if users_file:
                try:
                    with open(users_file, 'r') as _f:
                        _f.read(1)
                    return jsonify({'error': 'Authentication not configured. Set admin password first.'}), 503
                except (PermissionError, FileNotFoundError, OSError):
                    return None
            return None
    except Exception as _auth_err:
        # Still fails open (documented behaviour above), but no longer silently:
        # an unexpected error here disables auth for the request, so make it visible.
        logger.warning(f"Auth enforcement check failed; request allowed without authentication: {_auth_err}")
        return None

    username = session.get('username')
    if not username:
        return jsonify({'error': 'Not authenticated'}), 401

    role = session.get('role')

    if path.startswith('/api/peer/'):
        if role != 'peer':
            return jsonify({'error': 'Forbidden'}), 403
    else:
        if role != 'admin':
            return jsonify({'error': 'Forbidden'}), 403

    return None


def _csrf_tokens_match(expected, provided):
    """Return True when both CSRF tokens are strings with identical content.

    Uses ``secrets.compare_digest`` so the comparison time does not depend on
    how many leading characters match.  The tokens are encoded to bytes first
    because ``compare_digest`` rejects ``str`` arguments containing non-ASCII
    characters.
    """
    if not isinstance(expected, str) or not isinstance(provided, str):
        return False
    return secrets.compare_digest(expected.encode('utf-8'), provided.encode('utf-8'))


@app.before_request
def check_csrf():
    """Check the X-CSRF-Token header against the session token on mutating requests.

    Applies to POST/PUT/DELETE/PATCH on /api/* paths, excluding /api/auth/*.
    Skipped entirely when app.config['TESTING'] is True so unit tests remain
    unaffected without needing to set CSRF headers.

    Returns:
        None to let the request proceed, or a 403 ``(response, status)`` tuple
        when the header is missing or does not match the session token.
    """
    if app.config.get('TESTING'):
        return None
    if request.method not in ('POST', 'PUT', 'DELETE', 'PATCH'):
        return None
    path = request.path
    if not path.startswith('/api/') or path.startswith('/api/auth/'):
        return None
    token_session = session.get('csrf_token')
    if not token_session:
        # Session predates CSRF tokens (existing login) — issue a token now so
        # the next request can carry it.  Don't block this request; the client
        # couldn't have known the token yet.
        session['csrf_token'] = secrets.token_hex(32)
        return None
    token_header = request.headers.get('X-CSRF-Token')
    if not token_header or not _csrf_tokens_match(token_session, token_header):
        return jsonify({'error': 'CSRF token missing or invalid'}), 403
    return None


@app.after_request
def log_request(response):
    """Record the response status in the request context and log one access line."""
    ctx = request_context.get({})
    ctx['status'] = response.status_code
    logger.info(f"{ctx.get('method')} {ctx.get('path')} {ctx.get('status')}")
    return response


@app.teardown_request
def clear_log_context(exc):
    """Reset the request context to an empty dict once the request is torn down."""
    request_context.set({})


# Wire vault_manager into Flask app context (vault routes use current_app.vault_manager)
app.vault_manager = vault_manager
auth_routes.auth_manager = auth_manager


# Apply firewall + DNS rules from stored peer settings (survives API restarts)
def _configured_domain() -> str:
    """Return the domain stored under ``_identity`` in the config, defaulting to 'cell'."""
    return config_manager.configs.get('_identity', {}).get('domain', 'cell')


def _apply_startup_enforcement():
    """Re-apply peer firewall and DNS rules for all registered peers; failures are logged only."""
    try:
        peers = peer_registry.list_peers()
        firewall_manager.apply_all_peer_rules(peers)
        firewall_manager.apply_all_dns_rules(peers, COREFILE_PATH, _configured_domain(),
                                             cell_links=cell_link_manager.list_connections())
        logger.info(f"Applied enforcement rules for {len(peers)} peers on startup")
    except Exception as e:
        logger.warning(f"Startup enforcement failed (non-fatal): {e}")


def _bootstrap_dns():
    """Bootstrap DNS records from the stored identity, falling back to env vars then defaults."""
    try:
        identity = config_manager.configs.get('_identity', {})
        cell_name = identity.get('cell_name', os.environ.get('CELL_NAME', 'mycell'))
        domain = identity.get('domain', os.environ.get('CELL_DOMAIN', 'cell'))
        ip_range = identity.get('ip_range', os.environ.get('CELL_IP_RANGE', '172.20.0.0/16'))
        network_manager.bootstrap_dns_records(cell_name, domain, ip_range)
    except Exception as e:
        logger.warning(f"DNS bootstrap failed (non-fatal): {e}")


COREFILE_PATH = '/app/config/dns/Corefile'


def _recover_pending_apply():
    """If the previous all-services apply was interrupted (helper died mid-run),
    clear the 'applying' flag so the UI shows the changes as still pending
    (not stuck in an 'applying' spinner) and the user can retry."""
    try:
        pending = config_manager.configs.get('_pending_restart', {})
        if pending.get('applying') and pending.get('needs_restart'):
            config_manager.configs['_pending_restart']['applying'] = False
            config_manager._save_all_configs()
            logger.warning("Previous config apply did not complete — pending changes restored for retry")
    except Exception as e:
        logger.warning(f"Pending apply recovery check failed: {e}")


_recover_pending_apply()

# Run in background so startup isn't blocked waiting on docker exec
threading.Thread(target=_apply_startup_enforcement, daemon=True).start()
threading.Thread(target=_bootstrap_dns, daemon=True).start()

# Register services with service bus
service_bus.register_service('network', network_manager)
service_bus.register_service('wireguard', wireguard_manager)
service_bus.register_service('email', email_manager)
service_bus.register_service('calendar', calendar_manager)
service_bus.register_service('files', file_manager)
service_bus.register_service('routing', routing_manager)
service_bus.register_service('vault', app.vault_manager)
service_bus.register_service('container', container_manager)

# Register auth blueprint
app.register_blueprint(auth_routes.auth_bp)

# Register service blueprints (routes extracted from this file)
from routes.email import bp as _email_bp
from routes.calendar import bp as _calendar_bp
from routes.files import bp as _files_bp
from routes.network import bp as _network_bp
from routes.wireguard import bp as _wireguard_bp
from routes.cells import bp as _cells_bp
from routes.peers import bp as _peers_bp
from routes.routing import bp as _routing_bp
from routes.vault import bp as _vault_bp
from routes.containers import bp as _containers_bp
from routes.services import bp as _services_bp
from routes.peer_dashboard import bp as _peer_dashboard_bp
from routes.config import bp as _config_bp
app.register_blueprint(_email_bp)
app.register_blueprint(_calendar_bp)
app.register_blueprint(_files_bp)
app.register_blueprint(_network_bp)
app.register_blueprint(_wireguard_bp)
app.register_blueprint(_cells_bp)
app.register_blueprint(_peers_bp)
app.register_blueprint(_routing_bp)
app.register_blueprint(_vault_bp)
app.register_blueprint(_containers_bp)
app.register_blueprint(_services_bp)
app.register_blueprint(_peer_dashboard_bp)
app.register_blueprint(_config_bp)

# Re-export config helpers so existing test imports/patches keep working
from routes.config import (
    _set_pending_restart, _clear_pending_restart,
    _collect_service_ports, _dedup_changes,
)

# Unified health monitoring
HEALTH_HISTORY_SIZE = 100
health_history = deque(maxlen=HEALTH_HISTORY_SIZE)
health_monitor_running = True

# Health alerting configuration
HEALTH_ALERT_THRESHOLD = 3  # Number of consecutive failures before alert
service_alert_counters = {}


def perform_health_check():
    """Perform a unified health check of all services, with alerting.

    Returns:
        A dict with 'timestamp', 'alerts' and one entry per registered service,
        or ``{'error': ..., 'timestamp': ...}`` if the check itself fails.
    """
    try:
        # Use service bus to get health from all services
        result = {
            'timestamp': datetime.utcnow().isoformat(),
            'alerts': []
        }

        # Get health from each service
        for service_name in service_bus.list_services():
            try:
                service = service_bus.get_service(service_name)
                if hasattr(service, 'health_check'):
                    health = service.health_check()
                else:
                    health = service.get_status()
                result[service_name] = health
            except Exception as e:
                result[service_name] = {'error': str(e), 'status': 'offline'}

        # Health alerting logic — alert only when a service container is not running
        global service_alert_counters
        for service_name in service_bus.list_services():
            if service_name in result:
                status = result[service_name]
                healthy = True
                if isinstance(status, dict):
                    # Prefer status.running (container actually up) over healthy (connectivity tests)
                    inner = status.get('status', {})
                    if isinstance(inner, dict):
                        if 'running' in inner:
                            healthy = inner['running']
                        elif 'status' in inner:
                            healthy = str(inner['status']).lower() in ('ok', 'healthy', 'online', 'active')
                    elif 'running' in status:
                        healthy = status['running']
                    elif 'error' in status:
                        healthy = False
                else:
                    healthy = bool(status)

                # Only count as unhealthy if we're certain it's down
                if not healthy:
                    service_alert_counters[service_name] = service_alert_counters.get(service_name, 0) + 1
                    if service_alert_counters[service_name] >= HEALTH_ALERT_THRESHOLD:
                        alert_msg = f"ALERT: {service_name} unhealthy for {service_alert_counters[service_name]} consecutive checks."
                        logger.warning(alert_msg)
                        result['alerts'].append(alert_msg)
                        # Publish alert event
                        service_bus.publish_event(EventType.ERROR_OCCURRED, service_name, {
                            'error': alert_msg,
                            'service': service_name,
                            'consecutive_failures': service_alert_counters[service_name]
                        })
                else:
                    # Reset counter if service is healthy
                    if service_alert_counters.get(service_name, 0) > 0:
                        logger.info(f"Service {service_name} recovered, resetting alert counter")
                    service_alert_counters[service_name] = 0

        logger.info(f"Unified health check: {result}")
        return result
    except Exception as e:
        logger.error(f"Unified health check failed: {e}")
        return {'error': str(e), 'timestamp': datetime.utcnow().isoformat()}


def health_monitor_loop():
    """Run a health check every 60 seconds while ``health_monitor_running`` is true.

    Each iteration is guarded: an exception while recording or publishing the
    result is logged instead of terminating the monitor thread.
    """
    while health_monitor_running:
        try:
            with app.app_context():
                health_result = perform_health_check()
                health_history.appendleft(health_result)
                # Publish health check event
                service_bus.publish_event(EventType.HEALTH_CHECK, 'api', health_result)
        except Exception as e:
            logger.error(f"Health monitor iteration failed: {e}")
        time.sleep(60)  # Check every 60 seconds


# Start health monitor thread
health_monitor_thread = threading.Thread(target=health_monitor_loop, daemon=True)
health_monitor_thread.start()


def _local_subnets():
    """Return all subnets the container is directly connected to (from routing table)."""
    import ipaddress as _ipa, socket as _sock, struct as _struct
    nets = []
    try:
        with open('/proc/net/route') as _f:
            for _line in _f.readlines()[1:]:
                _parts = _line.strip().split()
                if len(_parts) < 8 or _parts[0] == 'lo':
                    continue
                # NOTE(review): everything from the pack() format string to the end of
                # this function was lost in the copy under review and is reconstructed
                # here — confirm against version control before merging.
                # Assumes /proc/net/route columns 1 and 7 are the destination and mask
                # as little-endian hex words.
                _dest = _sock.inet_ntoa(_struct.pack('<I', int(_parts[1], 16)))
                _mask = _sock.inet_ntoa(_struct.pack('<I', int(_parts[7], 16)))
                # Skip the default route so 0.0.0.0/0 can never count as "local".
                if _dest == '0.0.0.0' or _mask == '0.0.0.0':
                    continue
                try:
                    nets.append(_ipa.ip_network(f'{_dest}/{_mask}', strict=False))
                except ValueError:
                    continue
    except Exception:
        # Best effort: no routing table available means no extra subnets.
        pass
    return nets


# NOTE(review): the `def` line, docstring and the first comment lines of the
# function below were lost in the copy under review. The name `is_local_request`
# is an assumption — confirm against version control and callers before merging.
def is_local_request():
    """Return True when the current request comes from a trusted local address.

    The peer address is checked first; if it is not trusted, the last
    X-Forwarded-For entry is checked instead.
    """
    # (start of the original comment is missing) ... still passes because the
    # last entry is the real IP appended by Caddy.  An attacker directly hitting
    # Flask on :3000 could craft any XFF they like, but in the Docker topology
    # port 3000 is not exposed to the internet.
    remote_addr = request.remote_addr

    def _allowed(addr):
        """Return True if *addr* is loopback, in 172.16.0.0/12, or on a directly attached subnet."""
        if not addr:
            return False
        if addr in ('127.0.0.1', '::1', 'localhost'):
            return True
        try:
            import ipaddress as _ipa
            ip = _ipa.ip_address(addr.strip())
            if ip.is_loopback:
                return True
            # Only trust loopback and Docker bridge (172.16.0.0/12).
            # Deliberately excludes 10.0.0.0/8 (WireGuard peer subnet) and
            # 192.168.0.0/16 (LAN) — VPN peers must not access local-only endpoints.
            if ip in _ipa.ip_network('172.16.0.0/12'):
                return True
            # Any subnet the container is directly attached to (handles non-RFC-1918
            # Docker bridge networks such as 172.0.0.0/24).
            for _net in _local_subnets():
                if ip in _net:
                    return True
        except Exception:
            # Unparseable address: treat as not allowed.
            pass
        return False

    if _allowed(remote_addr):
        return True

    # Check the last X-Forwarded-For entry (appended by the trusted proxy).
    # Never trust any entry other than the last one.
    try:
        xff = request.headers.get('X-Forwarded-For', '')
        if xff:
            last_ip = xff.split(',')[-1].strip()
            if last_ip and _allowed(last_ip):
                return True
    except Exception:
        pass

    return False


@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint."""
    try:
        return jsonify({
            "status": "healthy",
            "timestamp": datetime.utcnow().isoformat(),
            "version": "1.0.0"
        })
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return jsonify({"error": str(e)}), 500


@app.route('/api/status', methods=['GET'])
def get_cell_status():
    """Get overall cell status."""
    try:
        # Use service bus to get status from all services
        services_status = {}
        for service_name in service_bus.list_services():
            try:
                service = service_bus.get_service(service_name)
                services_status[service_name] = service.get_status()
            except Exception as e:
                services_status[service_name] = {'error': str(e)}

        peers = peer_registry.list_peers()

        # Calculate actual uptime
        current_time = time.time()
        uptime_seconds = int(current_time - API_START_TIME)

        identity = config_manager.configs.get('_identity', {})
        return jsonify({
            "cell_name": identity.get('cell_name', os.environ.get('CELL_NAME', 'mycell')),
            "domain": identity.get('domain', os.environ.get('CELL_DOMAIN', 'cell')),
            "uptime": uptime_seconds,
            "peers_count": len(peers),
            "services": services_status,
            "timestamp": datetime.utcnow().isoformat()
        })
    except Exception as e:
        logger.error(f"Error getting cell status: {e}")
        return jsonify({"error": str(e)}), 500


@app.route('/api/health/history', methods=['GET'])
def get_health_history():
    """Get recent unified health check results."""
    return jsonify(list(health_history))


@app.route('/api/health/history/clear', methods=['POST'])
def clear_health_history():
    """Clear health history and reset alert counters."""
    global service_alert_counters
    health_history.clear()
    service_alert_counters = {}
    return jsonify({'message': 'Health history cleared'})


if __name__ == '__main__':
    debug = os.environ.get('FLASK_DEBUG', '0') == '1'
    app.run(host='0.0.0.0', port=3000, debug=debug)