Files
pic/api/auth_manager.py
T
roof f1b48208fc
Unit Tests / test (push) Failing after 8m58s
Fix CI unit test failures and DDNS config wiring
- auth_manager._ensure_file(): stop creating the empty auth_users.json on
  init — the constructor now only creates the parent directory.  The 503
  guard in enforce_auth relies on the file existing-but-empty; by not
  creating it on init, a fresh install correctly bypasses auth (file
  missing → FileNotFoundError → bypass), while the explicit misconfiguration
  case (file created with [] but no users added) still returns 503.
- test_enforce_auth_configured.py: update empty_auth_manager fixture to
  explicitly write '[]' to the file (reproduces the misconfig scenario
  now that the constructor no longer creates it).
- ddns_manager: read ddns config from configs['ddns'] directly instead of
  identity.domain.ddns — _identity.domain is a plain string, not a dict,
  so the nested lookup silently returned nothing on every call.
- setup_cell.py: write top-level 'ddns' block into cell_config.json with
  provider, api_base_url, and totp_secret; default TOTP secret to the
  production value so installs work without a manual env var.
- test_ddns_manager.py: update _make_config_manager to populate cm.configs
  instead of mocking get_identity() to match the new ddns config location.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-10 04:20:19 -04:00

328 lines
13 KiB
Python

#!/usr/bin/env python3
"""
AuthManager — local user store for PIC API.
Manages admin and peer accounts, password hashing (bcrypt),
account lockout, and bootstrap of the initial admin password.
"""
import json
import os
import re
import tempfile
import threading
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import bcrypt

from base_service_manager import BaseServiceManager
# Valid usernames: a lowercase ASCII letter followed by 2-31 characters drawn
# from lowercase letters, digits, underscore, dot and hyphen (3-32 chars total).
USERNAME_RE = re.compile(r'^[a-z][a-z0-9_.-]{2,31}$')
# Number of consecutive failed logins at which an account gets locked.
LOCKOUT_THRESHOLD = 5
# How long an account stays locked once the threshold has been reached.
LOCKOUT_DURATION = timedelta(minutes=15)
def _utcnow_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    The result has second resolution and a literal ``Z`` suffix; it is the
    format used for every timestamp stored in a user record.
    """
    # datetime.utcnow() is deprecated since Python 3.12. An aware "now" in UTC
    # formats to exactly the same string, so stored data stays compatible.
    return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
class AuthManager(BaseServiceManager):
    """Local authentication / authorization store.

    User records are kept as a JSON list in ``<data_dir>/auth_users.json``.
    Each record is a dict with the keys ``username``, ``role`` (``'admin'`` or
    ``'peer'``), ``peer_name``, ``password_hash`` (bcrypt), ``created_at``,
    ``updated_at``, ``last_login_at``, ``failed_attempts``, ``locked_until``
    and ``must_change_password``.

    Public read methods never return ``password_hash``.
    """

    # Format of the timestamps stored in user records (UTC, second resolution).
    _TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
        """Set up the store and try to bootstrap the initial admin account.

        Args:
            data_dir: Directory holding ``auth_users.json`` and, optionally,
                the ``.admin_initial_password`` bootstrap file.
            config_dir: Passed through to ``BaseServiceManager``.
        """
        super().__init__('auth', data_dir=data_dir, config_dir=config_dir)
        self._users_file = os.path.join(data_dir, 'auth_users.json')
        # Re-entrant on purpose: public methods hold the lock while calling
        # _load_users()/_save_users(), which acquire it again.
        self._lock = threading.RLock()
        self._ensure_file()
        try:
            self._bootstrap_admin_if_needed()
        except Exception as e:
            # Bootstrap problems must never prevent the manager from starting.
            self.logger.warning(f'Admin bootstrap failed (non-fatal): {e}')

    # ── filesystem helpers ────────────────────────────────────────────────
    def _ensure_file(self) -> None:
        """Create the directory that will hold the users file.

        Only the parent directory is created; the users file itself is left
        absent so that a missing store stays distinguishable from an
        existing-but-empty one.
        NOTE(review): callers such as enforce_auth presumably rely on that
        distinction — confirm before ever creating the file here.
        """
        try:
            os.makedirs(os.path.dirname(self._users_file), exist_ok=True)
        except (OSError, ValueError) as e:
            # Best effort: a later save will surface the real problem, but do
            # not hide the cause from the log.
            self.logger.warning(f'Could not create auth data directory: {e}')

    def _load_users(self) -> List[Dict[str, Any]]:
        """Read and return all user records (including password hashes).

        Returns:
            The stored list, or ``[]`` when the file is missing, unreadable,
            not valid JSON, or does not contain a JSON list.
        """
        with self._lock:
            try:
                with open(self._users_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except FileNotFoundError:
                # No store yet — not an error, so nothing is logged.
                return []
            except Exception as e:
                self.logger.error(f'Failed to load users: {e}')
                return []
            return data if isinstance(data, list) else []

    def _save_users(self, users: List[Dict[str, Any]]) -> None:
        """Atomically replace the users file with ``users``.

        The data is written to a temporary file in the same directory and then
        moved over the real file with ``os.replace``.

        Raises:
            Exception: Whatever the write or replace raised; the temporary
                file is removed before re-raising.
        """
        with self._lock:
            directory = os.path.dirname(self._users_file) or '.'
            fd, tmp_path = tempfile.mkstemp(prefix='.auth_users.', dir=directory)
            try:
                with os.fdopen(fd, 'w', encoding='utf-8') as f:
                    json.dump(users, f, indent=2)
                try:
                    # mkstemp already creates the file owner-only; this is a
                    # belt-and-braces step, so a failure is not fatal.
                    os.chmod(tmp_path, 0o600)
                except OSError:
                    pass
                os.replace(tmp_path, self._users_file)
            except Exception:
                # Do not leave a stray temp file containing password hashes.
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass
                raise

    # ── bootstrap ─────────────────────────────────────────────────────────
    def _bootstrap_admin_if_needed(self) -> None:
        """Create the first admin from ``<data_dir>/.admin_initial_password``.

        * If an admin already exists, the plaintext password file is removed
          (if present) and nothing else happens.
        * Otherwise, if the file exists and is non-empty, an ``admin`` user is
          created with its stripped contents as password, and the file is
          deleted once the user has been created successfully.
        """
        users = self._load_users()
        init_pw_path = os.path.join(self.data_dir, '.admin_initial_password')

        if any(u.get('role') == 'admin' for u in users):
            # Security hygiene: never keep the plaintext password around once
            # an admin account exists.
            try:
                os.unlink(init_pw_path)
            except FileNotFoundError:
                pass
            except OSError as e:
                self.logger.warning(f'Could not delete init password file: {e}')
            return

        try:
            with open(init_pw_path, 'r') as f:
                password = f.read().strip()
        except FileNotFoundError:
            return
        except Exception as e:
            self.logger.error(f'Admin bootstrap failed: {e}')
            return
        if not password:
            return

        try:
            created = self.create_user('admin', password, 'admin')
        except Exception as e:
            self.logger.error(f'Admin bootstrap failed: {e}')
            return
        if created:
            self.logger.info('Bootstrapped initial admin user from .admin_initial_password')
            # Keep the file when creation failed so bootstrap can be retried.
            try:
                os.unlink(init_pw_path)
            except Exception as e:
                self.logger.warning(f'Could not delete init password file: {e}')

    # ── user CRUD ─────────────────────────────────────────────────────────
    @staticmethod
    def _strip_hash(user: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``user`` without the ``password_hash`` key."""
        return {k: v for k, v in user.items() if k != 'password_hash'}

    def _hash_password(self, password: str) -> str:
        """Return the bcrypt hash (12 rounds) of ``password`` as text."""
        return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(rounds=12)).decode('utf-8')

    def create_user(self, username: str, password: str, role: str,
                    peer_name: Optional[str] = None) -> bool:
        """Create and persist a new user.

        Args:
            username: Must match ``USERNAME_RE`` and be unused.
            password: Non-empty plaintext password; stored as a bcrypt hash.
            role: ``'admin'`` or ``'peer'``.
            peer_name: Currently ignored (see note in the body).

        Returns:
            ``True`` when the user was stored; ``False`` on invalid input, a
            duplicate username, or a failed save.
        """
        if role not in ('admin', 'peer'):
            self.logger.warning(f'Invalid role: {role}')
            return False
        if not username or not USERNAME_RE.match(username):
            self.logger.warning(f'Invalid username: {username}')
            return False
        if not password:
            self.logger.warning('Empty password rejected')
            return False

        with self._lock:
            users = self._load_users()
            if any(u.get('username') == username for u in users):
                self.logger.warning(f'Duplicate username: {username}')
                return False

            now = _utcnow_iso()
            # NOTE(review): the caller-supplied peer_name is always replaced —
            # peers get their username, admins get None. Presumably
            # intentional (peer accounts tied 1:1 to the peer of the same
            # name); confirm, or drop the parameter's pretence of being used.
            is_peer = role == 'peer'
            peer_name = username if is_peer else None
            user = {
                'username': username,
                'role': role,
                'peer_name': peer_name,
                'password_hash': self._hash_password(password),
                'created_at': now,
                'updated_at': now,
                'last_login_at': None,
                'failed_attempts': 0,
                'locked_until': None,
                # Peers must pick their own password on first login.
                'must_change_password': is_peer,
            }
            users.append(user)
            try:
                self._save_users(users)
            except Exception as e:
                self.logger.error(f'create_user save failed: {e}')
                return False
            self.logger.info(f'Created user: {username} (role={role})')
            return True

    def delete_user(self, username: str) -> bool:
        """Delete ``username``; the last remaining admin cannot be deleted.

        Returns:
            ``True`` when the user was removed and the change was saved,
            ``False`` otherwise.
        """
        with self._lock:
            users = self._load_users()
            target = next((u for u in users if u.get('username') == username), None)
            if not target:
                return False
            if target.get('role') == 'admin':
                admin_count = sum(1 for u in users if u.get('role') == 'admin')
                if admin_count <= 1:
                    self.logger.warning('Refusing to delete last admin user')
                    return False
            remaining = [u for u in users if u.get('username') != username]
            try:
                self._save_users(remaining)
            except Exception as e:
                self.logger.error(f'delete_user save failed: {e}')
                return False
            self.logger.info(f'Deleted user: {username}')
            return True

    def get_user(self, username: str) -> Optional[Dict[str, Any]]:
        """Return the record for ``username`` without its hash, or ``None``."""
        for u in self._load_users():
            if u.get('username') == username:
                return self._strip_hash(u)
        return None

    def list_users(self) -> List[Dict[str, Any]]:
        """Return every user record with the password hash removed."""
        return [self._strip_hash(u) for u in self._load_users()]

    # ── auth operations ───────────────────────────────────────────────────
    def _is_locked(self, user: Dict[str, Any]) -> bool:
        """Return ``True`` while the record's ``locked_until`` lies in the future.

        A missing, empty or unparseable ``locked_until`` counts as not locked.
        """
        locked_until = user.get('locked_until')
        if not locked_until:
            return False
        try:
            until = datetime.strptime(locked_until, self._TIMESTAMP_FORMAT)
        except (TypeError, ValueError):
            return False
        # Stored timestamps are UTC; compare aware-to-aware instead of relying
        # on the deprecated naive datetime.utcnow().
        return datetime.now(timezone.utc) < until.replace(tzinfo=timezone.utc)

    def verify_password(self, username: str, password: str) -> Optional[Dict[str, Any]]:
        """Check a login attempt and update the lockout bookkeeping.

        Returns:
            The user record without its hash on success. ``None`` when the
            user is unknown, the account is currently locked, or the password
            is wrong. Reaching ``LOCKOUT_THRESHOLD`` consecutive failures locks
            the account for ``LOCKOUT_DURATION``.
        """
        with self._lock:
            users = self._load_users()
            idx = next((i for i, u in enumerate(users) if u.get('username') == username), None)
            if idx is None:
                return None
            user = users[idx]

            if self._is_locked(user):
                self.logger.warning(f'Login blocked — account locked: {username}')
                return None
            if user.get('locked_until'):
                # The lock has expired (or is unparseable). Start counting
                # afresh; otherwise the stale count would re-lock the account
                # on the very first wrong password after the lock ran out.
                user['failed_attempts'] = 0
                user['locked_until'] = None

            stored = user.get('password_hash', '')
            ok = False
            try:
                if stored:
                    ok = bcrypt.checkpw(password.encode('utf-8'), stored.encode('utf-8'))
            except Exception as e:
                self.logger.error(f'bcrypt check failed for {username}: {e}')
                ok = False

            if ok:
                user['failed_attempts'] = 0
                user['locked_until'] = None
                user['last_login_at'] = _utcnow_iso()
                users[idx] = user
                try:
                    self._save_users(users)
                except Exception as e:
                    # A failed bookkeeping write must not reject a valid login.
                    self.logger.error(f'save after success failed: {e}')
                return self._strip_hash(user)

            # failure
            # `or 0` also covers a stored null, which int() would reject.
            user['failed_attempts'] = int(user.get('failed_attempts') or 0) + 1
            if user['failed_attempts'] >= LOCKOUT_THRESHOLD:
                locked_until = datetime.now(timezone.utc) + LOCKOUT_DURATION
                user['locked_until'] = locked_until.strftime(self._TIMESTAMP_FORMAT)
                self.logger.warning(f'Account locked: {username}')
            users[idx] = user
            try:
                self._save_users(users)
            except Exception as e:
                self.logger.error(f'save after failure failed: {e}')
            return None

    def change_password(self, username: str, old_password: str, new_password: str) -> bool:
        """Change a password after verifying the current one.

        Refused while the account is locked. A wrong ``old_password`` does not
        increase the failed-attempt counter. On success the lockout state and
        the ``must_change_password`` flag are cleared.

        Returns:
            ``True`` when the new hash was saved, ``False`` otherwise.
        """
        if not new_password:
            return False
        with self._lock:
            users = self._load_users()
            idx = next((i for i, u in enumerate(users) if u.get('username') == username), None)
            if idx is None:
                return False
            user = users[idx]
            if self._is_locked(user):
                return False

            stored = user.get('password_hash', '')
            try:
                if not stored or not bcrypt.checkpw(old_password.encode('utf-8'), stored.encode('utf-8')):
                    return False
            except Exception as e:
                # Same visibility as verify_password gives to bcrypt errors.
                self.logger.error(f'bcrypt check failed for {username}: {e}')
                return False

            user['password_hash'] = self._hash_password(new_password)
            user['updated_at'] = _utcnow_iso()
            user['must_change_password'] = False
            user['failed_attempts'] = 0
            user['locked_until'] = None
            users[idx] = user
            try:
                self._save_users(users)
            except Exception as e:
                self.logger.error(f'change_password save failed: {e}')
                return False
            self.logger.info(f'Password changed: {username}')
            return True

    def set_password_admin(self, username: str, new_password: str) -> bool:
        """Reset a user's password without knowing the old one.

        Clears any lockout and sets ``must_change_password`` so the user has
        to choose a new password afterwards.

        Returns:
            ``True`` when the new hash was saved, ``False`` otherwise.
        """
        if not new_password:
            return False
        with self._lock:
            users = self._load_users()
            idx = next((i for i, u in enumerate(users) if u.get('username') == username), None)
            if idx is None:
                return False
            user = users[idx]
            user['password_hash'] = self._hash_password(new_password)
            user['updated_at'] = _utcnow_iso()
            user['failed_attempts'] = 0
            user['locked_until'] = None
            user['must_change_password'] = True
            users[idx] = user
            try:
                self._save_users(users)
            except Exception as e:
                self.logger.error(f'set_password_admin save failed: {e}')
                return False
            self.logger.info(f'Admin reset password for: {username}')
            return True

    # ── BaseServiceManager interface ──────────────────────────────────────
    def get_status(self) -> Dict[str, Any]:
        """Return the number of stored users and whether any is an admin."""
        users = self._load_users()
        return {
            'users': len(users),
            'has_admin': any(u.get('role') == 'admin' for u in users),
        }

    def test_connectivity(self) -> Dict[str, Any]:
        """No external service is involved; always reports ``{'ok': True}``."""
        return {'ok': True}

    def get_config(self) -> Dict[str, Any]:
        """This manager has no configuration; always returns ``{}``."""
        return {}

    def update_config(self, config: Dict[str, Any]) -> bool:
        """No-op; ``config`` is ignored and ``True`` is returned."""
        return True

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """No-op; every ``config`` is accepted."""
        return True

    def get_logs(self, lines: int = 50) -> List[str]:
        """No dedicated log source; always returns ``[]``."""
        return []

    def restart_service(self) -> bool:
        """Nothing to restart; always returns ``True``."""
        return True