#!/usr/bin/env python3
"""
AuthManager — local user store for PIC API.

Manages admin and peer accounts, password hashing (bcrypt),
account lockout, and bootstrap of the initial admin password.
"""

import os
import json
import re
import threading
import tempfile
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any

import bcrypt

from base_service_manager import BaseServiceManager

USERNAME_RE = re.compile(r'^[a-z][a-z0-9_.-]{2,31}$')
LOCKOUT_THRESHOLD = 5
LOCKOUT_DURATION = timedelta(minutes=15)

# Single source of truth for every timestamp written to / parsed from the store.
_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    # datetime.utcnow() is deprecated since Python 3.12; this is the
    # recommended replacement.
    return datetime.now(timezone.utc)


def _utcnow_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return _utcnow().strftime(_TIMESTAMP_FORMAT)


class AuthManager(BaseServiceManager):
    """Local authentication / authorization store.

    Users are kept as a JSON list of dicts in ``<data_dir>/auth_users.json``.
    Every read-modify-write cycle runs under ``self._lock`` (a re-entrant
    lock), and the file is replaced atomically on save.
    """

    def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
        """Set up the store and try to bootstrap the initial admin account.

        Args:
            data_dir: Directory that holds ``auth_users.json``.
            config_dir: Passed through to ``BaseServiceManager``.
        """
        super().__init__('auth', data_dir=data_dir, config_dir=config_dir)
        self._users_file = os.path.join(data_dir, 'auth_users.json')
        self._lock = threading.RLock()
        self._ensure_file()
        try:
            self._bootstrap_admin_if_needed()
        except Exception as e:
            # Bootstrap is best-effort: the manager must still come up.
            self.logger.warning(f'Admin bootstrap failed (non-fatal): {e}')

    # ── filesystem helpers ────────────────────────────────────────────────

    def _ensure_file(self):
        """Create the directory that will hold the users file (best-effort)."""
        directory = os.path.dirname(self._users_file) or '.'
        try:
            os.makedirs(directory, exist_ok=True)
        except OSError as e:
            # Non-fatal: a later save will fail loudly if the directory is
            # really unusable, but leave a trace of the root cause here.
            self.logger.warning(f'Could not create data directory {directory}: {e}')

    def _load_users(self) -> List[Dict[str, Any]]:
        """Read the user list from disk.

        Returns:
            The stored list of user dicts, or an empty list when the file is
            missing, unreadable, or does not contain a JSON list.
        """
        with self._lock:
            try:
                with open(self._users_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except FileNotFoundError:
                return []
            except Exception as e:
                # NOTE(review): an unreadable/corrupt file is treated as
                # "no users"; the next successful save will overwrite it.
                self.logger.error(f'Failed to load users: {e}')
                return []
            return data if isinstance(data, list) else []

    def _save_users(self, users: List[Dict[str, Any]]):
        """Atomically write ``users`` to the users file.

        The list is written to a temporary file in the same directory, which
        is then moved over the real file with ``os.replace``.

        Raises:
            Exception: Whatever the write or replace raised; the temporary
                file is removed before re-raising.
        """
        with self._lock:
            directory = os.path.dirname(self._users_file) or '.'
            fd, tmp_path = tempfile.mkstemp(prefix='.auth_users.', dir=directory)
            try:
                with os.fdopen(fd, 'w', encoding='utf-8') as f:
                    json.dump(users, f, indent=2)
                    f.flush()
                    try:
                        # Make sure the bytes are on disk before the rename,
                        # otherwise a crash can leave an empty users file.
                        os.fsync(f.fileno())
                    except OSError:
                        # Some filesystems do not support fsync; the rename
                        # below is still atomic.
                        pass
                try:
                    os.chmod(tmp_path, 0o600)
                except OSError:
                    # Best-effort; mkstemp already creates the file 0o600.
                    pass
                os.replace(tmp_path, self._users_file)
            except Exception:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass
                raise

    # ── bootstrap ─────────────────────────────────────────────────────────

    def _bootstrap_admin_if_needed(self):
        """Create the first admin from ``<data_dir>/.admin_initial_password``.

        If an admin already exists, the plaintext password file (if any) is
        removed and nothing else happens. Otherwise, when the file exists and
        is non-empty, an ``admin`` user is created with its contents as the
        password and the file is deleted on success.
        """
        users = self._load_users()
        init_pw_path = os.path.join(self.data_dir, '.admin_initial_password')

        if any(u.get('role') == 'admin' for u in users):
            # Remove plaintext file even when admin already exists
            # (security hygiene).
            if os.path.exists(init_pw_path):
                try:
                    os.unlink(init_pw_path)
                except OSError as e:
                    self.logger.warning(f'Could not delete init password file: {e}')
            return

        if not os.path.exists(init_pw_path):
            return

        try:
            with open(init_pw_path, 'r') as f:
                password = f.read().strip()
            if not password:
                return
            if self.create_user('admin', password, 'admin'):
                self.logger.info('Bootstrapped initial admin user from .admin_initial_password')
                # Only delete the file once the account really exists.
                try:
                    os.unlink(init_pw_path)
                except Exception as e:
                    self.logger.warning(f'Could not delete init password file: {e}')
        except Exception as e:
            self.logger.error(f'Admin bootstrap failed: {e}')

    # ── user CRUD ─────────────────────────────────────────────────────────

    @staticmethod
    def _strip_hash(user: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``user`` without the ``password_hash`` key."""
        return {k: v for k, v in user.items() if k != 'password_hash'}

    @staticmethod
    def _failed_attempts(user: Dict[str, Any]) -> int:
        """Return the stored failed-attempt counter, treating bad values as 0."""
        try:
            return int(user.get('failed_attempts') or 0)
        except (TypeError, ValueError):
            return 0

    def _hash_password(self, password: str) -> str:
        """Return a bcrypt hash (cost 12) of ``password`` as a UTF-8 string."""
        return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(rounds=12)).decode('utf-8')

    def _try_hash_password(self, password: str) -> Optional[str]:
        """Hash ``password``, returning ``None`` (and logging) if hashing fails.

        Keeps the bool-returning public methods from leaking an exception
        when bcrypt rejects the input.
        """
        try:
            return self._hash_password(password)
        except Exception as e:
            self.logger.error(f'Password hashing failed: {e}')
            return None

    def create_user(self, username: str, password: str, role: str,
                    peer_name: Optional[str] = None) -> bool:
        """Create a new user.

        Args:
            username: Must match ``USERNAME_RE`` in full.
            password: Non-empty plaintext password.
            role: ``'admin'`` or ``'peer'``.
            peer_name: Accepted for interface compatibility. The stored value
                is the username for peers and ``None`` for admins.

        Returns:
            True if the user was stored; False on invalid input, duplicate
            username, hashing failure, or save failure.
        """
        if role not in ('admin', 'peer'):
            self.logger.warning(f'Invalid role: {role}')
            return False
        # fullmatch, not match: with match() the trailing '$' would also
        # accept a username that ends in a newline.
        if not isinstance(username, str) or not USERNAME_RE.fullmatch(username):
            # !r keeps control characters in a rejected name from forging
            # additional log lines.
            self.logger.warning(f'Invalid username: {username!r}')
            return False
        if not password:
            self.logger.warning('Empty password rejected')
            return False

        with self._lock:
            users = self._load_users()
            if any(u.get('username') == username for u in users):
                self.logger.warning(f'Duplicate username: {username}')
                return False

            password_hash = self._try_hash_password(password)
            if password_hash is None:
                return False

            now = _utcnow_iso()
            # NOTE(review): the caller-supplied peer_name is overwritten in
            # both branches — confirm this is intentional.
            if role == 'peer':
                peer_name = username
                must_change = True
            else:
                peer_name = None
                must_change = False

            users.append({
                'username': username,
                'role': role,
                'peer_name': peer_name,
                'password_hash': password_hash,
                'created_at': now,
                'updated_at': now,
                'last_login_at': None,
                'failed_attempts': 0,
                'locked_until': None,
                'must_change_password': must_change,
            })
            try:
                self._save_users(users)
            except Exception as e:
                self.logger.error(f'create_user save failed: {e}')
                return False
            self.logger.info(f'Created user: {username} (role={role})')
            return True

    def delete_user(self, username: str) -> bool:
        """Delete ``username``; refuses to remove the last admin.

        Returns:
            True if the user was removed and the store saved, else False.
        """
        with self._lock:
            users = self._load_users()
            target = next((u for u in users if u.get('username') == username), None)
            if not target:
                return False
            if target.get('role') == 'admin':
                admin_count = sum(1 for u in users if u.get('role') == 'admin')
                if admin_count <= 1:
                    self.logger.warning('Refusing to delete last admin user')
                    return False
            remaining = [u for u in users if u.get('username') != username]
            try:
                self._save_users(remaining)
            except Exception as e:
                self.logger.error(f'delete_user save failed: {e}')
                return False
            self.logger.info(f'Deleted user: {username}')
            return True

    def get_user(self, username: str) -> Optional[Dict[str, Any]]:
        """Return the user record without its password hash, or None."""
        for u in self._load_users():
            if u.get('username') == username:
                return self._strip_hash(u)
        return None

    def list_users(self) -> List[Dict[str, Any]]:
        """Return all user records with password hashes removed."""
        return [self._strip_hash(u) for u in self._load_users()]

    # ── auth operations ───────────────────────────────────────────────────

    def _is_locked(self, user: Dict[str, Any]) -> bool:
        """Return True while ``user['locked_until']`` lies in the future.

        A missing or unparseable ``locked_until`` counts as not locked.
        """
        locked_until = user.get('locked_until')
        if not locked_until:
            return False
        try:
            until = datetime.strptime(locked_until, _TIMESTAMP_FORMAT)
        except (TypeError, ValueError):
            return False
        # Stored timestamps are UTC; make the parsed value aware so it can be
        # compared with the aware "now".
        return _utcnow() < until.replace(tzinfo=timezone.utc)

    def verify_password(self, username: str, password: str) -> Optional[Dict[str, Any]]:
        """Check a login attempt and update lockout bookkeeping.

        On success the failure counter is cleared and ``last_login_at`` is
        set. On failure the counter is incremented and, once it reaches
        ``LOCKOUT_THRESHOLD``, the account is locked for ``LOCKOUT_DURATION``.

        Returns:
            The user record without its password hash on success; None when
            the user is unknown, the account is locked, or the password is
            wrong.
        """
        with self._lock:
            users = self._load_users()
            idx = next((i for i, u in enumerate(users) if u.get('username') == username), None)
            if idx is None:
                return None
            user = users[idx]

            if self._is_locked(user):
                self.logger.warning(f'Login blocked — account locked: {username}')
                return None

            if user.get('locked_until'):
                # A previous lockout has expired. Start from a clean counter,
                # otherwise the very next wrong password would re-lock the
                # account immediately.
                user['failed_attempts'] = 0
                user['locked_until'] = None

            stored = user.get('password_hash', '')
            ok = False
            try:
                if stored:
                    ok = bcrypt.checkpw(password.encode('utf-8'), stored.encode('utf-8'))
            except Exception as e:
                self.logger.error(f'bcrypt check failed for {username}: {e}')
                ok = False

            if ok:
                user['failed_attempts'] = 0
                user['locked_until'] = None
                user['last_login_at'] = _utcnow_iso()
                users[idx] = user
                try:
                    self._save_users(users)
                except Exception as e:
                    # The credentials were valid; a bookkeeping failure must
                    # not reject the login.
                    self.logger.error(f'save after success failed: {e}')
                return self._strip_hash(user)

            # failure
            user['failed_attempts'] = self._failed_attempts(user) + 1
            if user['failed_attempts'] >= LOCKOUT_THRESHOLD:
                user['locked_until'] = (_utcnow() + LOCKOUT_DURATION).strftime(_TIMESTAMP_FORMAT)
                self.logger.warning(f'Account locked: {username}')
            users[idx] = user
            try:
                self._save_users(users)
            except Exception as e:
                self.logger.error(f'save after failure failed: {e}')
            return None

    def change_password(self, username: str, old_password: str, new_password: str) -> bool:
        """Change a password after verifying the current one.

        Returns:
            True if the new hash was stored; False when the new password is
            empty, the user is unknown or locked, the old password does not
            match, or hashing/saving fails.
        """
        if not new_password:
            return False
        with self._lock:
            users = self._load_users()
            idx = next((i for i, u in enumerate(users) if u.get('username') == username), None)
            if idx is None:
                return False
            user = users[idx]
            if self._is_locked(user):
                return False

            stored = user.get('password_hash', '')
            try:
                if not stored or not bcrypt.checkpw(old_password.encode('utf-8'),
                                                    stored.encode('utf-8')):
                    return False
            except Exception:
                return False

            new_hash = self._try_hash_password(new_password)
            if new_hash is None:
                return False

            user['password_hash'] = new_hash
            user['updated_at'] = _utcnow_iso()
            user['must_change_password'] = False
            user['failed_attempts'] = 0
            user['locked_until'] = None
            users[idx] = user
            try:
                self._save_users(users)
            except Exception as e:
                self.logger.error(f'change_password save failed: {e}')
                return False
            self.logger.info(f'Password changed: {username}')
            return True

    def set_password_admin(self, username: str, new_password: str) -> bool:
        """Reset a user's password without knowing the old one.

        Clears any lockout and sets ``must_change_password`` so the user has
        to pick a new password.

        Returns:
            True if the new hash was stored; False when the password is
            empty, the user is unknown, or hashing/saving fails.
        """
        if not new_password:
            return False
        with self._lock:
            users = self._load_users()
            idx = next((i for i, u in enumerate(users) if u.get('username') == username), None)
            if idx is None:
                return False

            new_hash = self._try_hash_password(new_password)
            if new_hash is None:
                return False

            user = users[idx]
            user['password_hash'] = new_hash
            user['updated_at'] = _utcnow_iso()
            user['failed_attempts'] = 0
            user['locked_until'] = None
            user['must_change_password'] = True
            users[idx] = user
            try:
                self._save_users(users)
            except Exception as e:
                self.logger.error(f'set_password_admin save failed: {e}')
                return False
            self.logger.info(f'Admin reset password for: {username}')
            return True

    # ── BaseServiceManager interface ──────────────────────────────────────

    def get_status(self) -> Dict[str, Any]:
        """Return the number of stored users and whether an admin exists."""
        users = self._load_users()
        return {
            'users': len(users),
            'has_admin': any(u.get('role') == 'admin' for u in users),
        }

    def test_connectivity(self) -> Dict[str, Any]:
        """Always report ``{'ok': True}``."""
        return {'ok': True}

    def get_config(self) -> Dict[str, Any]:
        """Return an empty config dict."""
        return {}

    def update_config(self, config: Dict[str, Any]) -> bool:
        """Accept any config without storing it; always returns True."""
        return True

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Accept any config; always returns True."""
        return True

    def get_logs(self, lines: int = 50) -> List[str]:
        """Return an empty list."""
        return []

    def restart_service(self) -> bool:
        """No-op; always returns True."""
        return True