#!/usr/bin/env python3 """ SetupManager — first-run wizard backend for PIC. Handles validation, locking, and atomic completion of the initial setup wizard. Called by api/routes/setup.py. """ import fcntl import logging import os import re from typing import Any, Dict, List logger = logging.getLogger(__name__) # Top 30 representative IANA time zones shown in the wizard AVAILABLE_TIMEZONES = [ 'UTC', 'America/New_York', 'America/Chicago', 'America/Denver', 'America/Los_Angeles', 'America/Anchorage', 'America/Honolulu', 'America/Sao_Paulo', 'America/Argentina/Buenos_Aires', 'America/Toronto', 'America/Vancouver', 'America/Mexico_City', 'Europe/London', 'Europe/Paris', 'Europe/Berlin', 'Europe/Madrid', 'Europe/Rome', 'Europe/Amsterdam', 'Europe/Moscow', 'Europe/Istanbul', 'Africa/Cairo', 'Africa/Johannesburg', 'Asia/Dubai', 'Asia/Kolkata', 'Asia/Bangkok', 'Asia/Shanghai', 'Asia/Tokyo', 'Asia/Seoul', 'Australia/Sydney', 'Pacific/Auckland', ] AVAILABLE_SERVICES = [ 'email', 'calendar', 'files', 'wireguard', ] VALID_DOMAIN_MODES = {'pic_ngo', 'cloudflare', 'duckdns', 'http01', 'lan'} CELL_NAME_RE = re.compile(r'^[a-z][a-z0-9-]{1,30}$') class SetupManager: """Manages the first-run setup wizard state and completion.""" def __init__(self, config_manager, auth_manager): self.config_manager = config_manager self.auth_manager = auth_manager # ── state helpers ───────────────────────────────────────────────────── def is_setup_complete(self) -> bool: """Return True if setup has already been completed.""" return bool(self.config_manager.get_identity().get('setup_complete', False)) def get_setup_status(self) -> Dict[str, Any]: """Return current setup status and wizard metadata.""" return { 'complete': self.is_setup_complete(), 'available_services': AVAILABLE_SERVICES, 'available_timezones': AVAILABLE_TIMEZONES, } # ── validation ──────────────────────────────────────────────────────── def validate_cell_name(self, name: str) -> List[str]: """Validate a proposed cell name. Returns a list of error strings.""" errors: List[str] = [] if not name: errors.append('Cell name is required.') return errors if not CELL_NAME_RE.match(name): errors.append( 'Cell name must start with a lowercase letter, be 2–31 characters, ' 'and contain only lowercase letters, digits, and hyphens.' ) if name.startswith('-') or name.endswith('-'): errors.append('Cell name must not start or end with a hyphen.') return errors def validate_password(self, password: str) -> List[str]: """Validate admin password strength. Returns a list of error strings.""" errors: List[str] = [] if not password: errors.append('Password is required.') return errors if len(password) < 12: errors.append('Password must be at least 12 characters long.') if not re.search(r'[A-Z]', password): errors.append('Password must contain at least one uppercase letter.') if not re.search(r'[a-z]', password): errors.append('Password must contain at least one lowercase letter.') if not re.search(r'\d', password): errors.append('Password must contain at least one digit.') return errors # ── main completion ─────────────────────────────────────────────────── def complete_setup(self, payload: Dict[str, Any]) -> Dict[str, Any]: """Run all validation, then atomically complete the setup wizard. Returns ``{'success': True, 'redirect': '/login'}`` on success or ``{'success': False, 'errors': [...]}`` on any failure. """ errors: List[str] = [] # ── validate inputs ──────────────────────────────────────────────── cell_name = payload.get('cell_name', '') password = payload.get('password', '') domain_mode = payload.get('domain_mode', '') timezone = payload.get('timezone', '') services_enabled = payload.get('services_enabled', []) ddns_provider = payload.get('ddns_provider', 'none') errors.extend(self.validate_cell_name(cell_name)) errors.extend(self.validate_password(password)) if domain_mode not in VALID_DOMAIN_MODES: errors.append( f"domain_mode must be one of: {', '.join(sorted(VALID_DOMAIN_MODES))}." ) if not timezone or not isinstance(timezone, str): errors.append('timezone is required.') if not isinstance(services_enabled, list): errors.append('services_enabled must be a list.') if errors: return {'success': False, 'errors': errors} # ── acquire file lock to prevent double-completion ───────────────── lock_path = os.path.join( os.environ.get('DATA_DIR', '/app/data'), 'api', '.setup.lock' ) try: os.makedirs(os.path.dirname(lock_path), exist_ok=True) except OSError: pass try: lock_fd = open(lock_path, 'w') fcntl.flock(lock_fd, fcntl.LOCK_EX) except OSError as exc: logger.error(f'Could not acquire setup lock: {exc}') return {'success': False, 'errors': ['Setup lock could not be acquired. Try again.']} try: # Re-check inside lock if self.is_setup_complete(): return {'success': False, 'errors': ['Setup has already been completed.']} # ── create or update admin user ──────────────────────────────── # The installer may have bootstrapped an admin account from a # generated password. The wizard's job is to set the real password, # so update it if the account already exists. ok = self.auth_manager.create_user( username='admin', password=password, role='admin', ) if not ok: ok = self.auth_manager.update_password('admin', password) if not ok: return {'success': False, 'errors': ['Failed to set admin password.']} # ── persist identity fields ──────────────────────────────────── self.config_manager.set_identity_field('cell_name', cell_name) self.config_manager.set_identity_field('domain_mode', domain_mode) self.config_manager.set_identity_field('timezone', timezone) self.config_manager.set_identity_field('services_enabled', services_enabled) self.config_manager.set_identity_field('ddns_provider', ddns_provider) # NOTE: DDNS registration is deferred to Phase 3. # For now we just store ddns_provider in config. logger.info( 'DDNS registration skipped (Phase 1). ' 'DDNS registration will happen in Phase 3. ' f'ddns_provider={ddns_provider!r} stored in identity config.' ) # ── mark setup complete (must be last) ───────────────────────── self.config_manager.set_identity_field('setup_complete', True) logger.info(f"Setup completed. cell_name={cell_name!r}, domain_mode={domain_mode!r}") return {'success': True, 'redirect': '/login'} finally: try: fcntl.flock(lock_fd, fcntl.LOCK_UN) lock_fd.close() except Exception: pass