Files
pic/api/setup_manager.py
T
roof f550f04ce2
Unit Tests / test (push) Successful in 15m29s
Fix DDNS registration and wizard pre-fill after installer run
DDNS registration (setup_cell.py):
- Replace pyotp dependency with stdlib TOTP (HMAC-SHA1, RFC 6238)
  pyotp is only available inside the Docker container, not on the host
  where setup_cell.py runs — registration was silently skipped every time
- OTP header still sent if generation succeeds; omitted gracefully if not

Wizard pre-fill (setup_manager + Setup.jsx):
- GET /api/setup/status now returns 'preconfigured' dict with cell_name,
  domain_mode, domain_name, and provider tokens from installer-written config
- Setup.jsx fetches status on mount and pre-fills all form state so the
  user only needs to set password, services, and timezone — not re-enter
  the identity they already configured in the bash installer
- Fails silently so wizard still works on fresh installs with no config

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 12:22:53 -04:00

229 lines
9.2 KiB
Python

#!/usr/bin/env python3
"""
SetupManager — first-run wizard backend for PIC.
Handles validation, locking, and atomic completion of the initial setup
wizard. Called by api/routes/setup.py.
"""
import fcntl
import logging
import os
import re
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
# Top 30 representative IANA time zones shown in the wizard
AVAILABLE_TIMEZONES = [
'UTC',
'America/New_York',
'America/Chicago',
'America/Denver',
'America/Los_Angeles',
'America/Anchorage',
'America/Honolulu',
'America/Sao_Paulo',
'America/Argentina/Buenos_Aires',
'America/Toronto',
'America/Vancouver',
'America/Mexico_City',
'Europe/London',
'Europe/Paris',
'Europe/Berlin',
'Europe/Madrid',
'Europe/Rome',
'Europe/Amsterdam',
'Europe/Moscow',
'Europe/Istanbul',
'Africa/Cairo',
'Africa/Johannesburg',
'Asia/Dubai',
'Asia/Kolkata',
'Asia/Bangkok',
'Asia/Shanghai',
'Asia/Tokyo',
'Asia/Seoul',
'Australia/Sydney',
'Pacific/Auckland',
]
AVAILABLE_SERVICES = [
'email',
'calendar',
'files',
'wireguard',
]
VALID_DOMAIN_MODES = {'pic_ngo', 'cloudflare', 'duckdns', 'http01', 'lan'}
CELL_NAME_RE = re.compile(r'^[a-z][a-z0-9-]{1,30}$')
class SetupManager:
"""Manages the first-run setup wizard state and completion."""
def __init__(self, config_manager, auth_manager):
self.config_manager = config_manager
self.auth_manager = auth_manager
# ── state helpers ─────────────────────────────────────────────────────
def is_setup_complete(self) -> bool:
"""Return True if setup has already been completed."""
return bool(self.config_manager.get_identity().get('setup_complete', False))
def get_setup_status(self) -> Dict[str, Any]:
"""Return current setup status, wizard metadata, and any pre-configured identity."""
identity = self.config_manager.get_identity()
preconfigured = {
k: v for k, v in {
'cell_name': identity.get('cell_name', ''),
'domain_mode': identity.get('domain_mode', ''),
'domain_name': identity.get('domain_name', ''),
'cloudflare_api_token': identity.get('cloudflare_api_token', ''),
'duckdns_token': identity.get('duckdns_token', ''),
}.items() if v
}
return {
'complete': self.is_setup_complete(),
'available_services': AVAILABLE_SERVICES,
'available_timezones': AVAILABLE_TIMEZONES,
'preconfigured': preconfigured,
}
# ── validation ────────────────────────────────────────────────────────
def validate_cell_name(self, name: str) -> List[str]:
"""Validate a proposed cell name. Returns a list of error strings."""
errors: List[str] = []
if not name:
errors.append('Cell name is required.')
return errors
if not CELL_NAME_RE.match(name):
errors.append(
'Cell name must start with a lowercase letter, be 2–31 characters, '
'and contain only lowercase letters, digits, and hyphens.'
)
if name.startswith('-') or name.endswith('-'):
errors.append('Cell name must not start or end with a hyphen.')
return errors
def validate_password(self, password: str) -> List[str]:
"""Validate admin password strength. Returns a list of error strings."""
errors: List[str] = []
if not password:
errors.append('Password is required.')
return errors
if len(password) < 12:
errors.append('Password must be at least 12 characters long.')
if not re.search(r'[A-Z]', password):
errors.append('Password must contain at least one uppercase letter.')
if not re.search(r'[a-z]', password):
errors.append('Password must contain at least one lowercase letter.')
if not re.search(r'\d', password):
errors.append('Password must contain at least one digit.')
return errors
# ── main completion ───────────────────────────────────────────────────
def complete_setup(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""Run all validation, then atomically complete the setup wizard.
Returns ``{'success': True, 'redirect': '/login'}`` on success or
``{'success': False, 'errors': [...]}`` on any failure.
"""
errors: List[str] = []
# ── validate inputs ────────────────────────────────────────────────
cell_name = payload.get('cell_name', '')
password = payload.get('password', '')
domain_mode = payload.get('domain_mode', '')
domain_name = payload.get('domain_name', '')
timezone = payload.get('timezone', '')
services_enabled = payload.get('services_enabled', [])
ddns_provider = payload.get('ddns_provider', 'none')
cloudflare_api_token = payload.get('cloudflare_api_token', '')
duckdns_token = payload.get('duckdns_token', '')
errors.extend(self.validate_cell_name(cell_name))
errors.extend(self.validate_password(password))
if domain_mode not in VALID_DOMAIN_MODES:
errors.append(
f"domain_mode must be one of: {', '.join(sorted(VALID_DOMAIN_MODES))}."
)
if not timezone or not isinstance(timezone, str):
errors.append('timezone is required.')
if not isinstance(services_enabled, list):
errors.append('services_enabled must be a list.')
if errors:
return {'success': False, 'errors': errors}
# ── acquire file lock to prevent double-completion ─────────────────
lock_path = os.path.join(
os.environ.get('DATA_DIR', '/app/data'), 'api', '.setup.lock'
)
try:
os.makedirs(os.path.dirname(lock_path), exist_ok=True)
except OSError:
pass
try:
lock_fd = open(lock_path, 'w')
fcntl.flock(lock_fd, fcntl.LOCK_EX)
except OSError as exc:
logger.error(f'Could not acquire setup lock: {exc}')
return {'success': False, 'errors': ['Setup lock could not be acquired. Try again.']}
try:
# Re-check inside lock
if self.is_setup_complete():
return {'success': False, 'errors': ['Setup has already been completed.']}
# ── create or update admin user ────────────────────────────────
# The installer may have bootstrapped an admin account from a
# generated password. The wizard's job is to set the real password,
# so update it if the account already exists.
ok = self.auth_manager.create_user(
username='admin',
password=password,
role='admin',
)
if not ok:
ok = self.auth_manager.update_password('admin', password)
if not ok:
return {'success': False, 'errors': ['Failed to set admin password.']}
# ── persist identity fields ────────────────────────────────────
self.config_manager.set_identity_field('cell_name', cell_name)
self.config_manager.set_identity_field('domain_mode', domain_mode)
if domain_name:
self.config_manager.set_identity_field('domain_name', domain_name)
self.config_manager.set_identity_field('timezone', timezone)
self.config_manager.set_identity_field('services_enabled', services_enabled)
self.config_manager.set_identity_field('ddns_provider', ddns_provider)
if cloudflare_api_token:
self.config_manager.set_identity_field('cloudflare_api_token', cloudflare_api_token)
if duckdns_token:
self.config_manager.set_identity_field('duckdns_token', duckdns_token)
logger.info(
'DDNS registration deferred to Phase 3. '
f'ddns_provider={ddns_provider!r} domain_name={domain_name!r}'
)
# ── mark setup complete (must be last) ─────────────────────────
self.config_manager.set_identity_field('setup_complete', True)
logger.info(f"Setup completed. cell_name={cell_name!r}, domain_mode={domain_mode!r}")
return {'success': True, 'redirect': '/login'}
finally:
try:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
lock_fd.close()
except Exception:
pass