8650704316
Backend: - AuthManager (api/auth_manager.py): server-side user store with bcrypt password hashing, account lockout after 5 failed attempts (15 min), and atomic file writes - AuthRoutes (api/auth_routes.py): Blueprint at /api/auth/* — login, logout, me, change-password, admin reset-password, list-users - app.py: register auth_bp blueprint; add enforce_auth before_request hook (401 for unauthenticated, 403 for wrong role; only active when auth store has users so pre-auth tests remain green); instantiate AuthManager; update POST /api/peers to require password >= 10 chars and auto-provision email + calendar + files + auth accounts with full rollback on any failure; extend DELETE /api/peers to tear down all four service accounts; add /api/peer/dashboard and /api/peer/services peer-scoped routes; fix is_local_request to also trust the last X-Forwarded-For entry appended by the reverse proxy (Caddy) - Role-based access: admin for /api/* (except /api/auth/* which is public and /api/peer/* which is peer-only) - setup_cell.py: generate and print initial admin password, store in .admin_initial_password with 0600 permissions; cleaned up on first admin login Frontend: - AuthContext.jsx: React context with login/logout/me state and Axios interceptor for automatic 401 redirect - PrivateRoute.jsx: route guard component - Login.jsx: login page with error handling and must-change-password redirect - AccountSettings.jsx: change-password form for any authenticated user - PeerDashboard.jsx: peer-role landing page (IP, service list) - MyServices.jsx: peer service links page - App.jsx, Sidebar.jsx: AuthContext integration, logout button, PrivateRoute wrappers, peer-role routing - Peers.jsx, WireGuard.jsx, api.js: auth-aware API calls Tests: 100 new auth tests all pass (test_auth_manager, test_auth_routes, test_route_protection, test_peer_provisioning). Fix pre-existing test failures: update WireGuard test keys to valid 44-char base64 format (test_wireguard_manager, test_peer_wg_integration), add password field and service manager mocks to test_api_endpoints peer tests, add auth helpers to conftest.py. Full suite: 845 passed, 0 failures. Fixed: .admin_initial_password security cleanup on bootstrap, username minimum length (3 chars enforced by USERNAME_RE regex) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
338 lines
13 KiB
Python
338 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
AuthManager — local user store for PIC API.
|
|
|
|
Manages admin and peer accounts, password hashing (bcrypt),
|
|
account lockout, and bootstrap of the initial admin password.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import re
|
|
import threading
|
|
import tempfile
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Any
|
|
|
|
import bcrypt
|
|
|
|
from base_service_manager import BaseServiceManager
|
|
|
|
|
|
USERNAME_RE = re.compile(r'^[a-z][a-z0-9_.-]{2,31}$')
|
|
LOCKOUT_THRESHOLD = 5
|
|
LOCKOUT_DURATION = timedelta(minutes=15)
|
|
|
|
|
|
def _utcnow_iso() -> str:
|
|
return datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
|
|
|
|
|
|
class AuthManager(BaseServiceManager):
|
|
"""Local authentication / authorization store."""
|
|
|
|
def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
|
|
super().__init__('auth', data_dir=data_dir, config_dir=config_dir)
|
|
self._users_file = os.path.join(data_dir, 'auth_users.json')
|
|
self._lock = threading.RLock()
|
|
self._ensure_file()
|
|
try:
|
|
self._bootstrap_admin_if_needed()
|
|
except Exception as e:
|
|
self.logger.warning(f'Admin bootstrap failed (non-fatal): {e}')
|
|
|
|
# ── filesystem helpers ────────────────────────────────────────────────
|
|
def _ensure_file(self):
|
|
try:
|
|
os.makedirs(os.path.dirname(self._users_file), exist_ok=True)
|
|
except Exception:
|
|
pass
|
|
if not os.path.exists(self._users_file):
|
|
try:
|
|
with open(self._users_file, 'w') as f:
|
|
f.write('[]')
|
|
try:
|
|
os.chmod(self._users_file, 0o600)
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
self.logger.error(f'Could not create users file: {e}')
|
|
|
|
def _load_users(self) -> List[Dict[str, Any]]:
|
|
with self._lock:
|
|
try:
|
|
with open(self._users_file, 'r') as f:
|
|
data = json.load(f)
|
|
if isinstance(data, list):
|
|
return data
|
|
return []
|
|
except FileNotFoundError:
|
|
return []
|
|
except Exception as e:
|
|
self.logger.error(f'Failed to load users: {e}')
|
|
return []
|
|
|
|
def _save_users(self, users: List[Dict[str, Any]]):
|
|
with self._lock:
|
|
directory = os.path.dirname(self._users_file) or '.'
|
|
fd, tmp_path = tempfile.mkstemp(prefix='.auth_users.', dir=directory)
|
|
try:
|
|
with os.fdopen(fd, 'w') as f:
|
|
json.dump(users, f, indent=2)
|
|
try:
|
|
os.chmod(tmp_path, 0o600)
|
|
except Exception:
|
|
pass
|
|
os.replace(tmp_path, self._users_file)
|
|
except Exception:
|
|
try:
|
|
os.unlink(tmp_path)
|
|
except Exception:
|
|
pass
|
|
raise
|
|
|
|
# ── bootstrap ─────────────────────────────────────────────────────────
|
|
def _bootstrap_admin_if_needed(self):
|
|
users = self._load_users()
|
|
init_pw_path = os.path.join(self.data_dir, '.admin_initial_password')
|
|
has_admin = any(u.get('role') == 'admin' for u in users)
|
|
if has_admin:
|
|
# Remove plaintext file even when admin already exists (security hygiene)
|
|
if os.path.exists(init_pw_path):
|
|
try:
|
|
os.unlink(init_pw_path)
|
|
except Exception:
|
|
pass
|
|
return
|
|
if not os.path.exists(init_pw_path):
|
|
return
|
|
try:
|
|
with open(init_pw_path, 'r') as f:
|
|
password = f.read().strip()
|
|
if not password:
|
|
return
|
|
ok = self.create_user('admin', password, 'admin')
|
|
if ok:
|
|
self.logger.info('Bootstrapped initial admin user from .admin_initial_password')
|
|
try:
|
|
os.unlink(init_pw_path)
|
|
except Exception as e:
|
|
self.logger.warning(f'Could not delete init password file: {e}')
|
|
except Exception as e:
|
|
self.logger.error(f'Admin bootstrap failed: {e}')
|
|
|
|
# ── user CRUD ─────────────────────────────────────────────────────────
|
|
@staticmethod
|
|
def _strip_hash(user: Dict[str, Any]) -> Dict[str, Any]:
|
|
clean = {k: v for k, v in user.items() if k != 'password_hash'}
|
|
return clean
|
|
|
|
def _hash_password(self, password: str) -> str:
|
|
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(rounds=12)).decode('utf-8')
|
|
|
|
def create_user(self, username: str, password: str, role: str,
|
|
peer_name: Optional[str] = None) -> bool:
|
|
if role not in ('admin', 'peer'):
|
|
self.logger.warning(f'Invalid role: {role}')
|
|
return False
|
|
if not username or not USERNAME_RE.match(username):
|
|
self.logger.warning(f'Invalid username: {username}')
|
|
return False
|
|
if not password or len(password) < 1:
|
|
self.logger.warning('Empty password rejected')
|
|
return False
|
|
with self._lock:
|
|
users = self._load_users()
|
|
if any(u.get('username') == username for u in users):
|
|
self.logger.warning(f'Duplicate username: {username}')
|
|
return False
|
|
now = _utcnow_iso()
|
|
if role == 'peer':
|
|
peer_name = username
|
|
must_change = True
|
|
else:
|
|
peer_name = None
|
|
must_change = False
|
|
user = {
|
|
'username': username,
|
|
'role': role,
|
|
'peer_name': peer_name,
|
|
'password_hash': self._hash_password(password),
|
|
'created_at': now,
|
|
'updated_at': now,
|
|
'last_login_at': None,
|
|
'failed_attempts': 0,
|
|
'locked_until': None,
|
|
'must_change_password': must_change,
|
|
}
|
|
users.append(user)
|
|
try:
|
|
self._save_users(users)
|
|
self.logger.info(f'Created user: {username} (role={role})')
|
|
return True
|
|
except Exception as e:
|
|
self.logger.error(f'create_user save failed: {e}')
|
|
return False
|
|
|
|
def delete_user(self, username: str) -> bool:
|
|
with self._lock:
|
|
users = self._load_users()
|
|
target = next((u for u in users if u.get('username') == username), None)
|
|
if not target:
|
|
return False
|
|
if target.get('role') == 'admin':
|
|
admins = [u for u in users if u.get('role') == 'admin']
|
|
if len(admins) <= 1:
|
|
self.logger.warning('Refusing to delete last admin user')
|
|
return False
|
|
new_users = [u for u in users if u.get('username') != username]
|
|
try:
|
|
self._save_users(new_users)
|
|
self.logger.info(f'Deleted user: {username}')
|
|
return True
|
|
except Exception as e:
|
|
self.logger.error(f'delete_user save failed: {e}')
|
|
return False
|
|
|
|
def get_user(self, username: str) -> Optional[Dict[str, Any]]:
|
|
users = self._load_users()
|
|
for u in users:
|
|
if u.get('username') == username:
|
|
return self._strip_hash(u)
|
|
return None
|
|
|
|
def list_users(self) -> List[Dict[str, Any]]:
|
|
return [self._strip_hash(u) for u in self._load_users()]
|
|
|
|
# ── auth operations ───────────────────────────────────────────────────
|
|
def _is_locked(self, user: Dict[str, Any]) -> bool:
|
|
locked_until = user.get('locked_until')
|
|
if not locked_until:
|
|
return False
|
|
try:
|
|
until = datetime.strptime(locked_until, '%Y-%m-%dT%H:%M:%SZ')
|
|
except Exception:
|
|
return False
|
|
return datetime.utcnow() < until
|
|
|
|
def verify_password(self, username: str, password: str) -> Optional[Dict[str, Any]]:
|
|
with self._lock:
|
|
users = self._load_users()
|
|
idx = next((i for i, u in enumerate(users) if u.get('username') == username), None)
|
|
if idx is None:
|
|
return None
|
|
user = users[idx]
|
|
if self._is_locked(user):
|
|
self.logger.warning(f'Login blocked — account locked: {username}')
|
|
return None
|
|
stored = user.get('password_hash', '')
|
|
ok = False
|
|
try:
|
|
if stored:
|
|
ok = bcrypt.checkpw(password.encode('utf-8'), stored.encode('utf-8'))
|
|
except Exception as e:
|
|
self.logger.error(f'bcrypt check failed for {username}: {e}')
|
|
ok = False
|
|
now = _utcnow_iso()
|
|
if ok:
|
|
user['failed_attempts'] = 0
|
|
user['locked_until'] = None
|
|
user['last_login_at'] = now
|
|
users[idx] = user
|
|
try:
|
|
self._save_users(users)
|
|
except Exception as e:
|
|
self.logger.error(f'save after success failed: {e}')
|
|
return self._strip_hash(user)
|
|
# failure
|
|
user['failed_attempts'] = int(user.get('failed_attempts', 0)) + 1
|
|
if user['failed_attempts'] >= LOCKOUT_THRESHOLD:
|
|
user['locked_until'] = (datetime.utcnow() + LOCKOUT_DURATION).strftime('%Y-%m-%dT%H:%M:%SZ')
|
|
self.logger.warning(f'Account locked: {username}')
|
|
users[idx] = user
|
|
try:
|
|
self._save_users(users)
|
|
except Exception as e:
|
|
self.logger.error(f'save after failure failed: {e}')
|
|
return None
|
|
|
|
def change_password(self, username: str, old_password: str, new_password: str) -> bool:
|
|
if not new_password:
|
|
return False
|
|
with self._lock:
|
|
users = self._load_users()
|
|
idx = next((i for i, u in enumerate(users) if u.get('username') == username), None)
|
|
if idx is None:
|
|
return False
|
|
user = users[idx]
|
|
if self._is_locked(user):
|
|
return False
|
|
stored = user.get('password_hash', '')
|
|
try:
|
|
if not stored or not bcrypt.checkpw(old_password.encode('utf-8'), stored.encode('utf-8')):
|
|
return False
|
|
except Exception:
|
|
return False
|
|
user['password_hash'] = self._hash_password(new_password)
|
|
user['updated_at'] = _utcnow_iso()
|
|
user['must_change_password'] = False
|
|
user['failed_attempts'] = 0
|
|
user['locked_until'] = None
|
|
users[idx] = user
|
|
try:
|
|
self._save_users(users)
|
|
self.logger.info(f'Password changed: {username}')
|
|
return True
|
|
except Exception as e:
|
|
self.logger.error(f'change_password save failed: {e}')
|
|
return False
|
|
|
|
def set_password_admin(self, username: str, new_password: str) -> bool:
|
|
if not new_password:
|
|
return False
|
|
with self._lock:
|
|
users = self._load_users()
|
|
idx = next((i for i, u in enumerate(users) if u.get('username') == username), None)
|
|
if idx is None:
|
|
return False
|
|
user = users[idx]
|
|
user['password_hash'] = self._hash_password(new_password)
|
|
user['updated_at'] = _utcnow_iso()
|
|
user['failed_attempts'] = 0
|
|
user['locked_until'] = None
|
|
user['must_change_password'] = True
|
|
users[idx] = user
|
|
try:
|
|
self._save_users(users)
|
|
self.logger.info(f'Admin reset password for: {username}')
|
|
return True
|
|
except Exception as e:
|
|
self.logger.error(f'set_password_admin save failed: {e}')
|
|
return False
|
|
|
|
# ── BaseServiceManager interface ──────────────────────────────────────
|
|
def get_status(self) -> Dict[str, Any]:
|
|
users = self._load_users()
|
|
return {
|
|
'users': len(users),
|
|
'has_admin': any(u.get('role') == 'admin' for u in users),
|
|
}
|
|
|
|
def test_connectivity(self) -> Dict[str, Any]:
|
|
return {'ok': True}
|
|
|
|
def get_config(self) -> Dict[str, Any]:
|
|
return {}
|
|
|
|
def update_config(self, config: Dict[str, Any]) -> bool:
|
|
return True
|
|
|
|
def validate_config(self, config: Dict[str, Any]) -> bool:
|
|
return True
|
|
|
|
def get_logs(self, lines: int = 50) -> List[str]:
|
|
return []
|
|
|
|
def restart_service(self) -> bool:
|
|
return True
|