fix(P2): peer add rollback, helper failure recovery, manager extraction (A2/A3/A5)

A3 — Peer add atomicity: track a firewall_applied flag and call
clear_peer_rules() during rollback so partial peer-add failures
don't leave stale iptables rules behind. Added a test.

A2 — Pending config flag: instead of clearing before spawning the
helper container (fire-and-forget), set applying=True and let the
helper clear it on success by writing to cell_config.json via a
mounted /app/data volume. On API restart after a failed apply,
_recover_pending_apply() resets the applying flag so the UI shows
pending changes and the user can retry. GET /api/config/pending now
includes the applying field.

A5 (foundation) — Extract all manager instantiation into managers.py.
app.py re-exports every name so existing test patches (patch('app.X'))
continue to work unchanged. 1021 unit tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-01 05:27:39 -04:00
parent 2455fe189e
commit d54844cd44
4 changed files with 218 additions and 76 deletions
+72 -73
View File
@@ -33,23 +33,20 @@ import contextvars
# Track API start time for uptime calculation
API_START_TIME = time.time()
from network_manager import NetworkManager
from wireguard_manager import WireGuardManager, _resolve_peer_dns
from peer_registry import PeerRegistry
from email_manager import EmailManager
from calendar_manager import CalendarManager
from file_manager import FileManager
from routing_manager import RoutingManager
# Manager singletons — all instantiated in managers.py; imported here so routes can
# reference them by module-level name and test patches (`patch('app.X', mock)`) work.
from managers import (
config_manager, service_bus, log_manager,
network_manager, wireguard_manager, peer_registry,
email_manager, calendar_manager, file_manager,
routing_manager, vault_manager, container_manager,
cell_link_manager, auth_manager,
firewall_manager, EventType,
)
# Re-exports: tests do `from app import CellManager` and `from app import _resolve_peer_dns`
from cell_manager import CellManager
from vault_manager import VaultManager
from container_manager import ContainerManager
from config_manager import ConfigManager
from service_bus import ServiceBus, EventType
from log_manager import LogManager
from cell_link_manager import CellLinkManager
import firewall_manager
from wireguard_manager import _resolve_peer_dns
from port_registry import PORT_FIELDS, detect_conflicts
from auth_manager import AuthManager
import auth_routes
# Context variable for request info
@@ -137,41 +134,8 @@ app.config['SECRET_KEY'] = _flask_secret
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
# Initialize enhanced components
config_manager = ConfigManager(
config_file=os.path.join(os.environ.get('CONFIG_DIR', '/app/config'), 'cell_config.json'),
data_dir=os.environ.get('DATA_DIR', '/app/data'),
)
service_bus = ServiceBus()
log_manager = LogManager(log_dir='./data/logs')
# Initialize service loggers
service_log_configs = {
'network': {'level': 'INFO', 'formatter': 'json', 'console': False},
'wireguard': {'level': 'INFO', 'formatter': 'json', 'console': False},
'email': {'level': 'INFO', 'formatter': 'json', 'console': False},
'calendar': {'level': 'INFO', 'formatter': 'json', 'console': False},
'files': {'level': 'INFO', 'formatter': 'json', 'console': False},
'routing': {'level': 'INFO', 'formatter': 'json', 'console': False},
'vault': {'level': 'INFO', 'formatter': 'json', 'console': False},
'api': {'level': 'INFO', 'formatter': 'json', 'console': True}
}
for service, config in service_log_configs.items():
log_manager.add_service_logger(service, config)
# Apply any persisted log level overrides
_levels_file = os.path.join(os.path.dirname(__file__), 'config', 'log_levels.json')
if os.path.exists(_levels_file):
try:
with open(_levels_file) as _f:
for _svc, _lvl in json.load(_f).items():
log_manager.set_service_level(_svc, _lvl)
except Exception:
pass
# Start service bus
service_bus.start()
# config_manager, service_bus, log_manager and all other managers are imported
# from managers.py above — no re-instantiation needed here.
@app.before_request
def enrich_log_context():
@@ -285,24 +249,8 @@ def log_request(response):
def clear_log_context(exc):
request_context.set({})
# Initialize managers — paths configurable via env for testing
_DATA_DIR = os.environ.get('DATA_DIR', '/app/data')
_CONFIG_DIR = os.environ.get('CONFIG_DIR', '/app/config')
network_manager = NetworkManager(data_dir=_DATA_DIR, config_dir=_CONFIG_DIR)
wireguard_manager = WireGuardManager(data_dir=_DATA_DIR, config_dir=_CONFIG_DIR)
peer_registry = PeerRegistry(data_dir=_DATA_DIR, config_dir=_CONFIG_DIR)
email_manager = EmailManager(data_dir=_DATA_DIR, config_dir=_CONFIG_DIR)
calendar_manager = CalendarManager(data_dir=_DATA_DIR, config_dir=_CONFIG_DIR)
file_manager = FileManager(data_dir=_DATA_DIR, config_dir=_CONFIG_DIR)
routing_manager = RoutingManager(data_dir=_DATA_DIR, config_dir=_CONFIG_DIR)
app.vault_manager = VaultManager(data_dir=_DATA_DIR, config_dir=_CONFIG_DIR)
container_manager = ContainerManager(data_dir=_DATA_DIR, config_dir=_CONFIG_DIR)
cell_link_manager = CellLinkManager(
data_dir=_DATA_DIR, config_dir=_CONFIG_DIR,
wireguard_manager=wireguard_manager, network_manager=network_manager,
)
auth_manager = AuthManager(data_dir=_DATA_DIR, config_dir=_CONFIG_DIR)
# Wire vault_manager into Flask app context (vault routes use current_app.vault_manager)
app.vault_manager = vault_manager
auth_routes.auth_manager = auth_manager
# Apply firewall + DNS rules from stored peer settings (survives API restarts)
@@ -332,6 +280,23 @@ def _bootstrap_dns():
COREFILE_PATH = '/app/config/dns/Corefile'
def _recover_pending_apply():
"""If the previous all-services apply was interrupted (helper died mid-run),
clear the 'applying' flag so the UI shows the changes as still pending (not stuck
in an 'applying' spinner) and the user can retry."""
try:
# Both flags set together means a helper container was spawned to apply the
# pending changes but never cleared the state on success — i.e. it died mid-run.
pending = config_manager.configs.get('_pending_restart', {})
if pending.get('applying') and pending.get('needs_restart'):
config_manager.configs['_pending_restart']['applying'] = False
config_manager._save_all_configs()
logger.warning("Previous config apply did not complete — pending changes restored for retry")
except Exception as e:
# Best-effort recovery: a malformed/missing config must never block API startup,
# so swallow and log rather than raise.
logger.warning(f"Pending apply recovery check failed: {e}")
_recover_pending_apply()
# Run in background so startup isn't blocked waiting on docker exec
threading.Thread(target=_apply_startup_enforcement, daemon=True).start()
threading.Thread(target=_bootstrap_dns, daemon=True).start()
@@ -972,6 +937,7 @@ def get_pending_config():
pending = config_manager.configs.get('_pending_restart', {})
return jsonify({
'needs_restart': pending.get('needs_restart', False),
'applying': pending.get('applying', False),
'changed_at': pending.get('changed_at'),
'changes': pending.get('changes', []),
'containers': pending.get('containers', ['*']),
@@ -1023,9 +989,10 @@ def apply_pending_config():
if not pending.get('needs_restart'):
return jsonify({'message': 'No pending changes to apply'})
# Get project working dir and image name from our own container labels
# Get project working dir, image name, and data-dir host path from our container labels/mounts
project_dir = '/home/roof/pic'
api_image = 'pic_api:latest' # fallback (docker-compose v1 naming)
data_host_path = '/home/roof/pic/data/api' # fallback
try:
import docker as _docker_sdk
_client = _docker_sdk.from_env()
@@ -1036,6 +1003,11 @@ def apply_pending_config():
tags = _self.image.tags
if tags:
api_image = tags[0]
# Find the host-side path for /app/data so the helper can clear the pending flag
for _m in _self.attrs.get('Mounts', []):
if _m.get('Destination') == '/app/data':
data_host_path = _m.get('Source', data_host_path)
break
except Exception:
pass
@@ -1053,20 +1025,39 @@ def apply_pending_config():
# API container itself, killing this background thread mid-operation.
# Spawn an independent helper container (same image as cell-api) that has docker
# CLI and survives cell-api being stopped/recreated.
# Clear pending flag now — the helper runs fire-and-forget and we cannot track
# its exit code from within the API process (it may restart us).
_clear_pending_restart()
#
# Mark as "applying" rather than clearing early. The helper clears the flag on
# success by writing to cell_config.json directly (via the /app/data mount).
# If the helper fails, needs_restart stays True so the UI continues showing
# pending changes after the API restarts. On the next startup, if "applying"
# is still set, _recover_pending_apply() resets it so the user can retry.
config_manager.configs['_pending_restart']['applying'] = True
config_manager._save_all_configs()
# Encode the clear script in base64 to avoid shell-quoting issues.
import base64 as _b64
_clear_py = (
"import json,os; p='/app/data/cell_config.json';"
"f=open(p); d=json.load(f); f.close();"
"d['_pending_restart']={'needs_restart':False,'changes':[],'containers':[],'network_recreate':False};"
"tmp=p+'.tmp'; open(tmp,'w').write(json.dumps(d,indent=2)); os.replace(tmp,p)"
)
_b64_cmd = _b64.b64encode(_clear_py.encode()).decode()
clear_flag_cmd = f"python3 -c \"import base64; exec(base64.b64decode('{_b64_cmd}').decode())\""
if needs_network_recreate:
helper_script = (
f'sleep 2'
f' && docker compose --project-directory {project_dir}'
f' -f {host_compose} --env-file {host_env} down'
f' && {clear_flag_cmd}'
f' && docker compose --project-directory {project_dir}'
f' -f {host_compose} --env-file {host_env} up -d'
)
else:
helper_script = (
f'sleep 2'
f' && {clear_flag_cmd}'
f' && docker compose --project-directory {project_dir}'
f' -f {host_compose} --env-file {host_env} up -d'
)
@@ -1077,6 +1068,7 @@ def apply_pending_config():
['docker', 'run', '--rm',
'-v', '/var/run/docker.sock:/var/run/docker.sock',
'-v', f'{project_dir}:{project_dir}',
'-v', f'{data_host_path}:/app/data',
'--entrypoint', 'sh',
api_image,
'-c', helper_script],
@@ -1965,6 +1957,7 @@ def add_peer():
}
peer_added_to_registry = False
firewall_applied = False
try:
# Step 1: Add to registry
success = peer_registry.add_peer(peer_info)
@@ -1987,6 +1980,7 @@ def add_peer():
# Step 2: Firewall rules (critical)
firewall_manager.apply_peer_rules(peer_info['ip'], peer_info)
firewall_applied = True
# Step 3: Add peer to WireGuard server config (non-fatal if WG is not running)
wg_allowed = f"{assigned_ip}/32" if '/' not in assigned_ip else assigned_ip
@@ -2001,7 +1995,12 @@ def add_peer():
return jsonify({"message": f"Peer {peer_name} added successfully", "ip": assigned_ip}), 201
except Exception as e:
# Rollback registry entry if we got past that step
# Rollback: clear firewall rules first, then remove from registry
if firewall_applied:
try:
firewall_manager.clear_peer_rules(peer_info['ip'])
except Exception:
pass
if peer_added_to_registry:
try:
peer_registry.remove_peer(peer_name)