4ebcb1d077
Unit Tests / test (push) Successful in 7m29s
The apply_all_dns_rules() call at the end of _bootstrap_dns() was added to force reload 30s into the Corefile on startup. Now that reload 30s is removed (it broke CoreDNS zone serving), the call is unnecessary in LAN mode and actively harmful in DDNS mode: update_split_horizon_zone() already writes the correct Corefile with the split-horizon block; the subsequent apply_all_dns_rules() call would overwrite it without the split-horizon zones, causing all service subdomain lookups to return NXDOMAIN. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
916 lines
38 KiB
Python
916 lines
38 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Personal Internet Cell API Server
|
|
|
|
Provides REST API endpoints for managing:
|
|
- Cell status and configuration
|
|
- Network services (DNS, DHCP, NTP)
|
|
- WireGuard VPN and peer management
|
|
- Email, Calendar, and File services
|
|
- Routing and VPN gateway
|
|
- Vault and trust management (Phase 6)
|
|
"""
|
|
|
|
import os
|
|
import io
|
|
import json
|
|
import stat
|
|
import zipfile
|
|
import shutil
|
|
import logging
|
|
import secrets
|
|
from datetime import datetime
|
|
from flask import Flask, request, jsonify, current_app, send_file, session
|
|
from flask_cors import CORS
|
|
import threading
|
|
import time
|
|
from collections import deque
|
|
import json as pyjson
|
|
from logging.handlers import RotatingFileHandler
|
|
import uuid
|
|
import contextvars
|
|
|
|
# Track API start time for uptime calculation
|
|
# Captured once at import; get_cell_status() subtracts it from time.time() to report uptime in seconds.
API_START_TIME = time.time()
|
|
|
|
# Manager singletons — all instantiated in managers.py; imported here so routes can
|
|
# reference them by module-level name and test patches (`patch('app.X', mock)`) work.
|
|
from managers import (
|
|
config_manager, service_bus, log_manager,
|
|
network_manager, wireguard_manager, peer_registry,
|
|
email_manager, calendar_manager, file_manager,
|
|
routing_manager, vault_manager, container_manager,
|
|
cell_link_manager, auth_manager, setup_manager,
|
|
caddy_manager,
|
|
ddns_manager, service_store_manager,
|
|
connectivity_manager,
|
|
service_registry,
|
|
service_composer,
|
|
account_manager,
|
|
firewall_manager, EventType,
|
|
)
|
|
# Re-exports: tests do `from app import CellManager` and `from app import _resolve_peer_dns`
|
|
from cell_manager import CellManager
|
|
from wireguard_manager import _resolve_peer_dns
|
|
from port_registry import PORT_FIELDS, detect_conflicts
|
|
import auth_routes
|
|
from legacy_cleanup import cleanup_legacy_builtin_containers
|
|
|
|
# Per-request logging metadata (request id, client IP, method, path, user).
# A before_request hook fills it in and ContextFilter copies it onto log records.
request_context = contextvars.ContextVar('request_context', default={})

# Logging defaults: reuse LOG_LEVEL / LOG_FILE if this module's globals already
# define them, otherwise fall back to INFO and picell.log.
_module_globals = globals()
LOG_LEVEL = _module_globals.get('LOG_LEVEL', 'INFO')
LOG_FILE = _module_globals.get('LOG_FILE', 'picell.log')
|
|
|
|
class ContextFilter(logging.Filter):
    """Logging filter that stamps each record with the current request context."""

    def filter(self, record):
        """Copy every key of the active request context onto *record*.

        Always returns True, so this filter never drops a record.
        """
        for attr_name, attr_value in request_context.get({}).items():
            setattr(record, attr_name, attr_value)
        return True
|
|
|
|
class JsonFormatter(logging.Formatter):
    """Formatter that renders each log record as a single JSON object."""

    # Optional per-request attributes (set on the record by a filter), in output order.
    _CONTEXT_FIELDS = ('request_id', 'client_ip', 'method', 'path', 'status', 'user')

    def format(self, record):
        """Serialize *record* to a JSON string, omitting fields whose value is None."""
        payload = {
            'timestamp': self.formatTime(record, self.datefmt),
            'level': record.levelname,
            'name': record.name,
            'message': record.getMessage(),
        }
        for field in self._CONTEXT_FIELDS:
            payload[field] = getattr(record, field, None)
        if record.exc_info:
            payload['exception'] = self.formatException(record.exc_info)
        compact = {key: value for key, value in payload.items() if value is not None}
        return pyjson.dumps(compact)
|
|
|
|
# One formatter and one filter instance are shared by every handler.
json_formatter = JsonFormatter()
context_filter = ContextFilter()

# Always log to the console; add a rotating file handler when it can be created.
handlers = [logging.StreamHandler()]
try:
    file_handler = RotatingFileHandler(
        LOG_FILE,
        maxBytes=5_000_000,
        backupCount=5,
        encoding='utf-8',
    )
    file_handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
    handlers.append(file_handler)
except Exception as e:
    print(f"Warning: Could not create rotating log file handler: {e}")

# Every handler (console and, when present, file) emits JSON and carries the
# request-context filter.
for _handler in handlers:
    _handler.setFormatter(json_formatter)
    _handler.addFilter(context_filter)

logging.basicConfig(
    handlers=handlers,
    level=getattr(logging, LOG_LEVEL, logging.INFO),
)
logger = logging.getLogger('picell')
|
|
|
|
# Flask app setup
|
|
app = Flask(__name__)
CORS(
    app,
    supports_credentials=True,
    origins=[
        'http://localhost', 'http://localhost:5173', 'http://localhost:8081',
        'http://127.0.0.1', 'http://127.0.0.1:5173', 'http://127.0.0.1:8081',
    ],
)

# Development mode flag: True for development, False for production.
app.config['DEVELOPMENT_MODE'] = True

# Session signing key, persisted so sessions survive API restarts.
# Precedence: SECRET_KEY env var, then a non-empty key file under DATA_DIR,
# then a freshly generated key that is written to that file (mode 0600).
SECRET_KEY_FILE = os.path.join(os.environ.get('DATA_DIR', '/app/data'), '.flask_secret_key')
_env_secret = os.environ.get('SECRET_KEY')
if _env_secret:
    _flask_secret = _env_secret.encode() if isinstance(_env_secret, str) else _env_secret
elif os.path.exists(SECRET_KEY_FILE) and os.path.getsize(SECRET_KEY_FILE) > 0:
    with open(SECRET_KEY_FILE, 'rb') as _skf:
        _flask_secret = _skf.read()
else:
    _flask_secret = os.urandom(32)
    try:
        os.makedirs(os.path.dirname(SECRET_KEY_FILE), exist_ok=True)
        _skf_fd = os.open(SECRET_KEY_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(_skf_fd, 'wb') as _skf:
            _skf.write(_flask_secret)
    except OSError as _e:
        # Not fatal: the in-memory key still works until the next restart.
        logger.warning(f"Could not persist SECRET_KEY to disk: {_e}")

# Each PIC instance has its own secret key. Deriving the cookie name from it keeps
# several instances served from the same hostname (e.g. localhost:portA vs
# localhost:portB) from sharing session cookies and logging each other out.
import hashlib as _hl
_cookie_suffix = _hl.sha256(_flask_secret).hexdigest()[:8]
app.config.update({
    'SECRET_KEY': _flask_secret,
    'SESSION_COOKIE_HTTPONLY': True,
    'SESSION_COOKIE_SAMESITE': 'Lax',
    'SESSION_COOKIE_NAME': f'pic_sess_{_cookie_suffix}',
})
|
|
|
|
# config_manager, service_bus, log_manager and all other managers are imported
|
|
# from managers.py above — no re-instantiation needed here.
|
|
|
|
@app.before_request
def enrich_log_context():
    """Publish a fresh request id and basic request metadata to the logging context."""
    request_user = getattr(request, 'user', None)
    request_context.set({
        'request_id': str(uuid.uuid4()),
        'client_ip': request.remote_addr,
        'method': request.method,
        'path': request.path,
        # 'anonymous' when the request has no user object or that object has no id.
        'user': getattr(request_user, 'id', None) or 'anonymous',
    })
|
|
|
|
@app.before_request
def enforce_setup():
    """Reject API calls with 428 until the first-run setup has been completed.

    Always let through: paths starting with /api/setup, /health, and anything
    outside /api/. Every other /api/* request made while setup is incomplete
    gets a 428 response carrying a redirect hint to /setup.

    Does nothing when app.config['TESTING'] is truthy, so unit tests do not
    have to mark setup as complete.
    """
    if app.config.get('TESTING'):
        return None

    path = request.path
    always_allowed = (
        path.startswith('/api/setup')
        or path == '/health'
        or not path.startswith('/api/')
    )
    if always_allowed or setup_manager.is_setup_complete():
        return None
    return jsonify({'error': 'Setup required', 'redirect': '/setup'}), 428
|
|
|
|
|
|
@app.before_request
def enforce_auth():
    """Enforce session-based authentication and role-based access control.

    Rules:
    - Non-/api/ paths, /api/auth/*, /api/setup/* and /api/cells/peer-sync/* are
      never checked here.
    - /api/peer/* requires session role 'peer' (any other role gets 403).
    - Every other /api/* route requires session role 'admin'.

    Enforcement only applies when auth_manager is a real AuthManager with at
    least one user. With an empty user list the request is let through, except
    that 503 is returned when the users file exists and is readable (accounts
    were expected but none are configured).

    Returns None to continue the request, or a (response, status) tuple with
    401 (no session), 403 (wrong role) or 503 (auth not configured).
    """
    path = request.path
    # Always allow non-API paths, the auth namespace, and the setup namespace.
    if not path.startswith('/api/') or path.startswith(('/api/auth/', '/api/setup/')):
        return None
    # Cell peer-sync endpoints authenticate via source IP + WG pubkey, not a session.
    if path.startswith('/api/cells/peer-sync/'):
        return None

    try:
        from auth_manager import AuthManager as _AuthManager
        if not isinstance(auth_manager, _AuthManager):
            return None
        users = auth_manager.list_users()
        if not users:
            users_file = getattr(auth_manager, '_users_file', None)
            if users_file:
                try:
                    with open(users_file, 'r') as _f:
                        _f.read(1)
                except OSError:
                    # File missing or unreadable (unit tests, fresh installs): bypass.
                    return None
                return jsonify({'error': 'Authentication not configured. Set admin password first.'}), 503
            return None
    except Exception as exc:
        # Still fails open, as before, so pre-auth test suites keep working — but an
        # authentication bypass must never be silent, so record why it happened.
        logger.warning(f"Auth enforcement bypassed for {path}: user store check failed: {exc}")
        return None

    username = session.get('username')
    if not username:
        return jsonify({'error': 'Not authenticated'}), 401

    required_role = 'peer' if path.startswith('/api/peer/') else 'admin'
    if session.get('role') != required_role:
        return jsonify({'error': 'Forbidden'}), 403
    return None
|
|
|
|
|
|
@app.before_request
def check_csrf():
    """CSRF protection for state-changing API requests.

    Applies to POST/PUT/DELETE/PATCH on /api/* paths, excluding /api/auth/*,
    /api/setup/* and /api/cells/peer-sync/*. The X-CSRF-Token request header
    must equal the 'csrf_token' stored in the session; otherwise a 403 is
    returned.

    A session that has no token yet is issued one and the current request is
    allowed, because the client could not have known the token.

    Skipped entirely when app.config['TESTING'] is truthy so unit tests do not
    need to send CSRF headers.
    """
    if app.config.get('TESTING'):
        return None
    if request.method not in ('POST', 'PUT', 'DELETE', 'PATCH'):
        return None

    path = request.path
    if not path.startswith('/api/') or path.startswith(('/api/auth/', '/api/setup/')):
        return None
    # peer-sync uses IP+pubkey auth: no session, so no CSRF token is possible.
    if path.startswith('/api/cells/peer-sync/'):
        return None

    token_session = session.get('csrf_token')
    if not token_session:
        # Session predates CSRF tokens (existing login): issue one now so the
        # next request can carry it, and do not block this request.
        session['csrf_token'] = secrets.token_hex(32)
        return None

    token_header = request.headers.get('X-CSRF-Token')
    # Constant-time comparison so response timing does not reveal how much of the
    # token matched. Compare bytes: compare_digest rejects non-ASCII str input.
    if not token_header or not secrets.compare_digest(
        str(token_header).encode('utf-8'),
        str(token_session).encode('utf-8'),
    ):
        return jsonify({'error': 'CSRF token missing or invalid'}), 403
    return None
|
|
|
|
|
|
@app.after_request
def log_request(response):
    """Record the response status in the request context and log one access line.

    Returns *response* unchanged.
    """
    ctx = request_context.get({})
    ctx.update({'status': response.status_code})
    logger.info(f"{ctx.get('method')} {ctx.get('path')} {ctx.get('status')}")
    return response
|
|
|
|
@app.teardown_request
def clear_log_context(exc):
    """Reset the logging context to an empty dict when the request is torn down.

    ``exc`` is the argument Flask passes to teardown handlers; it is not used.
    """
    request_context.set(dict())
|
|
|
|
# Hand the shared manager singletons to the code that looks them up indirectly:
# auth_routes reads its module-level auth_manager, and vault routes use
# current_app.vault_manager.
auth_routes.auth_manager = auth_manager
app.vault_manager = vault_manager
|
|
|
|
# Apply firewall + DNS rules from stored peer settings (survives API restarts)
|
|
def _configured_domain() -> str:
    """Return the cell's DNS domain from the stored '_identity' config.

    Prefers ``domain_name``, the full FQDN (e.g. 'test5.pic.ngo'). Cells without
    a subdomain prefix fall back to ``domain`` (e.g. 'lan', 'dev'), and finally
    to 'cell'.
    """
    identity = config_manager.configs.get('_identity', {})
    fqdn = identity.get('domain_name')
    if fqdn:
        return fqdn
    return identity.get('domain', 'cell')
|
|
|
|
|
|
def _restore_cell_wg_peers(cell_links):
    """Re-add cell-link peers whose public key is absent from the WireGuard config.

    The stored cell links are treated as authoritative: any link whose
    ``public_key`` does not appear in the text returned by
    ``wireguard_manager._read_config()`` is re-added via ``add_cell_peer``.
    If at least one peer was restored, the config is synced into the running
    interface. Any exception is logged and swallowed (non-fatal).
    """
    try:
        conf_content = wireguard_manager._read_config()
        restored = 0
        for link in cell_links:
            pk = link.get('public_key', '')
            if not pk:
                continue
            if pk in conf_content:
                # Peer block already present: nothing to restore.
                continue
            name = link.get('cell_name', '')
            ok = wireguard_manager.add_cell_peer(
                name=name,
                public_key=pk,
                endpoint=link.get('endpoint', ''),
                vpn_subnet=link.get('vpn_subnet', ''),
            )
            if not ok:
                logger.warning(f"Failed to restore WG peer block for cell '{name}'")
                continue
            logger.info(f"Restored missing WG peer block for cell '{name}'")
            restored += 1
        if restored:
            # Reload the conf into the running WireGuard interface.
            wireguard_manager._syncconf()
    except Exception as e:
        logger.warning(f"_restore_cell_wg_peers failed (non-fatal): {e}")
|
|
|
|
|
|
def _apply_startup_enforcement():
    """Re-apply firewall, WireGuard, DNS and routing state from stored settings.

    Stages, in order:
      1. Firewall / WireGuard / DNS enforcement for all peers and cell links
         (one failure domain, same call order as before).
      2. Policy-routing rules for peers whose traffic exits via another cell.
      3. Replay of pending cell-link permission pushes.
      4. Removal of legacy builtin containers.
      5. Service-store firewall/caddy rule reapply.
      6. Extended-connectivity policy routes.

    Each stage has its own try/except, so a failure in one stage is logged as a
    non-fatal warning and the remaining stages still run. Nothing is raised.
    """
    import ipaddress as _ipa

    peers = []
    cell_links = []

    # Stage 1: firewall, WireGuard and DNS enforcement.
    try:
        peers = peer_registry.list_peers()
        cell_links = cell_link_manager.list_connections()
        firewall_manager.reconcile_stale_peer_rules(peers)
        try:
            _wg_addr = wireguard_manager._get_configured_address()
            _wg_subnet = str(_ipa.ip_network(_wg_addr, strict=False)) if _wg_addr else '10.0.0.0/24'
        except Exception:
            # Unknown or unparsable address: fall back to the default WG subnet.
            _wg_subnet = '10.0.0.0/24'
        _cell_subnets = [link['vpn_subnet'] for link in cell_links if link.get('vpn_subnet')]
        firewall_manager.apply_all_peer_rules(peers, wg_subnet=_wg_subnet, cell_subnets=_cell_subnets)
        firewall_manager.apply_all_cell_rules(cell_links)
        firewall_manager.ensure_forward_stateful()
        firewall_manager.ensure_cell_api_dnat()
        # Embed DNAT rules in PostUp so they survive WireGuard interface restarts,
        # then also apply them immediately for the current session.
        wireguard_manager.ensure_postup_dnat()
        firewall_manager.ensure_dns_dnat()
        firewall_manager.ensure_service_dnat()
        # Allow Docker containers (cell-dns) to reach remote cell subnets via wg0.
        firewall_manager.ensure_wg_masquerade()
        firewall_manager.ensure_cell_subnet_routes(cell_links)
        # Restore any cell link WireGuard peers that were lost from wg0.conf
        # (happens if the container was rebuilt, wg0.conf was reset, etc.).
        _restore_cell_wg_peers(cell_links)
        wireguard_manager.sync_cell_routes()
        firewall_manager.apply_all_dns_rules(peers, COREFILE_PATH, _configured_domain(),
                                             cell_links=cell_links)
        logger.info(f"Applied enforcement rules for {len(peers)} peers, {len(cell_links)} cells on startup")
    except Exception as e:
        logger.warning(f"Startup enforcement failed (non-fatal): {e}")

    # Stage 2: reapply policy routing for peers whose internet traffic is routed
    # through an exit cell (ip rule entries don't survive a container restart).
    try:
        cell_links_map = {link['cell_name']: link for link in cell_links if link.get('cell_name')}
        for peer in peers:
            via_cell = peer.get('route_via')
            link = cell_links_map.get(via_cell) if via_cell else None
            if not link:
                continue
            peer_ip = (peer.get('ip') or '').split('/')[0]
            if not peer_ip:
                continue
            via_wg_ip = link.get('dns_ip')
            if not via_wg_ip:
                logger.warning(f"Cell '{via_cell}' has no dns_ip; skipping exit route for peer {peer_ip}")
                continue
            try:
                wireguard_manager.apply_peer_route_via(peer_ip, via_wg_ip=via_wg_ip)
            except Exception as _route_err:
                # One bad peer must not stop the routes of the remaining peers.
                logger.warning(f"Exit route for peer {peer_ip} via '{via_cell}' failed (non-fatal): {_route_err}")
    except Exception as _routes_err:
        logger.warning(f"Startup exit-route reapply failed (non-fatal): {_routes_err}")

    # Stage 3: replay permission pushes that are still pending.
    try:
        sync_summary = cell_link_manager.replay_pending_pushes()
        if sync_summary.get('attempted'):
            logger.info(f"Startup permission sync: {sync_summary}")
    except Exception as _sync_err:
        logger.warning(f"Startup permission sync failed (non-fatal): {_sync_err}")

    # Stage 4: remove legacy builtin containers from old main stack (one-shot, idempotent).
    try:
        cleanup_legacy_builtin_containers(config_manager)
    except Exception as _cle:
        logger.warning(f'legacy cleanup failed (non-fatal): {_cle}')

    # Stage 5: service store — re-apply firewall/caddy rules for installed services.
    try:
        service_store_manager.reapply_on_startup()
    except Exception as _sse:
        logger.warning(f"service_store reapply_on_startup failed (non-fatal): {_sse}")

    # Stage 6 (Phase 5): re-apply extended-connectivity policy routing rules.
    try:
        connectivity_manager.apply_routes()
    except Exception as _ce:
        logger.warning(f"connectivity apply_routes failed (non-fatal): {_ce}")
|
|
|
|
def _bootstrap_dns():
    """Write the startup DNS configuration for the cell's domain mode.

    LAN mode: write full service records into the primary local zone via
    ``network_manager.apply_ip_range``.

    Any other mode (DDNS/ACME): when the effective domain is set and differs
    from the primary domain, ensure the split-horizon zone exists so LAN
    clients resolve service subdomains to the internal Caddy IP.
    ``apply_ip_range`` is never called here because it would pollute the DDNS
    parent zone.

    Any exception is logged and swallowed (non-fatal).
    """
    try:
        identity = config_manager.configs.get('_identity', {})
        cell_name = identity.get('cell_name', os.environ.get('CELL_NAME', 'mycell'))
        domain = identity.get('domain', os.environ.get('CELL_DOMAIN', 'cell'))
        ip_range = identity.get('ip_range', os.environ.get('CELL_IP_RANGE', '172.20.0.0/16'))

        if identity.get('domain_mode', 'lan') == 'lan':
            network_manager.apply_ip_range(ip_range, cell_name, domain)
            return

        effective_domain = config_manager.get_effective_domain()
        if not effective_domain or effective_domain == domain:
            return

        import ip_utils
        caddy_ip = ip_utils.get_service_ips(ip_range).get('caddy', '172.20.0.2')
        # update_split_horizon_zone writes both the zone file and the Corefile
        # (split-horizon block included). Do not follow it with apply_all_dns_rules:
        # that would overwrite the Corefile and drop the split-horizon block.
        network_manager.update_split_horizon_zone(
            effective_domain, caddy_ip, primary_domain=domain)
    except Exception as e:
        logger.warning(f"DNS bootstrap failed (non-fatal): {e}")
|
|
|
|
# Corefile location passed to firewall_manager.apply_all_dns_rules() by _apply_startup_enforcement().
COREFILE_PATH = '/app/config/dns/Corefile'
|
|
|
|
|
|
def _recover_pending_apply():
    """Un-stick an all-services apply that was interrupted mid-run.

    When the stored '_pending_restart' entry has both 'applying' and
    'needs_restart' set, clear 'applying' and save the configs. The UI then
    shows the changes as still pending (not stuck in an 'applying' spinner)
    and the user can retry. Any exception is logged and swallowed.
    """
    try:
        pending = config_manager.configs.get('_pending_restart', {})
        interrupted = pending.get('applying') and pending.get('needs_restart')
        if not interrupted:
            return
        config_manager.configs['_pending_restart']['applying'] = False
        config_manager._save_all_configs()
        logger.warning("Previous config apply did not complete — pending changes restored for retry")
    except Exception as e:
        logger.warning(f"Pending apply recovery check failed: {e}")
|
|
|
|
|
|
# Runs synchronously at import time, before the background startup threads are started.
_recover_pending_apply()
|
|
|
|
|
|
def _sync_wg_keys():
    """Run the WireGuard manager's key sync from its conf; log and swallow any failure."""
    try:
        wireguard_manager._sync_keys_from_conf()
    except Exception as e:
        logger.warning(f"WireGuard key sync failed (non-fatal): {e}")
|
|
|
|
|
|
# Run the startup tasks in daemon threads so import isn't blocked waiting on docker exec.
# NOTE(review): _apply_startup_enforcement (apply_all_dns_rules) and _bootstrap_dns
# (update_split_horizon_zone) run concurrently and both appear to write the Corefile —
# confirm the final Corefile does not depend on which thread finishes last.
for _startup_task in (_sync_wg_keys, _apply_startup_enforcement, _bootstrap_dns):
    threading.Thread(target=_startup_task, daemon=True).start()
|
|
|
|
# Register each manager with the service bus under its short service name.
for _service_name, _service in (
    ('network', network_manager),
    ('wireguard', wireguard_manager),
    ('email', email_manager),
    ('calendar', calendar_manager),
    ('files', file_manager),
    ('routing', routing_manager),
    ('vault', app.vault_manager),
    ('container', container_manager),
):
    service_bus.register_service(_service_name, _service)
|
|
|
|
# Auth blueprint first.
app.register_blueprint(auth_routes.auth_bp)

# Setup blueprint (no auth required — runs before any account exists).
from routes.setup import setup_bp
app.register_blueprint(setup_bp)

# Service blueprints (routes extracted from this file), registered in import order.
from routes.email import bp as _email_bp
from routes.calendar import bp as _calendar_bp
from routes.files import bp as _files_bp
from routes.network import bp as _network_bp
from routes.wireguard import bp as _wireguard_bp
from routes.cells import bp as _cells_bp
from routes.peers import bp as _peers_bp
from routes.routing import bp as _routing_bp
from routes.vault import bp as _vault_bp
from routes.containers import bp as _containers_bp
from routes.services import bp as _services_bp
from routes.peer_dashboard import bp as _peer_dashboard_bp
from routes.config import bp as _config_bp

for _service_blueprint in (
    _email_bp,
    _calendar_bp,
    _files_bp,
    _network_bp,
    _wireguard_bp,
    _cells_bp,
    _peers_bp,
    _routing_bp,
    _vault_bp,
    _containers_bp,
    _services_bp,
    _peer_dashboard_bp,
    _config_bp,
):
    app.register_blueprint(_service_blueprint)

# Service store blueprint.
from routes.service_store import store_bp
app.register_blueprint(store_bp)

# Re-export config helpers so existing test imports/patches keep working.
from routes.config import (
    _set_pending_restart,
    _clear_pending_restart,
    _collect_service_ports,
    _dedup_changes,
)
|
|
|
|
# Unified health monitoring: a bounded history of recent results, newest first,
# plus the flag the monitor loop checks before each cycle.
HEALTH_HISTORY_SIZE = 100
health_history = deque([], HEALTH_HISTORY_SIZE)
health_monitor_running = True

# Health alerting: a service is reported once it has failed this many
# consecutive checks; the per-service counters live in service_alert_counters.
HEALTH_ALERT_THRESHOLD = 3
service_alert_counters = dict()
|
|
|
|
def perform_health_check():
    """Collect health from every service on the bus and raise alerts for outages.

    Returns a dict holding 'timestamp', 'alerts' (list of alert strings) and one
    entry per checked service: the result of its ``health_check()`` when it has
    one, else ``get_status()``, or ``{'error', 'status': 'offline'}`` if that
    call raised. A service is alerted on (log warning plus ERROR_OCCURRED
    event) once it has been unhealthy for HEALTH_ALERT_THRESHOLD consecutive
    checks. On an unexpected failure, returns ``{'error', 'timestamp'}``.
    """
    global service_alert_counters
    try:
        result = {
            'timestamp': datetime.utcnow().isoformat(),
            'alerts': []
        }

        # email/calendar/files are optional store services — only check them when installed.
        # NOTE(review): these are '*_manager' names, while this module registers the bus
        # services as 'email'/'calendar'/'files' — confirm this filter ever matches.
        _installed_store_ids = set(config_manager.get_installed_services())
        _OPTIONAL_STORE_MANAGERS = frozenset({'email_manager', 'calendar_manager', 'file_manager'})

        # Pass 1: gather health from each service.
        for service_name in service_bus.list_services():
            if service_name in _OPTIONAL_STORE_MANAGERS:
                # Store service id = manager name without the _manager suffix.
                store_id = service_name.replace('_manager', '')
                if store_id not in _installed_store_ids:
                    continue
            try:
                service = service_bus.get_service(service_name)
                health = service.health_check() if hasattr(service, 'health_check') else service.get_status()
                result[service_name] = health
            except Exception as e:
                result[service_name] = {'error': str(e), 'status': 'offline'}

        # Pass 2: alerting — only count a service as unhealthy when it is clearly down.
        for service_name in service_bus.list_services():
            if service_name not in result:
                continue
            status = result[service_name]
            healthy = True
            if not isinstance(status, dict):
                healthy = bool(status)
            else:
                # Prefer status.running (container actually up) over other health signals.
                inner = status.get('status', {})
                if isinstance(inner, dict):
                    if 'running' in inner:
                        healthy = inner['running']
                    elif 'status' in inner:
                        healthy = str(inner['status']).lower() in ('ok', 'healthy', 'online', 'active')
                elif 'running' in status:
                    healthy = status['running']
                elif 'error' in status:
                    healthy = False

            if healthy:
                # Service is fine: note the recovery (if any) and reset its counter.
                if service_alert_counters.get(service_name, 0) > 0:
                    logger.info(f"Service {service_name} recovered, resetting alert counter")
                service_alert_counters[service_name] = 0
                continue

            service_alert_counters[service_name] = service_alert_counters.get(service_name, 0) + 1
            if service_alert_counters[service_name] < HEALTH_ALERT_THRESHOLD:
                continue
            alert_msg = f"ALERT: {service_name} unhealthy for {service_alert_counters[service_name]} consecutive checks."
            logger.warning(alert_msg)
            result['alerts'].append(alert_msg)
            # Publish alert event.
            service_bus.publish_event(EventType.ERROR_OCCURRED, service_name, {
                'error': alert_msg,
                'service': service_name,
                'consecutive_failures': service_alert_counters[service_name]
            })

        logger.info(f"Unified health check: {result}")
        return result
    except Exception as e:
        logger.error(f"Unified health check failed: {e}")
        return {'error': str(e), 'timestamp': datetime.utcnow().isoformat()}
|
|
|
|
def health_monitor_loop():
    """Background loop: run one monitoring cycle every 60 seconds.

    Each cycle, inside an app context:
      - runs perform_health_check(), stores the result at the front of
        health_history and publishes a HEALTH_CHECK event;
      - re-anchors the stateful FORWARD rule;
      - checks Caddy and restarts the 'cell-caddy' container after 3
        consecutive failed checks;
      - refreshes certificate status every 60th cycle.

    The loop runs until health_monitor_running is falsy. An exception in one
    cycle is logged and the loop carries on, so a transient error cannot stop
    health monitoring for the lifetime of the process.
    """
    _cert_check_cycle = 0
    while health_monitor_running:
        try:
            with app.app_context():
                health_result = perform_health_check()
                health_history.appendleft(health_result)
                service_bus.publish_event(EventType.HEALTH_CHECK, 'api', health_result)
                # Re-anchor stateful rule every cycle: wg0 PostUp uses -I FORWARD which
                # pushes ESTABLISHED,RELATED down below per-peer DROPs on restart.
                firewall_manager.ensure_forward_stateful()
                # Caddy health monitor: 3 consecutive failures triggers a restart.
                try:
                    if caddy_manager.check_caddy_health():
                        caddy_manager.reset_health_failures()
                    else:
                        count = caddy_manager.increment_health_failure()
                        if count >= 3:
                            logger.warning(
                                "Caddy health check failed %d times \u2014 restarting",
                                count,
                            )
                            container_manager.restart_container('cell-caddy')
                            caddy_manager.reset_health_failures()
                except Exception as _caddy_err:
                    logger.error("Caddy health monitor error: %s", _caddy_err)
                # Refresh cert status every 60 cycles (about 1 hour with a 60 s loop).
                _cert_check_cycle += 1
                if _cert_check_cycle >= 60:
                    _cert_check_cycle = 0
                    try:
                        caddy_manager.refresh_cert_status()
                    except Exception as _cert_err:
                        logger.warning("Cert status refresh failed (non-fatal): %s", _cert_err)
        except Exception as _cycle_err:
            # Previously an error here (e.g. from publish_event or
            # ensure_forward_stateful) ended the daemon thread silently.
            logger.error("Health monitor cycle failed (will retry next cycle): %s", _cycle_err)
        # Sleep outside the try so a failing cycle can never turn into a busy loop.
        time.sleep(60)  # Check every 60 seconds
|
|
|
|
# Start the health monitor in a daemon thread so it never blocks interpreter exit.
health_monitor_thread = threading.Thread(daemon=True, target=health_monitor_loop)
health_monitor_thread.start()

# Start DDNS heartbeat thread (updates public IP every 5 minutes when a provider is configured).
ddns_manager.start_heartbeat()
|
|
|
|
def _local_subnets(route_file='/proc/net/route'):
    """Return the IPv4 subnets this host/container is directly connected to.

    Parses a kernel routing table in ``/proc/net/route`` format (first line is
    a header; destination, flags and mask are hex fields 1, 3 and 7).

    Skipped entries:
      - routes on the ``lo`` interface;
      - routes with destination 0.0.0.0 (default route);
      - routes with the gateway flag set — those networks are reached through a
        router and are therefore not directly connected;
      - lines that are too short or cannot be parsed (each skipped on its own,
        so one bad line does not discard the routes after it).

    Args:
        route_file: Path of the routing table to read. Defaults to the live
            kernel table.

    Returns:
        A list of ``ipaddress.IPv4Network`` objects in file order; an empty
        list when the file cannot be read.
    """
    import ipaddress as _ipa
    import socket as _sock
    import struct as _struct

    rtf_gateway = 0x0002  # RTF_GATEWAY: destination is reached via a gateway
    nets = []
    try:
        with open(route_file) as _f:
            next(_f, None)  # skip the column-header line
            for _line in _f:
                _parts = _line.split()
                if len(_parts) < 8 or _parts[0] == 'lo':
                    continue
                try:
                    if int(_parts[3], 16) & rtf_gateway:
                        continue
                    _dest = _sock.inet_ntoa(_struct.pack('<I', int(_parts[1], 16)))
                    if _dest == '0.0.0.0':
                        continue
                    _mask = _sock.inet_ntoa(_struct.pack('<I', int(_parts[7], 16)))
                    _net = _ipa.ip_network(f'{_dest}/{_mask}', strict=False)
                except (ValueError, _struct.error):
                    # Non-hex field, out-of-range value or invalid netmask.
                    continue
                nets.append(_net)
    except (OSError, ValueError):
        # Best effort: an unreadable or undecodable table yields what was parsed so far.
        pass
    return nets
|
|
|
|
|
|
def is_local_request():
    """Return True when the current request comes from a trusted local address.

    Trusted addresses: loopback, the Docker bridge range 172.16.0.0/12, and any
    subnet returned by ``_local_subnets()`` (covers Docker bridge networks
    outside RFC 1918 such as 172.0.0.0/24). 10.0.0.0/8 (WireGuard peers) and
    192.168.0.0/16 (LAN) are deliberately not trusted by range — VPN peers must
    not access local-only endpoints.

    The direct TCP peer (``request.remote_addr``) is checked first. If it is
    not trusted, only the LAST X-Forwarded-For entry — the one a reverse proxy
    appends — is checked; earlier entries are client-supplied and ignored.

    NOTE(review): the X-Forwarded-For fallback is reached only when the direct
    peer is NOT a trusted address, i.e. when the header may not have come from
    the trusted proxy at all. A client that can reach Flask directly can then
    send ``X-Forwarded-For: 127.0.0.1`` and pass — confirm that port is
    unreachable for every untrusted network, including VPN peers.
    """
    import ipaddress as _ipa

    docker_bridge = _ipa.ip_network('172.16.0.0/12')

    def _allowed(addr):
        """True if *addr* is loopback, in the Docker bridge range, or on a local subnet."""
        if not addr:
            return False
        if addr in ('127.0.0.1', '::1', 'localhost'):
            return True
        try:
            ip = _ipa.ip_address(addr.strip())
            if ip.is_loopback or ip in docker_bridge:
                return True
            return any(ip in _net for _net in _local_subnets())
        except Exception:
            # Unparsable address: treat as not local.
            return False

    if _allowed(request.remote_addr):
        return True

    # Fall back to the last X-Forwarded-For entry; never trust any earlier entry.
    try:
        xff = request.headers.get('X-Forwarded-For', '')
        last_ip = xff.split(',')[-1].strip() if xff else ''
        return bool(last_ip) and _allowed(last_ip)
    except Exception:
        return False
|
|
|
|
@app.route('/health', methods=['GET'])
def health_check():
    """Liveness endpoint: report a static 'healthy' status with the current UTC time."""
    try:
        payload = {
            "status": "healthy",
            "timestamp": datetime.utcnow().isoformat(),
            "version": "1.0.0"
        }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route('/api/status', methods=['GET'])
def get_cell_status():
    """Return cell identity, API uptime, peer count and per-service status.

    A service whose ``get_status()`` raises is reported as ``{'error': ...}``
    instead of failing the whole response. Any other failure yields a 500.
    """
    try:
        # Ask every service on the bus for its status.
        services_status = {}
        for service_name in service_bus.list_services():
            try:
                services_status[service_name] = service_bus.get_service(service_name).get_status()
            except Exception as e:
                services_status[service_name] = {'error': str(e)}

        peers = peer_registry.list_peers()

        # Whole seconds since this module was imported.
        uptime_seconds = int(time.time() - API_START_TIME)

        identity = config_manager.configs.get('_identity', {})
        body = {
            "cell_name": identity.get('cell_name', os.environ.get('CELL_NAME', 'mycell')),
            "domain": identity.get('domain', os.environ.get('CELL_DOMAIN', 'cell')),
            "effective_domain": config_manager.get_effective_domain(),
            "uptime": uptime_seconds,
            "peers_count": len(peers),
            "services": services_status,
            "timestamp": datetime.utcnow().isoformat()
        }
        return jsonify(body)
    except Exception as e:
        logger.error(f"Error getting cell status: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route('/api/health/history', methods=['GET'])
def get_health_history():
    """Return the stored unified health check results as a JSON list."""
    recent_results = list(health_history)
    return jsonify(recent_results)
|
|
|
|
@app.route('/api/health/history/clear', methods=['POST'])
def clear_health_history():
    """Empty the health history and reset all per-service alert counters."""
    global service_alert_counters
    # Rebind the counters to a fresh dict; the history deque is emptied in place.
    service_alert_counters = {}
    health_history.clear()
    return jsonify({'message': 'Health history cleared'})
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Phase 5 — Extended connectivity routes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route('/api/connectivity/status', methods=['GET'])
def connectivity_status():
    """Return the connectivity manager's status as JSON; 500 with the error text on failure."""
    try:
        status = connectivity_manager.get_status()
        return jsonify(status)
    except Exception as e:
        logger.error(f"connectivity_status: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/connectivity/exits', methods=['GET'])
def connectivity_list_exits():
    """Return the connectivity manager's exits under the 'exits' key; 500 on failure."""
    try:
        exits = connectivity_manager.list_exits()
        return jsonify({'exits': exits})
    except Exception as e:
        logger.error(f"connectivity_list_exits: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/connectivity/exits/wireguard', methods=['POST'])
def connectivity_upload_wireguard():
    """Upload an external WireGuard config (becomes wg_ext0).

    Expects a JSON object body with a non-empty string field ``conf_text``.

    Returns:
        The manager's result as JSON with HTTP 200 when ``result['ok']`` is
        truthy; HTTP 400 when ``conf_text`` is missing, blank or not a
        string, or when the manager reports failure; ``{'error': ...}`` with
        HTTP 500 if an unexpected exception is raised.
    """
    try:
        data = request.get_json(silent=True)
        # A syntactically valid but non-object body (list, string, number)
        # has no .get(); treat it like an empty payload so the caller gets
        # the 400 validation error below instead of a 500.
        if not isinstance(data, dict):
            data = {}
        conf_text = data.get('conf_text', '')
        if not isinstance(conf_text, str) or not conf_text.strip():
            return jsonify({'ok': False, 'error': 'conf_text is required'}), 400
        result = connectivity_manager.upload_wireguard_ext(conf_text)
        if result.get('ok'):
            return jsonify(result)
        # Manager-level rejection is reported as a client error.
        return jsonify(result), 400
    except Exception as e:
        logger.error(f"connectivity_upload_wireguard: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/connectivity/exits/openvpn', methods=['POST'])
def connectivity_upload_openvpn():
    """Upload an OpenVPN profile (.ovpn).

    Expects a JSON object body with a non-empty string field ``ovpn_text``
    and an optional ``name`` (defaults to ``'default'``).

    Returns:
        The manager's result as JSON with HTTP 200 when ``result['ok']`` is
        truthy; HTTP 400 when ``ovpn_text`` is missing, blank or not a
        string, or when the manager reports failure; ``{'error': ...}`` with
        HTTP 500 if an unexpected exception is raised.
    """
    try:
        data = request.get_json(silent=True)
        # A syntactically valid but non-object body (list, string, number)
        # has no .get(); treat it like an empty payload so the caller gets
        # the 400 validation error below instead of a 500.
        if not isinstance(data, dict):
            data = {}
        ovpn_text = data.get('ovpn_text', '')
        # NOTE(review): ``name`` is forwarded as received; presumably the
        # manager validates it — confirm.
        name = data.get('name', 'default')
        if not isinstance(ovpn_text, str) or not ovpn_text.strip():
            return jsonify({'ok': False, 'error': 'ovpn_text is required'}), 400
        result = connectivity_manager.upload_openvpn(ovpn_text, name=name)
        if result.get('ok'):
            return jsonify(result)
        # Manager-level rejection is reported as a client error.
        return jsonify(result), 400
    except Exception as e:
        logger.error(f"connectivity_upload_openvpn: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/connectivity/exits/apply', methods=['POST'])
def connectivity_apply_routes():
    """Re-apply all connectivity policy routing rules (idempotent).

    Returns whatever the connectivity manager reports as JSON, or
    ``{'error': ...}`` with HTTP 500 if anything raises.
    """
    try:
        return jsonify(connectivity_manager.apply_routes())
    except Exception as exc:
        logger.error(f"connectivity_apply_routes: {exc}")
        failure = {'error': str(exc)}
        return jsonify(failure), 500
|
|
|
|
|
|
@app.route('/api/connectivity/peers/<peer_name>/exit', methods=['PUT'])
def connectivity_set_peer_exit(peer_name: str):
    """Assign a peer to an egress exit type.

    Args:
        peer_name: Peer identifier taken from the URL path.

    Expects a JSON object body with a string field ``exit_via``.

    Returns:
        The manager's result as JSON with HTTP 200 when ``result['ok']`` is
        truthy; HTTP 400 when ``exit_via`` is missing or not a string, or
        when the manager reports failure; ``{'error': ...}`` with HTTP 500
        if an unexpected exception is raised.
    """
    try:
        data = request.get_json(silent=True)
        # A syntactically valid but non-object body (list, string, number)
        # has no .get(); treat it like an empty payload so the caller gets
        # the 400 validation error below instead of a 500.
        if not isinstance(data, dict):
            data = {}
        exit_via = data.get('exit_via')
        if not isinstance(exit_via, str):
            return jsonify({'ok': False, 'error': 'exit_via is required'}), 400
        result = connectivity_manager.set_peer_exit(peer_name, exit_via)
        if result.get('ok'):
            return jsonify(result)
        # Manager-level rejection is reported as a client error.
        return jsonify(result), 400
    except Exception as e:
        logger.error(f"connectivity_set_peer_exit({peer_name}): {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/connectivity/peers', methods=['GET'])
def connectivity_get_peer_exits():
    """Return the peer-to-exit mapping ({peer_name: exit_type}) under ``peers``.

    Responds with ``{'error': ...}`` and HTTP 500 if anything raises.
    """
    try:
        peer_exit_map = connectivity_manager.get_peer_exits()
        return jsonify({'peers': peer_exit_map})
    except Exception as exc:
        logger.error(f"connectivity_get_peer_exits: {exc}")
        failure = {'error': str(exc)}
        return jsonify(failure), 500
|
|
|
|
|
|
@app.route('/api/caddy/cert-status', methods=['GET'])
def caddy_cert_status():
    """Report TLS certificate status (expiry, days remaining, status).

    The cached value is refreshed from Caddy when it is older than
    5 minutes. In LAN mode the result is {'status': 'internal'}; in ACME
    modes it carries expiry info read via SSL handshake with the Caddy
    container.

    Responds with ``{'error': ...}`` and HTTP 500 if anything raises.
    """
    try:
        # 300 seconds == the 5-minute freshness window described above.
        cert_info = caddy_manager.get_cert_status_fresh(max_age_seconds=300)
        return jsonify(cert_info)
    except Exception as exc:
        logger.error(f"caddy_cert_status: {exc}")
        failure = {'error': str(exc)}
        return jsonify(failure), 500
|
|
|
|
|
|
@app.route('/api/egress/status', methods=['GET'])
def egress_status():
    """Report egress status for installed services that have an egress config.

    Returns the egress manager's status as JSON, or ``{'error': ...}``
    with HTTP 500 if anything raises.
    """
    try:
        status_payload = egress_manager.get_status()
        return jsonify(status_payload)
    except Exception as exc:
        logger.error(f"egress_status: {exc}")
        failure = {'error': str(exc)}
        return jsonify(failure), 500
|
|
|
|
|
|
@app.route('/api/egress/services/<service_id>/exit', methods=['PUT'])
def egress_set_service_exit(service_id: str):
    """Persist and immediately apply a per-service egress override.

    Args:
        service_id: Service identifier taken from the URL path.

    Expects a JSON object body with a string field ``exit_type``.

    Returns:
        The manager's result as JSON with HTTP 200 when ``result['ok']`` is
        truthy; HTTP 400 when ``exit_type`` is missing or not a string, or
        when the manager reports failure; ``{'error': ...}`` with HTTP 500
        if an unexpected exception is raised.
    """
    try:
        data = request.get_json(silent=True)
        # A syntactically valid but non-object body (list, string, number)
        # has no .get(); treat it like an empty payload so the caller gets
        # the 400 validation error below instead of a 500.
        if not isinstance(data, dict):
            data = {}
        exit_type = data.get('exit_type')
        if not isinstance(exit_type, str):
            return jsonify({'ok': False, 'error': 'exit_type is required'}), 400
        result = egress_manager.set_service_exit(service_id, exit_type)
        if result.get('ok'):
            return jsonify(result)
        # Manager-level rejection is reported as a client error.
        return jsonify(result), 400
    except Exception as e:
        logger.error(f"egress_set_service_exit({service_id}): {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
if __name__ == '__main__':
    # Direct-execution entry point: debug mode is opt-in and enabled only
    # when FLASK_DEBUG is exactly '1'.
    flask_debug_flag = os.environ.get('FLASK_DEBUG', '0')
    debug = flask_debug_flag == '1'
    # Listen on all interfaces, port 3000.
    app.run(debug=debug, host='0.0.0.0', port=3000)