Files
pic/api/account_manager.py
T
roof f3737acfa4
Unit Tests / test (push) Successful in 11m10s
fix: fall back to cell effective domain when email service domain not configured
When the email store service is installed but no explicit domain has been
set in its config, _provision_email now falls back to
config_manager.get_effective_domain() so peer account creation works
immediately without requiring a separate config step.

Also threads config_manager into AccountManager.__init__ (optional kwarg,
no existing callers break) so the fallback is available without a global
import.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 17:06:51 -04:00

299 lines
12 KiB
Python

"""
AccountManager — per-service credential provisioning for PIC peers.
Responsibilities:
- Dispatch account creation/deletion to each service's underlying manager
- Store per-peer per-service credentials securely (0o600 file)
- Provide credential retrieval for peer_config_template filling
- Bulk-deprovision a peer from all services on peer deletion
Credentials file format (data/peer_service_credentials.json):
{
"<service_id>": {
"<peer_username>": {"password": "..."}
}
}
Design note — plaintext passwords:
Credentials are stored in plaintext so the peer endpoint can return them to
the peer's device for one-time client configuration. The file is created with
0o600 so it is only readable by the process owner (same pattern used for
WireGuard keys and service_secrets.json).
"""
import json
import logging
import os
import secrets as _secrets_mod
import threading
from pathlib import Path
from typing import Dict, List, Optional
try:
import requests as _requests
except ImportError:
_requests = None
# All PIC components log through the shared 'picell' logger.
logger = logging.getLogger('picell')

# Maps the manager name a service declares under accounts.manager to the
# AccountManager method that creates an account through that manager.
_DISPATCH_PROVISION = dict(
    email_manager='_provision_email',
    calendar_manager='_provision_calendar',
    file_manager='_provision_files',
)

# Same mapping for account removal; keys mirror _DISPATCH_PROVISION.
_DISPATCH_DEPROVISION = dict(
    email_manager='_deprovision_email',
    calendar_manager='_deprovision_calendar',
    file_manager='_deprovision_files',
)

# Passed as timeout= to every HTTP account request made by AccountManager.
_HTTP_TIMEOUT = 10
class AccountManager:
    """Provision, track and remove per-service accounts for PIC peers.

    Account creation/deletion is delegated either to a named in-process
    manager object (one of the ``**managers`` given to ``__init__``) or, when a
    service declares ``accounts.manager == 'http'``, to the service's own
    ``/service-api/accounts`` HTTP endpoint.  The password used for each
    account is persisted in ``peer_service_credentials.json`` inside the data
    directory; every access to that file goes through ``self._lock``.
    """

    def __init__(self, service_registry, data_dir: str, config_manager=None, **managers):
        """
        service_registry — ServiceRegistry instance
        data_dir         — host data directory (data/peer_service_credentials.json lives here)
        config_manager   — ConfigManager instance (used to resolve fallback email domain)
        **managers       — named manager instances: email_manager=..., calendar_manager=...,
                           file_manager=...
        """
        self._registry = service_registry
        self._creds_path = Path(data_dir) / 'peer_service_credentials.json'
        self._config_manager = config_manager
        self._managers = managers
        # Serialises every load / modify / save cycle on the credentials file.
        self._lock = threading.Lock()

    # ── Credential storage (0o600) ────────────────────────────────────────

    def _load_creds(self) -> Dict:
        """Return the parsed credentials file, or ``{}`` when it is unusable.

        A missing file yields ``{}`` silently.  An unreadable file, undecodable
        or invalid JSON, or a top-level value that is not a JSON object is
        logged as a warning and also yields ``{}``, so callers can always use
        ``.get`` on the result.
        """
        try:
            with open(self._creds_path, encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            # Nothing has been provisioned yet.
            return {}
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning('AccountManager: failed to load credentials: %s', e)
            return {}
        if not isinstance(data, dict):
            logger.warning(
                'AccountManager: credentials file has unexpected top-level type %s',
                type(data).__name__,
            )
            return {}
        return data

    def _save_creds(self, creds: Dict) -> None:
        """Write ``creds`` to the credentials file with mode 0o600.

        The data is written to a sibling ``.tmp`` file, flushed and fsynced,
        then moved over the real file with ``os.replace`` so a partially
        written file never takes the place of the previous one.
        """
        tmp = str(self._creds_path) + '.tmp'
        with open(tmp, 'w', encoding='utf-8',
                  opener=lambda path, flags: os.open(path, flags, 0o600)) as f:
            # The mode given to os.open only applies when the file is created.
            # A leftover .tmp with looser permissions would otherwise carry
            # them onto the credentials file via os.replace, so tighten the
            # mode explicitly before any secret is written.
            os.chmod(tmp, 0o600)
            json.dump(creds, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, str(self._creds_path))

    # ── Per-manager provision / deprovision ───────────────────────────────

    def _email_domain(self, svc: Dict):
        """Return the email domain for ``svc``.

        Uses ``svc['config']['domain']`` when it is set; otherwise falls back
        to ``config_manager.get_effective_domain()`` if a config manager was
        supplied.  The result may be empty when neither source has a domain.
        """
        domain = (svc.get('config') or {}).get('domain', '')
        if not domain and self._config_manager is not None:
            domain = self._config_manager.get_effective_domain() or ''
        return domain

    def _provision_email(self, manager, svc: Dict, peer_username: str, password: str) -> bool:
        """Create the peer's mailbox via ``manager.create_email_user``.

        Raises ValueError when no domain can be resolved for the service.
        """
        domain = self._email_domain(svc)
        if not domain:
            raise ValueError("Email service has no 'domain' configured")
        return manager.create_email_user(peer_username, domain, password)

    def _deprovision_email(self, manager, svc: Dict, peer_username: str) -> bool:
        """Delete the peer's mailbox via ``manager.delete_email_user``.

        The domain is resolved exactly as in ``_provision_email`` so that an
        account created under the fallback domain is deleted under that same
        domain.  Unlike provisioning, an unresolved domain is passed through
        to the manager rather than raising, keeping deletion best-effort.
        """
        return manager.delete_email_user(peer_username, self._email_domain(svc))

    @staticmethod
    def _provision_calendar(manager, _svc: Dict, peer_username: str, password: str) -> bool:
        """Create the peer's calendar account; the service record is unused."""
        return manager.create_calendar_user(peer_username, password)

    @staticmethod
    def _deprovision_calendar(manager, _svc: Dict, peer_username: str) -> bool:
        """Delete the peer's calendar account; the service record is unused."""
        return manager.delete_calendar_user(peer_username)

    @staticmethod
    def _provision_files(manager, _svc: Dict, peer_username: str, password: str) -> bool:
        """Create the peer's file-service account; the service record is unused."""
        return manager.create_user(peer_username, password)

    @staticmethod
    def _deprovision_files(manager, _svc: Dict, peer_username: str) -> bool:
        """Delete the peer's file-service account; the service record is unused."""
        return manager.delete_user(peer_username)

    # ── HTTP dispatch (manager == "http") ────────────────────────────────

    @staticmethod
    def _http_base_url(svc: Dict) -> str:
        """Return the base URL for the service's /service-api endpoint.

        Raises ValueError when the service record has no ``backend`` value.
        """
        backend = svc.get('backend', '')
        if not backend:
            raise ValueError(f"Service {svc.get('id')!r} has no 'backend' configured")
        return f'http://{backend}'

    def _provision_http(self, svc: Dict, peer_username: str, password: str) -> bool:
        """POST the new account to ``<backend>/service-api/accounts``.

        Returns True on HTTP 200/201 and False (after logging a warning) on
        any other status.  Raises RuntimeError when ``requests`` is not
        installed or when building/sending the request fails.
        """
        if _requests is None:
            raise RuntimeError('requests library is required for HTTP account dispatch')
        url = self._http_base_url(svc) + '/service-api/accounts'
        try:
            resp = _requests.post(
                url,
                json={'username': peer_username, 'password': password},
                timeout=_HTTP_TIMEOUT,
            )
            if resp.status_code in (200, 201):
                return True
            logger.warning('HTTP provision %s on %s returned %s: %s',
                           peer_username, svc.get('id'), resp.status_code, resp.text[:200])
            return False
        except Exception as exc:
            raise RuntimeError(f'HTTP provision request failed: {exc}') from exc

    def _deprovision_http(self, svc: Dict, peer_username: str) -> bool:
        """DELETE ``<backend>/service-api/accounts/<username>``.

        Returns True on HTTP 200/204, and also on 404 (the account is already
        gone).  Any other status logs a warning and returns False.  Raises
        RuntimeError when ``requests`` is not installed or the request fails.
        """
        if _requests is None:
            raise RuntimeError('requests library is required for HTTP account dispatch')
        # Imported here so this block does not depend on a file-level import.
        from urllib.parse import quote
        # Percent-encode the username so characters such as '/', '?' or '#'
        # cannot change which path the DELETE request targets.
        url = (self._http_base_url(svc)
               + '/service-api/accounts/'
               + quote(str(peer_username), safe=''))
        try:
            resp = _requests.delete(url, timeout=_HTTP_TIMEOUT)
            if resp.status_code in (200, 204, 404):
                return True
            logger.warning('HTTP deprovision %s on %s returned %s: %s',
                           peer_username, svc.get('id'), resp.status_code, resp.text[:200])
            return False
        except Exception as exc:
            raise RuntimeError(f'HTTP deprovision request failed: {exc}') from exc

    # ── Service validation helper ─────────────────────────────────────────

    def _resolve_service(self, service_id: str):
        """Return (svc, manager_name, manager) or raise ValueError.

        manager is None when manager_name == 'http' — callers must check.
        ValueError is raised when the service is unknown, declares no
        ``accounts.manager``, or names a manager that was not passed to
        ``__init__``.
        """
        svc = self._registry.get(service_id)
        if svc is None:
            raise ValueError(f'Unknown service: {service_id!r}')
        accounts_cfg = svc.get('accounts') or {}
        manager_name = accounts_cfg.get('manager')
        if not manager_name:
            raise ValueError(f'Service {service_id!r} does not support accounts')
        if manager_name == 'http':
            return svc, 'http', None
        manager = self._managers.get(manager_name)
        if manager is None:
            raise ValueError(f'Manager {manager_name!r} is not registered with AccountManager')
        return svc, manager_name, manager

    # ── Public API ────────────────────────────────────────────────────────

    def provision(self, service_id: str, peer_username: str,
                  password: Optional[str] = None) -> Dict:
        """Create an account on the service for the peer; store and return credentials.

        When ``password`` is None a random URL-safe token is generated.
        Returns the stored credential dict ``{'password': ...}``.

        Raises ValueError if the service doesn't support accounts.
        Raises RuntimeError if the underlying manager fails.
        """
        svc, manager_name, manager = self._resolve_service(service_id)
        if password is None:
            password = _secrets_mod.token_urlsafe(16)
        if manager_name == 'http':
            ok = self._provision_http(svc, peer_username, password)
        else:
            dispatch = _DISPATCH_PROVISION.get(manager_name)
            if dispatch is None:
                raise ValueError(f'No provision dispatch for manager: {manager_name!r}')
            ok = getattr(self, dispatch)(manager, svc, peer_username, password)
        if not ok:
            raise RuntimeError(
                f'Provision of {peer_username!r} on {service_id!r} returned False — '
                'check underlying service manager logs'
            )
        cred = {'password': password}
        # Credentials are recorded only after the backend reported success.
        with self._lock:
            all_creds = self._load_creds()
            all_creds.setdefault(service_id, {})[peer_username] = cred
            self._save_creds(all_creds)
        logger.info('AccountManager: provisioned %s on %s', peer_username, service_id)
        return cred

    def deprovision(self, service_id: str, peer_username: str) -> bool:
        """Delete the peer's account on the service and clear stored credentials.

        Returns the backend's result coerced to bool.  Raises ValueError for
        an unknown/unsupported service; exceptions from the backend propagate
        and leave the stored credentials untouched.
        """
        svc, manager_name, manager = self._resolve_service(service_id)
        if manager_name == 'http':
            ok = self._deprovision_http(svc, peer_username)
        else:
            dispatch = _DISPATCH_DEPROVISION.get(manager_name)
            if dispatch is None:
                raise ValueError(f'No deprovision dispatch for manager: {manager_name!r}')
            ok = getattr(self, dispatch)(manager, svc, peer_username)
        # NOTE(review): the stored credentials are removed even when the
        # backend returned a falsy result, so a failed deletion is no longer
        # discoverable through list_peer_services — confirm this is intended.
        with self._lock:
            all_creds = self._load_creds()
            svc_creds = all_creds.get(service_id, {})
            if peer_username in svc_creds:
                del svc_creds[peer_username]
                if not svc_creds:
                    # Drop the now-empty service entry to keep the file tidy.
                    del all_creds[service_id]
                self._save_creds(all_creds)
        logger.info('AccountManager: deprovisioned %s from %s', peer_username, service_id)
        return bool(ok)

    def get_credentials(self, service_id: str, peer_username: str) -> Optional[Dict]:
        """Return stored credentials for peer+service, or None if not provisioned."""
        with self._lock:
            return self._load_creds().get(service_id, {}).get(peer_username)

    def list_accounts(self, service_id: str) -> List[str]:
        """Return peer usernames provisioned on a service."""
        with self._lock:
            return list(self._load_creds().get(service_id, {}).keys())

    def list_peer_services(self, peer_username: str) -> List[str]:
        """Return service IDs where this peer has a provisioned account."""
        with self._lock:
            creds = self._load_creds()
        return [svc_id for svc_id, peers in creds.items() if peer_username in peers]

    def is_provisioned(self, service_id: str, peer_username: str) -> bool:
        """Return True when credentials are stored for this peer on the service."""
        return self.get_credentials(service_id, peer_username) is not None

    def deprovision_peer(self, peer_username: str) -> Dict[str, bool]:
        """Remove a peer from every service they are provisioned on.

        Called on peer deletion. Continues even if individual services fail.
        Returns {service_id: success} for each service attempted.
        """
        results: Dict[str, bool] = {}
        for service_id in self.list_peer_services(peer_username):
            try:
                results[service_id] = self.deprovision(service_id, peer_username)
            except Exception as e:
                # Deliberately broad: one failing service must not stop the
                # peer from being removed from the remaining services.
                logger.warning('AccountManager: deprovision %s from %s failed: %s',
                               peer_username, service_id, e)
                results[service_id] = False
        return results

    def get_all_credentials(self, peer_username: str) -> Dict[str, Dict]:
        """Return {service_id: {field: value}} for all services the peer is provisioned on."""
        with self._lock:
            creds = self._load_creds()
        return {
            svc_id: peers[peer_username]
            for svc_id, peers in creds.items()
            if peer_username in peers
        }

    def store_credentials(self, service_id: str, peer_username: str,
                          cred: Dict) -> None:
        """Directly store credentials without calling the underlying manager.

        Used when a peer was provisioned through the legacy peers-POST route
        so that their credentials become retrievable via AccountManager.
        """
        with self._lock:
            all_creds = self._load_creds()
            all_creds.setdefault(service_id, {})[peer_username] = cred
            self._save_creds(all_creds)