Files
pic/api/setup_manager.py
T
roof f550f04ce2
Unit Tests / test (push) Successful in 15m29s
Fix DDNS registration and wizard pre-fill after installer run
DDNS registration (setup_cell.py):
- Replace pyotp dependency with stdlib TOTP (HMAC-SHA1, RFC 6238)
  pyotp is only available inside the Docker container, not on the host
  where setup_cell.py runs — registration was silently skipped every time
- OTP header still sent if generation succeeds; omitted gracefully if not

Wizard pre-fill (setup_manager + Setup.jsx):
- GET /api/setup/status now returns 'preconfigured' dict with cell_name,
  domain_mode, domain_name, and provider tokens from installer-written config
- Setup.jsx fetches status on mount and pre-fills all form state so the
  user only needs to set password, services, and timezone — not re-enter
  the identity they already configured in the bash installer
- Fails silently so wizard still works on fresh installs with no config

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 12:22:53 -04:00

229 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
SetupManager — first-run wizard backend for PIC.
Handles validation, locking, and atomic completion of the initial setup
wizard. Called by api/routes/setup.py.
"""
import fcntl
import logging
import os
import re
from typing import Any, Dict, List
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Thirty representative IANA time zone names offered by the setup wizard.
# Order is significant: the list is handed to the client as-is.
AVAILABLE_TIMEZONES = [
    'UTC',
    # Americas
    'America/New_York',
    'America/Chicago',
    'America/Denver',
    'America/Los_Angeles',
    'America/Anchorage',
    'America/Honolulu',
    'America/Sao_Paulo',
    'America/Argentina/Buenos_Aires',
    'America/Toronto',
    'America/Vancouver',
    'America/Mexico_City',
    # Europe
    'Europe/London',
    'Europe/Paris',
    'Europe/Berlin',
    'Europe/Madrid',
    'Europe/Rome',
    'Europe/Amsterdam',
    'Europe/Moscow',
    'Europe/Istanbul',
    # Africa
    'Africa/Cairo',
    'Africa/Johannesburg',
    # Asia
    'Asia/Dubai',
    'Asia/Kolkata',
    'Asia/Bangkok',
    'Asia/Shanghai',
    'Asia/Tokyo',
    'Asia/Seoul',
    # Oceania
    'Australia/Sydney',
    'Pacific/Auckland',
]
# Service identifiers reported to the wizard as 'available_services'.
AVAILABLE_SERVICES = ['email', 'calendar', 'files', 'wireguard']
# Accepted values for the 'domain_mode' field of the setup payload.
VALID_DOMAIN_MODES = {
    'pic_ngo',
    'cloudflare',
    'duckdns',
    'http01',
    'lan',
}

# A cell name is one lowercase letter followed by 1-30 lowercase letters,
# digits, or hyphens (2-31 characters in total).
CELL_NAME_RE = re.compile(
    r'^[a-z][a-z0-9-]{1,30}$'
)
class SetupManager:
    """Manages the first-run setup wizard state and completion.

    The cell identity is read and written through ``config_manager``
    (``get_identity()`` / ``set_identity_field(key, value)``); the admin
    account is managed through ``auth_manager`` (``create_user(...)`` /
    ``update_password(username, password)``).
    """

    # Identity keys echoed back to the wizard so it can pre-fill its form.
    _PREFILL_FIELDS = (
        'cell_name',
        'domain_mode',
        'domain_name',
        'cloudflare_api_token',
        'duckdns_token',
    )

    def __init__(self, config_manager, auth_manager):
        """Store the collaborators the wizard backend delegates to.

        Args:
            config_manager: Provides ``get_identity()`` and
                ``set_identity_field(key, value)``.
            auth_manager: Provides ``create_user(username=, password=, role=)``
                and ``update_password(username, password)``.
        """
        self.config_manager = config_manager
        self.auth_manager = auth_manager

    # ── state helpers ─────────────────────────────────────────────────────

    def is_setup_complete(self) -> bool:
        """Return True if the identity's ``setup_complete`` flag is truthy."""
        return bool(self.config_manager.get_identity().get('setup_complete', False))

    def get_setup_status(self) -> Dict[str, Any]:
        """Return setup status, wizard metadata, and pre-configured identity.

        Returns:
            A dict with the keys ``complete``, ``available_services``,
            ``available_timezones`` and ``preconfigured``. ``preconfigured``
            holds the non-empty identity values among ``_PREFILL_FIELDS``
            while setup is still pending. Once setup is complete it is
            empty, so provider tokens are no longer echoed back when
            pre-fill is not needed any more.
        """
        complete = self.is_setup_complete()
        preconfigured: Dict[str, Any] = {}
        if not complete:
            identity = self.config_manager.get_identity()
            preconfigured = {
                key: identity[key]
                for key in self._PREFILL_FIELDS
                if identity.get(key)
            }
        return {
            'complete': complete,
            # Copies, so callers cannot mutate the module-level lists.
            'available_services': list(AVAILABLE_SERVICES),
            'available_timezones': list(AVAILABLE_TIMEZONES),
            'preconfigured': preconfigured,
        }

    # ── validation ────────────────────────────────────────────────────────

    def validate_cell_name(self, name: str) -> List[str]:
        """Validate a proposed cell name.

        Args:
            name: Candidate cell name (normally taken from the JSON payload).

        Returns:
            A list of human-readable error strings; empty when valid.
        """
        if not name:
            return ['Cell name is required.']
        if not isinstance(name, str):
            # JSON can deliver numbers/lists; report instead of raising.
            return ['Cell name must be a string.']

        errors: List[str] = []
        # fullmatch: with match(), the pattern's '$' would also accept a
        # trailing newline ("abc\n").
        if not CELL_NAME_RE.fullmatch(name):
            errors.append(
                'Cell name must start with a lowercase letter, be 2-31 characters, '
                'and contain only lowercase letters, digits, and hyphens.'
            )
        if name.startswith('-') or name.endswith('-'):
            errors.append('Cell name must not start or end with a hyphen.')
        return errors

    def validate_password(self, password: str) -> List[str]:
        """Validate admin password strength.

        Requires at least 12 characters with at least one uppercase letter,
        one lowercase letter, and one digit.

        Args:
            password: Candidate password (normally taken from the JSON payload).

        Returns:
            A list of human-readable error strings; empty when valid.
        """
        if not password:
            return ['Password is required.']
        if not isinstance(password, str):
            # JSON can deliver numbers/lists; report instead of raising.
            return ['Password must be a string.']

        errors: List[str] = []
        if len(password) < 12:
            errors.append('Password must be at least 12 characters long.')
        if not re.search(r'[A-Z]', password):
            errors.append('Password must contain at least one uppercase letter.')
        if not re.search(r'[a-z]', password):
            errors.append('Password must contain at least one lowercase letter.')
        if not re.search(r'\d', password):
            errors.append('Password must contain at least one digit.')
        return errors

    # ── main completion ───────────────────────────────────────────────────

    def complete_setup(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Validate the wizard payload, then complete setup under a file lock.

        Args:
            payload: Wizard form data. Keys read: ``cell_name``, ``password``,
                ``domain_mode``, ``domain_name``, ``timezone``,
                ``services_enabled``, ``ddns_provider``,
                ``cloudflare_api_token``, ``duckdns_token``.

        Returns:
            ``{'success': True, 'redirect': '/login'}`` on success or
            ``{'success': False, 'errors': [...]}`` on any failure.

        Note:
            Exceptions raised by ``auth_manager`` or ``config_manager`` are
            not caught here; the lock is still released in that case.
        """
        if not isinstance(payload, dict):
            return {'success': False, 'errors': ['Invalid setup payload.']}

        # ── validate inputs ────────────────────────────────────────────────
        cell_name = payload.get('cell_name', '')
        password = payload.get('password', '')
        domain_mode = payload.get('domain_mode', '')
        domain_name = payload.get('domain_name', '')
        timezone = payload.get('timezone', '')
        services_enabled = payload.get('services_enabled', [])
        ddns_provider = payload.get('ddns_provider', 'none')
        cloudflare_api_token = payload.get('cloudflare_api_token', '')
        duckdns_token = payload.get('duckdns_token', '')

        errors: List[str] = []
        errors.extend(self.validate_cell_name(cell_name))
        errors.extend(self.validate_password(password))
        # isinstance first: an unhashable value (e.g. a list) would raise
        # TypeError on the set membership test.
        if not isinstance(domain_mode, str) or domain_mode not in VALID_DOMAIN_MODES:
            errors.append(
                f"domain_mode must be one of: {', '.join(sorted(VALID_DOMAIN_MODES))}."
            )
        if not timezone or not isinstance(timezone, str):
            errors.append('timezone is required.')
        if not isinstance(services_enabled, list):
            errors.append('services_enabled must be a list.')
        if errors:
            return {'success': False, 'errors': errors}

        # ── acquire file lock to prevent double-completion ─────────────────
        lock_fd = self._acquire_setup_lock()
        if lock_fd is None:
            return {'success': False, 'errors': ['Setup lock could not be acquired. Try again.']}

        try:
            # Re-check inside lock
            if self.is_setup_complete():
                return {'success': False, 'errors': ['Setup has already been completed.']}

            # ── create or update admin user ────────────────────────────────
            # The installer may have bootstrapped an admin account from a
            # generated password. The wizard's job is to set the real password,
            # so update it if the account already exists.
            ok = self.auth_manager.create_user(
                username='admin',
                password=password,
                role='admin',
            )
            if not ok:
                ok = self.auth_manager.update_password('admin', password)
            if not ok:
                return {'success': False, 'errors': ['Failed to set admin password.']}

            # ── persist identity fields ────────────────────────────────────
            set_field = self.config_manager.set_identity_field
            set_field('cell_name', cell_name)
            set_field('domain_mode', domain_mode)
            if domain_name:
                set_field('domain_name', domain_name)
            set_field('timezone', timezone)
            set_field('services_enabled', services_enabled)
            set_field('ddns_provider', ddns_provider)
            if cloudflare_api_token:
                set_field('cloudflare_api_token', cloudflare_api_token)
            if duckdns_token:
                set_field('duckdns_token', duckdns_token)

            logger.info(
                'DDNS registration deferred to Phase 3. '
                'ddns_provider=%r domain_name=%r',
                ddns_provider,
                domain_name,
            )

            # ── mark setup complete (must be last) ─────────────────────────
            set_field('setup_complete', True)
            logger.info(
                'Setup completed. cell_name=%r, domain_mode=%r',
                cell_name,
                domain_mode,
            )
            return {'success': True, 'redirect': '/login'}
        finally:
            self._release_setup_lock(lock_fd)

    @staticmethod
    def _acquire_setup_lock():
        """Open the setup lock file and take an exclusive flock on it.

        The lock file lives at ``$DATA_DIR/api/.setup.lock`` (``DATA_DIR``
        defaults to ``/app/data``).

        Returns:
            The open, locked file object, or ``None`` if the directory could
            not be created, the file could not be opened, or the lock could
            not be taken. The failure is logged.
        """
        lock_path = os.path.join(
            os.environ.get('DATA_DIR', '/app/data'), 'api', '.setup.lock'
        )
        lock_fd = None
        try:
            os.makedirs(os.path.dirname(lock_path), exist_ok=True)
            lock_fd = open(lock_path, 'w')
            fcntl.flock(lock_fd, fcntl.LOCK_EX)
        except OSError as exc:
            # Do not leak the descriptor when open() worked but flock() failed.
            if lock_fd is not None:
                lock_fd.close()
            logger.error('Could not acquire setup lock: %s', exc)
            return None
        return lock_fd

    @staticmethod
    def _release_setup_lock(lock_fd) -> None:
        """Unlock and close the lock file; best-effort, never raises.

        Unlock and close are attempted independently so that a failed unlock
        cannot prevent the descriptor from being closed.
        """
        try:
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
        except (OSError, ValueError) as exc:
            logger.warning('Could not unlock setup lock: %s', exc)
        try:
            lock_fd.close()
        except (OSError, ValueError) as exc:
            logger.warning('Could not close setup lock file: %s', exc)