feat: add Steps 1-4 implementation files (AccountManager, ServiceComposer, builtins, tests)
Unit Tests / test (push) Successful in 11m24s

These files were created during Steps 1-4 of the services architecture but were
never staged: AccountManager (per-service credential provisioning), ServiceComposer
(docker-compose lifecycle), built-in service manifests for email/calendar/files,
and their test suites (158 tests). Also un-tracks .coverage binaries that were
accidentally committed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-29 04:39:19 -04:00
parent dc7b316cbd
commit 2f5370bd98
12 changed files with 2443 additions and 0 deletions
BIN
View File
Binary file not shown.
BIN
View File
Binary file not shown.
+237
View File
@@ -0,0 +1,237 @@
"""
AccountManager — per-service credential provisioning for PIC peers.
Responsibilities:
- Dispatch account creation/deletion to each service's underlying manager
- Store per-peer per-service credentials securely (0o600 file)
- Provide credential retrieval for peer_config_template filling
- Bulk-deprovision a peer from all services on peer deletion
Credentials file format (data/peer_service_credentials.json):
{
"<service_id>": {
"<peer_username>": {"password": "..."}
}
}
Design note — plaintext passwords:
Credentials are stored in plaintext so the peer endpoint can return them to
the peer's device for one-time client configuration. The file is created with
0o600 so it is only readable by the process owner (same pattern used for
WireGuard keys and service_secrets.json).
"""
import json
import logging
import os
import secrets as _secrets_mod
import threading
from pathlib import Path
from typing import Dict, List, Optional
logger = logging.getLogger('picell')
_DISPATCH_PROVISION = {
'email_manager': '_provision_email',
'calendar_manager': '_provision_calendar',
'file_manager': '_provision_files',
}
_DISPATCH_DEPROVISION = {
'email_manager': '_deprovision_email',
'calendar_manager': '_deprovision_calendar',
'file_manager': '_deprovision_files',
}
class AccountManager:
def __init__(self, service_registry, data_dir: str, **managers):
"""
service_registry — ServiceRegistry instance
data_dir — host data directory (data/peer_service_credentials.json lives here)
**managers — named manager instances: email_manager=..., calendar_manager=...,
file_manager=...
"""
self._registry = service_registry
self._creds_path = Path(data_dir) / 'peer_service_credentials.json'
self._managers = managers
self._lock = threading.Lock()
# ── Credential storage (0o600) ────────────────────────────────────────
def _load_creds(self) -> Dict:
if not self._creds_path.exists():
return {}
try:
with open(self._creds_path) as f:
return json.load(f)
except (OSError, json.JSONDecodeError) as e:
logger.warning('AccountManager: failed to load credentials: %s', e)
return {}
def _save_creds(self, creds: Dict) -> None:
tmp = str(self._creds_path) + '.tmp'
with open(tmp, 'w', opener=lambda path, flags: os.open(path, flags, 0o600)) as f:
json.dump(creds, f, indent=2)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, str(self._creds_path))
# ── Per-manager provision / deprovision ───────────────────────────────
def _provision_email(self, manager, svc: Dict, peer_username: str, password: str) -> bool:
domain = (svc.get('config') or {}).get('domain', '')
if not domain:
raise ValueError("Email service has no 'domain' configured")
return manager.create_email_user(peer_username, domain, password)
def _deprovision_email(self, manager, svc: Dict, peer_username: str) -> bool:
domain = (svc.get('config') or {}).get('domain', '')
return manager.delete_email_user(peer_username, domain)
@staticmethod
def _provision_calendar(manager, _svc: Dict, peer_username: str, password: str) -> bool:
return manager.create_calendar_user(peer_username, password)
@staticmethod
def _deprovision_calendar(manager, _svc: Dict, peer_username: str) -> bool:
return manager.delete_calendar_user(peer_username)
@staticmethod
def _provision_files(manager, _svc: Dict, peer_username: str, password: str) -> bool:
return manager.create_user(peer_username, password)
@staticmethod
def _deprovision_files(manager, _svc: Dict, peer_username: str) -> bool:
return manager.delete_user(peer_username)
# ── Service validation helper ─────────────────────────────────────────
def _resolve_service(self, service_id: str):
"""Return (svc, manager_name, manager) or raise ValueError."""
svc = self._registry.get(service_id)
if svc is None:
raise ValueError(f'Unknown service: {service_id!r}')
accounts_cfg = svc.get('accounts') or {}
manager_name = accounts_cfg.get('manager')
if not manager_name:
raise ValueError(f'Service {service_id!r} does not support accounts')
manager = self._managers.get(manager_name)
if manager is None:
raise ValueError(f'Manager {manager_name!r} is not registered with AccountManager')
return svc, manager_name, manager
# ── Public API ────────────────────────────────────────────────────────
def provision(self, service_id: str, peer_username: str,
password: str = None) -> Dict:
"""Create an account on the service for the peer; store and return credentials.
Raises ValueError if the service doesn't support accounts.
Raises RuntimeError if the underlying manager fails.
"""
svc, manager_name, manager = self._resolve_service(service_id)
if password is None:
password = _secrets_mod.token_urlsafe(16)
dispatch = _DISPATCH_PROVISION.get(manager_name)
if dispatch is None:
raise ValueError(f'No provision dispatch for manager: {manager_name!r}')
fn = getattr(self, dispatch)
ok = fn(manager, svc, peer_username, password)
if not ok:
raise RuntimeError(
f'Provision of {peer_username!r} on {service_id!r} returned False — '
'check underlying service manager logs'
)
cred = {'password': password}
with self._lock:
all_creds = self._load_creds()
all_creds.setdefault(service_id, {})[peer_username] = cred
self._save_creds(all_creds)
logger.info('AccountManager: provisioned %s on %s', peer_username, service_id)
return cred
def deprovision(self, service_id: str, peer_username: str) -> bool:
"""Delete the peer's account on the service and clear stored credentials."""
svc, manager_name, manager = self._resolve_service(service_id)
dispatch = _DISPATCH_DEPROVISION.get(manager_name)
if dispatch is None:
raise ValueError(f'No deprovision dispatch for manager: {manager_name!r}')
fn = getattr(self, dispatch)
ok = fn(manager, svc, peer_username)
with self._lock:
all_creds = self._load_creds()
svc_creds = all_creds.get(service_id, {})
if peer_username in svc_creds:
del svc_creds[peer_username]
if not svc_creds:
del all_creds[service_id]
self._save_creds(all_creds)
logger.info('AccountManager: deprovisioned %s from %s', peer_username, service_id)
return bool(ok)
def get_credentials(self, service_id: str, peer_username: str) -> Optional[Dict]:
"""Return stored credentials for peer+service, or None if not provisioned."""
with self._lock:
return self._load_creds().get(service_id, {}).get(peer_username)
def list_accounts(self, service_id: str) -> List[str]:
"""Return peer usernames provisioned on a service."""
with self._lock:
return list(self._load_creds().get(service_id, {}).keys())
def list_peer_services(self, peer_username: str) -> List[str]:
"""Return service IDs where this peer has a provisioned account."""
with self._lock:
creds = self._load_creds()
return [svc_id for svc_id, peers in creds.items() if peer_username in peers]
def is_provisioned(self, service_id: str, peer_username: str) -> bool:
return self.get_credentials(service_id, peer_username) is not None
def deprovision_peer(self, peer_username: str) -> Dict[str, bool]:
"""Remove a peer from every service they are provisioned on.
Called on peer deletion. Continues even if individual services fail.
Returns {service_id: success} for each service attempted.
"""
results: Dict[str, bool] = {}
for service_id in self.list_peer_services(peer_username):
try:
results[service_id] = self.deprovision(service_id, peer_username)
except Exception as e:
logger.warning('AccountManager: deprovision %s from %s failed: %s',
peer_username, service_id, e)
results[service_id] = False
return results
def get_all_credentials(self, peer_username: str) -> Dict[str, Dict]:
"""Return {service_id: {field: value}} for all services the peer is provisioned on."""
with self._lock:
creds = self._load_creds()
return {
svc_id: peers[peer_username]
for svc_id, peers in creds.items()
if peer_username in peers
}
def store_credentials(self, service_id: str, peer_username: str,
cred: Dict) -> None:
"""Directly store credentials without calling the underlying manager.
Used when a peer was provisioned through the legacy peers-POST route
so that their credentials become retrievable via AccountManager.
"""
with self._lock:
all_creds = self._load_creds()
all_creds.setdefault(service_id, {})[peer_username] = cred
self._save_creds(all_creds)
+310
View File
@@ -0,0 +1,310 @@
"""
ServiceComposer — docker-compose generation and container lifecycle for PIC services.
Responsibilities:
- Render compose-template.yml → per-service docker-compose.yml with PIC_* substitution
- Manage store-service container lifecycle (up / down / restart / status / reconfigure)
- Manage builtin-service restarts and status via the main compose stack
- Generate and persist PIC_SECRET_* variables in a dedicated secrets file
Template variable reference (for compose-template.yml authors):
${PIC_CFG_<KEY>} — value from manifest config_schema, uppercased
${PIC_SECRET_<NAME>} — auto-generated random secret, persisted across reconfigures
${PIC_DOMAIN} — effective domain (e.g. cell.pic.ngo)
${PIC_CELL_NAME} — cell name (e.g. mycell)
${PIC_SERVICE_ID} — service identifier (e.g. nextcloud)
"""
import json
import logging
import os
import re
import secrets as _secrets_lib
import shutil
import subprocess
import threading
from typing import Dict, List, Optional
logger = logging.getLogger('picell')
_SECRET_RE = re.compile(r'\$\{(PIC_SECRET_\w+)\}')
_SAFE_ID_RE = re.compile(r'^[a-z0-9][a-z0-9_-]{0,63}$')
class ServiceComposer:
def __init__(self, config_manager, data_dir: str):
self.cm = config_manager
self.data_dir = data_dir
self._services_dir = os.path.join(data_dir, 'services')
self._secrets_path = os.path.join(data_dir, 'service_secrets.json')
self._lock = threading.Lock()
# ── Path helpers ──────────────────────────────────────────────────────
@staticmethod
def _validate_service_id(service_id: str) -> None:
"""Raise ValueError if service_id could be used for path traversal."""
if not _SAFE_ID_RE.match(service_id):
raise ValueError(
f'Invalid service_id {service_id!r}: '
'must match ^[a-z0-9][a-z0-9_-]{{0,63}}$'
)
def _svc_dir(self, service_id: str) -> str:
self._validate_service_id(service_id)
candidate = os.path.join(self._services_dir, service_id)
# Paranoia: ensure the resolved path stays inside _services_dir
real_base = os.path.realpath(self._services_dir)
real_cand = os.path.realpath(candidate)
if not real_cand.startswith(real_base + os.sep) and real_cand != real_base:
raise ValueError(f'service_id {service_id!r} escapes services directory')
return candidate
def _compose_path(self, service_id: str) -> str:
return os.path.join(self._svc_dir(service_id), 'docker-compose.yml')
def has_compose_file(self, service_id: str) -> bool:
try:
return os.path.exists(self._compose_path(service_id))
except ValueError:
return False
# ── Secrets management ────────────────────────────────────────────────
def _load_secrets(self) -> Dict:
if not os.path.exists(self._secrets_path):
return {}
try:
with open(self._secrets_path) as f:
return json.load(f)
except (OSError, json.JSONDecodeError) as e:
logger.warning('ServiceComposer: failed to load secrets: %s', e)
return {}
def _save_secrets(self, secrets: Dict) -> None:
tmp = self._secrets_path + '.tmp'
# 0o600: readable only by the process owner — secrets must not be world-readable
with open(tmp, 'w',
opener=lambda path, flags: os.open(path, flags, 0o600)) as f:
json.dump(secrets, f, indent=2)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, self._secrets_path)
def _get_or_create_secret(self, service_id: str, var_name: str) -> str:
with self._lock:
secrets = self._load_secrets()
svc_secrets = secrets.setdefault(service_id, {})
if var_name not in svc_secrets:
svc_secrets[var_name] = _secrets_lib.token_urlsafe(24)
self._save_secrets(secrets)
return svc_secrets[var_name]
def _clear_secrets(self, service_id: str) -> None:
with self._lock:
secrets = self._load_secrets()
if service_id in secrets:
del secrets[service_id]
self._save_secrets(secrets)
# ── Template rendering ────────────────────────────────────────────────
def render_template(self, service_id: str, manifest: Dict,
template_content: str) -> str:
"""
Substitute all PIC_* variables in a compose-template.yml string.
Returns the rendered compose YAML.
"""
schema = manifest.get('config_schema') or {}
saved = self.cm.configs.get(service_id, {})
config: Dict = {k: v['default'] for k, v in schema.items() if 'default' in v}
config.update({k: saved[k] for k in schema if k in saved})
identity = self.cm.get_identity()
domain = self.cm.get_effective_domain() or identity.get('domain', 'cell.local')
cell_name = identity.get('cell_name', 'mycell')
result = template_content
for key, value in config.items():
# Strip newlines/tabs to prevent YAML injection (a config string containing
# \n could inject new YAML keys into the compose file)
safe_val = str(value).replace('\n', '').replace('\r', '').replace('\t', ' ')
result = result.replace(f'${{PIC_CFG_{key.upper()}}}', safe_val)
result = result.replace('${PIC_DOMAIN}', domain)
result = result.replace('${PIC_CELL_NAME}', cell_name)
result = result.replace('${PIC_SERVICE_ID}', service_id)
# PIC_SECRET_* — generate on first use, reuse on reconfigure
for match in _SECRET_RE.finditer(template_content):
var_name = match.group(1)
secret = self._get_or_create_secret(service_id, var_name)
result = result.replace(f'${{{var_name}}}', secret)
return result
def write_compose(self, service_id: str, manifest: Dict,
template_content: str) -> str:
"""Render and atomically write the per-service compose file. Returns rendered content."""
os.makedirs(self._svc_dir(service_id), exist_ok=True)
content = self.render_template(service_id, manifest, template_content)
path = self._compose_path(service_id)
tmp = path + '.tmp'
with open(tmp, 'w') as f:
f.write(content)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
logger.info('ServiceComposer: wrote compose file for %s', service_id)
return content
# ── Subprocess helper ─────────────────────────────────────────────────
def _run(self, cmd: List[str], timeout: int = 120) -> Dict:
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
if r.returncode != 0 and r.stderr:
logger.warning('ServiceComposer command failed: %s', r.stderr.strip())
return {
'ok': r.returncode == 0,
'stdout': r.stdout.strip(),
'stderr': r.stderr.strip(),
}
except subprocess.TimeoutExpired:
return {'ok': False, 'error': 'docker compose command timed out'}
except Exception as e:
logger.error('ServiceComposer._run error: %s', e)
return {'ok': False, 'error': str(e)}
@staticmethod
def _parse_ps_json(output: str) -> List[Dict]:
"""Parse `docker compose ps --format json` output (one JSON object per line)."""
containers = []
for line in output.splitlines():
line = line.strip()
if not line:
continue
try:
containers.append(json.loads(line))
except json.JSONDecodeError:
pass
return containers
# ── Store-service lifecycle (per-service compose file) ────────────────
def _store_cmd(self, service_id: str, *args, timeout: int = 120) -> Dict:
compose_file = self._compose_path(service_id)
if not os.path.exists(compose_file):
return {'ok': False, 'error': f'No compose file found for service {service_id!r}'}
cmd = [
'docker', 'compose',
'-f', compose_file,
'--project-name', f'pic-{service_id}',
*args,
]
return self._run(cmd, timeout)
def up(self, service_id: str) -> Dict:
# 600s: image pulls on slow connections can take several minutes
return self._store_cmd(service_id, 'up', '-d', '--remove-orphans', timeout=600)
def down(self, service_id: str, remove_volumes: bool = False) -> Dict:
args = ['down']
if remove_volumes:
args.append('--volumes')
return self._store_cmd(service_id, *args)
def restart(self, service_id: str) -> Dict:
return self._store_cmd(service_id, 'restart')
def status(self, service_id: str) -> Dict:
result = self._store_cmd(service_id, 'ps', '--format', 'json')
result['containers'] = self._parse_ps_json(result.get('stdout', ''))
return result
def reconfigure(self, service_id: str, manifest: Dict,
template_content: str) -> Dict:
"""Re-render the compose file then re-apply with `up -d` (rolling update)."""
self.write_compose(service_id, manifest, template_content)
return self.up(service_id)
def install(self, service_id: str, manifest: Dict,
template_content: str) -> Dict:
"""Write compose file and start containers."""
self.write_compose(service_id, manifest, template_content)
return self.up(service_id)
def remove(self, service_id: str, purge_data: bool = False) -> Dict:
"""Stop containers, optionally delete compose file, secrets, and service data dir."""
result = self.down(service_id, remove_volumes=purge_data)
if purge_data:
self._clear_secrets(service_id)
svc_dir = self._svc_dir(service_id) # already validates service_id + realpath
if os.path.isdir(svc_dir):
# Final realpath check: reject symlinks that escape the services dir
real_svc = os.path.realpath(svc_dir)
real_base = os.path.realpath(self._services_dir)
if not real_svc.startswith(real_base + os.sep):
logger.error('ServiceComposer: refusing rmtree outside services dir: %s', svc_dir)
else:
try:
shutil.rmtree(svc_dir)
except OSError as e:
logger.warning('ServiceComposer: could not remove %s: %s', svc_dir, e)
elif os.path.exists(self._compose_path(service_id)):
# Remove compose file even without purge so stale file doesn't confuse future installs
try:
os.remove(self._compose_path(service_id))
except OSError:
pass
return result
# ── Builtin-service lifecycle (main compose stack) ─────────────────────
@staticmethod
def _main_compose() -> str:
return os.environ.get('COMPOSE_FILE', '/app/docker-compose.yml')
def restart_builtin(self, container_names: List[str]) -> Dict:
"""Restart one or more containers that live in the main docker-compose stack."""
if not container_names:
return {'ok': False, 'error': 'No container names provided'}
cmd = ['docker', 'compose', '-f', self._main_compose(),
'restart', *container_names]
return self._run(cmd)
def status_builtin(self, container_names: List[str]) -> Dict:
"""Return status of containers from the main compose stack."""
if not container_names:
return {'ok': False, 'error': 'No container names provided'}
cmd = ['docker', 'compose', '-f', self._main_compose(),
'ps', '--format', 'json', *container_names]
result = self._run(cmd)
result['containers'] = self._parse_ps_json(result.get('stdout', ''))
return result
# ── Unified lifecycle (dispatches based on service kind) ───────────────
def restart_service(self, service_id: str, manifest: Dict) -> Dict:
"""
Restart any service — builtin or store — using the right compose stack.
Builtin: uses manifest.containers + main docker-compose.yml.
Store: uses per-service compose file.
"""
if manifest.get('kind') == 'builtin':
containers = manifest.get('containers') or []
return self.restart_builtin(containers)
return self.restart(service_id)
def status_service(self, service_id: str, manifest: Dict) -> Dict:
"""
Return container status for any service.
Builtin: queries manifest.containers from main compose stack.
Store: queries per-service compose project.
"""
if manifest.get('kind') == 'builtin':
containers = manifest.get('containers') or []
return self.status_builtin(containers)
return self.status(service_id)
@@ -0,0 +1,68 @@
{
"schema_version": 3,
"id": "calendar",
"name": "Calendar & Contacts",
"description": "Radicale CalDAV / CardDAV server",
"version": "1.0.0",
"author": "pic",
"kind": "builtin",
"min_pic_version": "1.0",
"capabilities": {
"has_subdomain": true,
"has_accounts": true,
"has_admin_config": true,
"has_storage": true,
"has_egress": true,
"has_api_hooks": false
},
"subdomain": "calendar",
"extra_subdomains": [],
"backend": "cell-radicale:5232",
"containers": ["cell-radicale"],
"config_schema": {
"port": {
"type": "integer",
"label": "CalDAV port (internal)",
"default": 5232,
"min": 1,
"max": 65535
}
},
"peer_config_template": {
"caldav_url": "https://calendar.{domain}/{peer.username}/",
"carddav_url": "https://calendar.{domain}/{peer.username}/",
"username": "{peer.username}",
"password": "{peer.service_credentials.calendar.password}"
},
"accounts": {
"manager": "calendar_manager",
"credentials": ["password"]
},
"compose": null,
"backup": {
"volumes": [
{"container": "cell-radicale", "path": "/data", "name": "radicale_data"}
],
"config_paths": [
"config/radicale"
]
},
"egress": {
"default": "default",
"allowed": ["default", "wireguard_ext", "openvpn", "tor"]
},
"storage": {
"primary_path": "data/radicale",
"quota_mb": null
}
}
+99
View File
@@ -0,0 +1,99 @@
{
"schema_version": 3,
"id": "email",
"name": "Email",
"description": "Postfix (SMTP) + Dovecot (IMAP) email server with Rainloop webmail",
"version": "1.0.0",
"author": "pic",
"kind": "builtin",
"min_pic_version": "1.0",
"capabilities": {
"has_subdomain": true,
"has_accounts": true,
"has_admin_config": true,
"has_storage": true,
"has_egress": true,
"has_api_hooks": false
},
"subdomain": "mail",
"extra_subdomains": ["webmail"],
"backend": "cell-rainloop:8888",
"containers": ["cell-mail", "cell-rainloop"],
"config_schema": {
"domain": {
"type": "string",
"label": "Mail domain",
"required": true
},
"smtp_port": {
"type": "integer",
"label": "SMTP port",
"default": 25,
"min": 1,
"max": 65535
},
"submission_port": {
"type": "integer",
"label": "Submission port",
"default": 587,
"min": 1,
"max": 65535
},
"imap_port": {
"type": "integer",
"label": "IMAP port",
"default": 993,
"min": 1,
"max": 65535
},
"webmail_port": {
"type": "integer",
"label": "Webmail port (internal)",
"default": 8888,
"min": 1,
"max": 65535
}
},
"peer_config_template": {
"imap_server": "{domain}",
"imap_port": "{config.imap_port}",
"smtp_server": "{domain}",
"smtp_port": "{config.submission_port}",
"webmail_url": "https://mail.{domain}/",
"username": "{peer.username}@{domain}",
"password": "{peer.service_credentials.email.password}"
},
"accounts": {
"manager": "email_manager",
"credentials": ["password"]
},
"compose": null,
"backup": {
"volumes": [
{"container": "cell-mail", "path": "/var/mail", "name": "maildata"},
{"container": "cell-mail", "path": "/var/mail-state", "name": "mailstate"},
{"container": "cell-rainloop", "path": "/rainloop/data", "name": "rainloop"}
],
"config_paths": [
"config/mail"
]
},
"egress": {
"default": "default",
"allowed": ["default", "wireguard_ext", "openvpn", "tor"]
},
"storage": {
"primary_path": "data/maildata",
"quota_mb": null
}
}
+79
View File
@@ -0,0 +1,79 @@
{
"schema_version": 3,
"id": "files",
"name": "File Storage",
"description": "FileGator browser UI + WebDAV network drive",
"version": "1.0.0",
"author": "pic",
"kind": "builtin",
"min_pic_version": "1.0",
"capabilities": {
"has_subdomain": true,
"has_accounts": true,
"has_admin_config": true,
"has_storage": true,
"has_egress": true,
"has_api_hooks": false
},
"subdomain": "files",
"extra_subdomains": ["webdav"],
"backend": "cell-filegator:8080",
"extra_backends": {
"webdav": "cell-webdav:80"
},
"containers": ["cell-filegator", "cell-webdav"],
"config_schema": {
"manager_port": {
"type": "integer",
"label": "FileGator port (internal)",
"default": 8082,
"min": 1,
"max": 65535
},
"port": {
"type": "integer",
"label": "WebDAV port (internal)",
"default": 8080,
"min": 1,
"max": 65535
}
},
"peer_config_template": {
"files_url": "https://files.{domain}/",
"webdav_url": "https://webdav.{domain}/",
"username": "{peer.username}",
"password": "{peer.service_credentials.files.password}"
},
"accounts": {
"manager": "file_manager",
"credentials": ["password"]
},
"compose": null,
"backup": {
"volumes": [
{"container": "cell-filegator", "path": "/var/www/filegator/private", "name": "filegator"},
{"container": "cell-webdav", "path": "/var/lib/dav", "name": "files"}
],
"config_paths": [
"config/webdav"
]
},
"egress": {
"default": "default",
"allowed": ["default", "wireguard_ext", "openvpn", "tor"]
},
"storage": {
"primary_path": "data/files",
"quota_mb": null
}
}
BIN
View File
Binary file not shown.
+441
View File
@@ -0,0 +1,441 @@
"""
Tests for AccountManager — per-service credential provisioning.
Covers:
- provision: dispatches to right manager method, stores credentials, generates password
- deprovision: calls manager method, removes stored credentials
- get_credentials / list_accounts / list_peer_services
- deprovision_peer: bulk cleanup on peer deletion
- store_credentials: direct storage (used by peers-POST legacy route)
- get_all_credentials: returns all creds for a peer
- credential file is created with 0o600
- unknown service / missing manager errors
"""
import json
import os
import stat
import threading
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
from account_manager import AccountManager
# ── helpers ────────────────────────────────────────────────────────────────────
def _make_am(tmp_path: Path, registry=None, **managers) -> AccountManager:
if registry is None:
registry = _make_registry()
return AccountManager(service_registry=registry, data_dir=str(tmp_path), **managers)
def _make_registry(services=None):
reg = MagicMock()
if services is None:
services = {
'email': {
'id': 'email', 'kind': 'builtin',
'accounts': {'manager': 'email_manager', 'credentials': ['password']},
'config': {'domain': 'example.com', 'smtp_port': 25},
},
'calendar': {
'id': 'calendar', 'kind': 'builtin',
'accounts': {'manager': 'calendar_manager', 'credentials': ['password']},
'config': {},
},
'files': {
'id': 'files', 'kind': 'builtin',
'accounts': {'manager': 'file_manager', 'credentials': ['password']},
'config': {},
},
}
reg.get.side_effect = lambda svc_id: services.get(svc_id)
return reg
def _make_email_mgr(ok=True):
m = MagicMock()
m.create_email_user.return_value = ok
m.delete_email_user.return_value = ok
return m
def _make_cal_mgr(ok=True):
m = MagicMock()
m.create_calendar_user.return_value = ok
m.delete_calendar_user.return_value = ok
return m
def _make_file_mgr(ok=True):
m = MagicMock()
m.create_user.return_value = ok
m.delete_user.return_value = ok
return m
# ── Provision ─────────────────────────────────────────────────────────────────
class TestProvision(unittest.TestCase):
def setUp(self):
import tempfile
self.tmp = Path(tempfile.mkdtemp())
self.email_mgr = _make_email_mgr()
self.cal_mgr = _make_cal_mgr()
self.file_mgr = _make_file_mgr()
self.am = _make_am(
self.tmp,
email_manager=self.email_mgr,
calendar_manager=self.cal_mgr,
file_manager=self.file_mgr,
)
def test_provision_email_calls_create_email_user(self):
self.am.provision('email', 'alice', password='s3cret')
self.email_mgr.create_email_user.assert_called_once_with('alice', 'example.com', 's3cret')
def test_provision_calendar_calls_create_calendar_user(self):
self.am.provision('calendar', 'alice', password='s3cret')
self.cal_mgr.create_calendar_user.assert_called_once_with('alice', 's3cret')
def test_provision_files_calls_create_user(self):
self.am.provision('files', 'alice', password='s3cret')
self.file_mgr.create_user.assert_called_once_with('alice', 's3cret')
def test_provision_generates_password_when_none_given(self):
creds = self.am.provision('email', 'alice')
self.assertIn('password', creds)
self.assertTrue(len(creds['password']) >= 16)
def test_provision_returns_credential_dict(self):
creds = self.am.provision('email', 'alice', password='mypassword')
self.assertEqual(creds, {'password': 'mypassword'})
def test_provision_stores_credentials(self):
self.am.provision('email', 'alice', password='pw')
stored = self.am.get_credentials('email', 'alice')
self.assertEqual(stored, {'password': 'pw'})
def test_provision_multiple_peers_stored_independently(self):
self.am.provision('email', 'alice', password='pw-alice')
self.am.provision('email', 'bob', password='pw-bob')
self.assertEqual(self.am.get_credentials('email', 'alice'), {'password': 'pw-alice'})
self.assertEqual(self.am.get_credentials('email', 'bob'), {'password': 'pw-bob'})
def test_provision_raises_for_unknown_service(self):
with self.assertRaises(ValueError):
self.am.provision('doesnotexist', 'alice')
def test_provision_raises_when_service_has_no_accounts(self):
reg = _make_registry({'nosvc': {'id': 'nosvc', 'accounts': {}, 'config': {}}})
am = _make_am(self.tmp, registry=reg, email_manager=self.email_mgr)
with self.assertRaises(ValueError):
am.provision('nosvc', 'alice')
def test_provision_raises_when_manager_not_registered(self):
am = _make_am(self.tmp) # no managers passed
with self.assertRaises(ValueError):
am.provision('email', 'alice')
def test_provision_raises_runtime_error_when_manager_returns_false(self):
am = _make_am(self.tmp, email_manager=_make_email_mgr(ok=False))
with self.assertRaises(RuntimeError):
am.provision('email', 'alice')
def test_provision_email_raises_when_domain_not_configured(self):
reg = _make_registry({'email': {
'id': 'email', 'accounts': {'manager': 'email_manager'},
'config': {'domain': ''},
}})
am = _make_am(self.tmp, registry=reg, email_manager=self.email_mgr)
with self.assertRaises(ValueError):
am.provision('email', 'alice')
# ── Credential file permissions ───────────────────────────────────────────────
class TestCredentialFilePermissions(unittest.TestCase):
def setUp(self):
import tempfile
self.tmp = Path(tempfile.mkdtemp())
self.am = _make_am(self.tmp, email_manager=_make_email_mgr())
def test_credentials_file_created_with_0600(self):
self.am.provision('email', 'alice', password='pw')
creds_path = self.tmp / 'peer_service_credentials.json'
mode = stat.S_IMODE(creds_path.stat().st_mode)
self.assertEqual(mode, 0o600, f'Expected 0o600, got {oct(mode)}')
# ── Deprovision ───────────────────────────────────────────────────────────────
class TestDeprovision(unittest.TestCase):
def setUp(self):
import tempfile
self.tmp = Path(tempfile.mkdtemp())
self.email_mgr = _make_email_mgr()
self.cal_mgr = _make_cal_mgr()
self.file_mgr = _make_file_mgr()
self.am = _make_am(
self.tmp,
email_manager=self.email_mgr,
calendar_manager=self.cal_mgr,
file_manager=self.file_mgr,
)
self.am.provision('email', 'alice', password='pw')
def test_deprovision_email_calls_delete_email_user(self):
self.am.deprovision('email', 'alice')
self.email_mgr.delete_email_user.assert_called_once_with('alice', 'example.com')
def test_deprovision_removes_stored_credentials(self):
self.am.deprovision('email', 'alice')
self.assertIsNone(self.am.get_credentials('email', 'alice'))
def test_deprovision_returns_true_on_success(self):
ok = self.am.deprovision('email', 'alice')
self.assertTrue(ok)
def test_deprovision_raises_for_unknown_service(self):
with self.assertRaises(ValueError):
self.am.deprovision('ghost', 'alice')
def test_deprovision_removes_service_entry_when_last_peer_gone(self):
self.am.deprovision('email', 'alice')
creds_file = self.tmp / 'peer_service_credentials.json'
data = json.loads(creds_file.read_text())
self.assertNotIn('email', data)
def test_deprovision_calendar_calls_delete_calendar_user(self):
self.am.provision('calendar', 'alice', password='pw')
self.am.deprovision('calendar', 'alice')
self.cal_mgr.delete_calendar_user.assert_called_once_with('alice')
def test_deprovision_files_calls_delete_user(self):
self.am.provision('files', 'alice', password='pw')
self.am.deprovision('files', 'alice')
self.file_mgr.delete_user.assert_called_once_with('alice')
# ── Queries ───────────────────────────────────────────────────────────────────
class TestQueries(unittest.TestCase):
def setUp(self):
import tempfile
self.tmp = Path(tempfile.mkdtemp())
self.am = _make_am(
self.tmp,
email_manager=_make_email_mgr(),
calendar_manager=_make_cal_mgr(),
file_manager=_make_file_mgr(),
)
self.am.provision('email', 'alice', password='pw-alice-email')
self.am.provision('email', 'bob', password='pw-bob-email')
self.am.provision('calendar', 'alice', password='pw-alice-cal')
def test_get_credentials_returns_stored(self):
self.assertEqual(self.am.get_credentials('email', 'alice'), {'password': 'pw-alice-email'})
def test_get_credentials_returns_none_for_unknown_peer(self):
self.assertIsNone(self.am.get_credentials('email', 'nobody'))
def test_get_credentials_returns_none_for_unknown_service(self):
self.assertIsNone(self.am.get_credentials('ghost', 'alice'))
def test_list_accounts_returns_provisioned_peers(self):
accounts = self.am.list_accounts('email')
self.assertIn('alice', accounts)
self.assertIn('bob', accounts)
def test_list_accounts_empty_for_unprovisioned_service(self):
self.assertEqual(self.am.list_accounts('files'), [])
def test_list_peer_services_returns_all_services_for_peer(self):
services = self.am.list_peer_services('alice')
self.assertIn('email', services)
self.assertIn('calendar', services)
def test_list_peer_services_returns_empty_for_unknown_peer(self):
self.assertEqual(self.am.list_peer_services('nobody'), [])
def test_is_provisioned_true_when_account_exists(self):
self.assertTrue(self.am.is_provisioned('email', 'alice'))
def test_is_provisioned_false_when_no_account(self):
self.assertFalse(self.am.is_provisioned('email', 'nobody'))
def test_get_all_credentials_returns_all_services(self):
all_creds = self.am.get_all_credentials('alice')
self.assertIn('email', all_creds)
self.assertIn('calendar', all_creds)
self.assertEqual(all_creds['email'], {'password': 'pw-alice-email'})
def test_get_all_credentials_empty_for_unknown_peer(self):
self.assertEqual(self.am.get_all_credentials('nobody'), {})
# ── Bulk deprovision ──────────────────────────────────────────────────────────
class TestDeprovisionPeer(unittest.TestCase):
def setUp(self):
import tempfile
self.tmp = Path(tempfile.mkdtemp())
self.email_mgr = _make_email_mgr()
self.cal_mgr = _make_cal_mgr()
self.am = _make_am(
self.tmp,
email_manager=self.email_mgr,
calendar_manager=self.cal_mgr,
file_manager=_make_file_mgr(),
)
self.am.provision('email', 'alice', password='pw')
self.am.provision('calendar', 'alice', password='pw')
def test_deprovision_peer_removes_from_all_services(self):
self.am.deprovision_peer('alice')
self.assertIsNone(self.am.get_credentials('email', 'alice'))
self.assertIsNone(self.am.get_credentials('calendar', 'alice'))
def test_deprovision_peer_returns_results_dict(self):
results = self.am.deprovision_peer('alice')
self.assertIn('email', results)
self.assertIn('calendar', results)
self.assertTrue(results['email'])
self.assertTrue(results['calendar'])
def test_deprovision_peer_continues_after_one_service_fails(self):
self.email_mgr.delete_email_user.side_effect = RuntimeError('smtp down')
results = self.am.deprovision_peer('alice')
self.assertFalse(results.get('email'))
# calendar should still succeed even though email failed
self.assertTrue(results.get('calendar'))
def test_deprovision_peer_no_op_for_unknown_peer(self):
results = self.am.deprovision_peer('nobody')
self.assertEqual(results, {})
# ── Direct credential storage ─────────────────────────────────────────────────
class TestStoreCredentials(unittest.TestCase):
def setUp(self):
import tempfile
self.tmp = Path(tempfile.mkdtemp())
self.am = _make_am(self.tmp)
def test_store_credentials_makes_them_retrievable(self):
self.am.store_credentials('email', 'alice', {'password': 'mypassword'})
self.assertEqual(self.am.get_credentials('email', 'alice'), {'password': 'mypassword'})
def test_store_credentials_overwrites_existing(self):
self.am.store_credentials('email', 'alice', {'password': 'old'})
self.am.store_credentials('email', 'alice', {'password': 'new'})
self.assertEqual(self.am.get_credentials('email', 'alice'), {'password': 'new'})
def test_store_credentials_creates_file_with_0600(self):
self.am.store_credentials('email', 'alice', {'password': 'pw'})
creds_path = self.tmp / 'peer_service_credentials.json'
mode = stat.S_IMODE(creds_path.stat().st_mode)
self.assertEqual(mode, 0o600)
# ── Thread safety ─────────────────────────────────────────────────────────────
class TestThreadSafety(unittest.TestCase):
def setUp(self):
import tempfile
self.tmp = Path(tempfile.mkdtemp())
self.am = _make_am(self.tmp)
def test_concurrent_store_credentials_no_data_loss(self):
errors = []
def worker(peer_name):
try:
self.am.store_credentials('email', peer_name, {'password': f'pw-{peer_name}'})
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=worker, args=(f'peer{i}',)) for i in range(20)]
for t in threads:
t.start()
for t in threads:
t.join()
self.assertEqual(errors, [])
accounts = self.am.list_accounts('email')
self.assertEqual(len(accounts), 20)
class TestEdgeCases(unittest.TestCase):
def setUp(self):
import tempfile
self.tmp = Path(tempfile.mkdtemp())
self.email_mgr = _make_email_mgr()
self.am = _make_am(self.tmp, email_manager=self.email_mgr,
calendar_manager=_make_cal_mgr(),
file_manager=_make_file_mgr())
def test_deprovision_peer_never_provisioned_returns_empty(self):
self.assertEqual(self.am.deprovision_peer('ghost'), {})
def test_deprovision_clears_credentials_even_when_manager_returns_false(self):
"""Credentials are removed even if underlying manager reports failure."""
self.am.provision('email', 'alice', password='pw')
self.email_mgr.delete_email_user.return_value = False
self.am.deprovision('email', 'alice')
self.assertIsNone(self.am.get_credentials('email', 'alice'))
def test_provision_twice_overwrites_credentials(self):
self.am.provision('email', 'alice', password='first')
self.am.provision('email', 'alice', password='second')
self.assertEqual(self.am.get_credentials('email', 'alice'), {'password': 'second'})
def test_provision_twice_calls_manager_both_times(self):
self.am.provision('email', 'alice', password='first')
self.am.provision('email', 'alice', password='second')
self.assertEqual(self.email_mgr.create_email_user.call_count, 2)
def test_corrupted_credentials_file_returns_empty_and_continues(self):
"""A corrupted JSON file is treated as empty rather than crashing."""
creds_path = self.tmp / 'peer_service_credentials.json'
creds_path.write_text('{invalid json}')
result = self.am.get_all_credentials('alice')
self.assertEqual(result, {})
def test_file_permissions_preserved_on_second_write(self):
"""0o600 must hold even after overwriting with a second provision."""
self.am.provision('email', 'alice', password='first')
self.am.provision('email', 'bob', password='second')
creds_path = self.tmp / 'peer_service_credentials.json'
mode = stat.S_IMODE(creds_path.stat().st_mode)
self.assertEqual(mode, 0o600, f'Expected 0o600 after overwrite, got {oct(mode)}')
def test_generated_password_is_url_safe(self):
"""token_urlsafe must not produce + or / characters."""
creds = self.am.provision('email', 'alice')
pwd = creds['password']
self.assertNotIn('+', pwd)
self.assertNotIn('/', pwd)
def test_store_then_deprovision_removes_credentials(self):
"""store_credentials + deprovision should cleanly remove the entry."""
self.am.store_credentials('email', 'alice', {'password': 'stored'})
self.am.deprovision('email', 'alice')
self.assertIsNone(self.am.get_credentials('email', 'alice'))
if __name__ == '__main__':
unittest.main()
+354
View File
@@ -0,0 +1,354 @@
"""
Tests for service-volume backup/restore in ConfigManager.
Covers:
- _backup_service_volumes: happy path, container not running, timeout
- _restore_service_volumes: happy path, missing archive, unknown service
- backup_config: passes service_registry, records includes_service_data
- restore_config: passes service_registry on full restore, not on selective
"""
import json
import subprocess
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch, call
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
from config_manager import ConfigManager
def _make_cm(tmp_path: Path) -> ConfigManager:
cfg_file = tmp_path / 'cell_config.json'
cfg_file.write_text('{}')
cm = ConfigManager(config_file=str(cfg_file), data_dir=str(tmp_path))
return cm
def _make_registry(plan=None):
"""Return a mock ServiceRegistry with a preset backup plan."""
reg = MagicMock()
reg.get_backup_plan.return_value = plan if plan is not None else [
{
'service_id': 'email',
'volumes': [
{'container': 'cell-mail', 'path': '/var/mail', 'name': 'maildata'},
{'container': 'cell-mail', 'path': '/var/mail-state', 'name': 'mailstate'},
],
'config_paths': [],
},
{
'service_id': 'calendar',
'volumes': [
{'container': 'cell-radicale', 'path': '/data', 'name': 'radicale_data'},
],
'config_paths': [],
},
]
return reg
class TestBackupServiceVolumesHappyPath(unittest.TestCase):
def setUp(self):
import tempfile
self.tmp = Path(tempfile.mkdtemp())
self.cm = _make_cm(self.tmp)
self.backup_path = self.tmp / 'test_backup'
self.backup_path.mkdir()
def _run_backup(self, registry=None):
if registry is None:
registry = _make_registry()
self.cm._backup_service_volumes(self.backup_path, registry)
@patch('config_manager.subprocess.run')
def test_creates_service_data_dir(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stderr=b'')
self._run_backup()
self.assertTrue((self.backup_path / 'service_data' / 'email').is_dir())
self.assertTrue((self.backup_path / 'service_data' / 'calendar').is_dir())
@patch('config_manager.subprocess.run')
def test_calls_docker_exec_tar_for_each_volume(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stderr=b'')
self._run_backup()
commands = [tuple(c.args[0]) for c in mock_run.call_args_list]
self.assertIn(
('docker', 'exec', '--', 'cell-mail', 'tar', '-C', '/var/mail', '-czf', '-', '.'),
commands,
)
self.assertIn(
('docker', 'exec', '--', 'cell-mail', 'tar', '-C', '/var/mail-state', '-czf', '-', '.'),
commands,
)
self.assertIn(
('docker', 'exec', '--', 'cell-radicale', 'tar', '-C', '/data', '-czf', '-', '.'),
commands,
)
@patch('config_manager.subprocess.run')
def test_writes_archive_files(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stderr=b'')
self._run_backup()
self.assertTrue((self.backup_path / 'service_data' / 'email' / 'maildata.tar.gz').exists())
self.assertTrue((self.backup_path / 'service_data' / 'email' / 'mailstate.tar.gz').exists())
self.assertTrue((self.backup_path / 'service_data' / 'calendar' / 'radicale_data.tar.gz').exists())
@patch('config_manager.subprocess.run')
def test_removes_archive_on_nonzero_returncode(self, mock_run):
mock_run.return_value = MagicMock(returncode=1, stderr=b'container not running')
self._run_backup()
self.assertFalse(
(self.backup_path / 'service_data' / 'email' / 'maildata.tar.gz').exists()
)
@patch('config_manager.subprocess.run')
def test_continues_after_one_volume_fails(self, mock_run):
def side_effect(cmd, **kwargs):
if 'cell-mail' in cmd:
return MagicMock(returncode=1, stderr=b'error')
return MagicMock(returncode=0, stderr=b'')
mock_run.side_effect = side_effect
self._run_backup()
# radicale should still succeed
self.assertTrue(
(self.backup_path / 'service_data' / 'calendar' / 'radicale_data.tar.gz').exists()
)
@patch('config_manager.subprocess.run', side_effect=subprocess.TimeoutExpired('docker', 300))
def test_timeout_removes_partial_archive(self, _mock_run):
self._run_backup()
# no archive should remain after a timeout
for svc in ('email', 'calendar'):
for name in ('maildata', 'mailstate', 'radicale_data'):
self.assertFalse(
(self.backup_path / 'service_data' / svc / f'{name}.tar.gz').exists()
)
@patch('config_manager.subprocess.run')
def test_empty_volumes_list_skipped(self, mock_run):
registry = _make_registry(plan=[
{'service_id': 'widget', 'volumes': [], 'config_paths': []}
])
self.cm._backup_service_volumes(self.backup_path, registry)
mock_run.assert_not_called()
@patch('config_manager.subprocess.run')
def test_get_backup_plan_exception_is_handled(self, mock_run):
registry = MagicMock()
registry.get_backup_plan.side_effect = RuntimeError('registry unavailable')
# should not raise
self.cm._backup_service_volumes(self.backup_path, registry)
mock_run.assert_not_called()
@patch('config_manager.subprocess.run')
def test_unsafe_container_name_rejected(self, mock_run):
registry = _make_registry(plan=[{
'service_id': 'evil', 'config_paths': [],
'volumes': [{'container': '-it cell-api', 'path': '/data', 'name': 'data'}],
}])
self.cm._backup_service_volumes(self.backup_path, registry)
mock_run.assert_not_called()
@patch('config_manager.subprocess.run')
def test_path_traversal_in_volume_path_rejected(self, mock_run):
registry = _make_registry(plan=[{
'service_id': 'evil', 'config_paths': [],
'volumes': [{'container': 'cell-mail', 'path': '/../etc', 'name': 'etc'}],
}])
self.cm._backup_service_volumes(self.backup_path, registry)
mock_run.assert_not_called()
@patch('config_manager.subprocess.run')
def test_relative_volume_path_rejected(self, mock_run):
registry = _make_registry(plan=[{
'service_id': 'evil', 'config_paths': [],
'volumes': [{'container': 'cell-mail', 'path': 'data/maildata', 'name': 'data'}],
}])
self.cm._backup_service_volumes(self.backup_path, registry)
mock_run.assert_not_called()
@patch('config_manager.subprocess.run')
def test_unsafe_volume_name_rejected(self, mock_run):
registry = _make_registry(plan=[{
'service_id': 'evil', 'config_paths': [],
'volumes': [{'container': 'cell-mail', 'path': '/var/mail', 'name': '../../etc/passwd'}],
}])
self.cm._backup_service_volumes(self.backup_path, registry)
mock_run.assert_not_called()
@patch('config_manager.subprocess.run')
def test_atomic_write_no_archive_on_partial_failure(self, mock_run):
"""If an exception occurs during subprocess, no .tar.gz file should remain."""
mock_run.side_effect = OSError('disk full')
self._run_backup()
for f in self.backup_path.rglob('*.tar.gz'):
self.fail(f'Archive {f} should not exist after exception during backup')
class TestRestoreServiceVolumes(unittest.TestCase):
def setUp(self):
import tempfile
self.tmp = Path(tempfile.mkdtemp())
self.cm = _make_cm(self.tmp)
self.backup_path = self.tmp / 'test_backup'
# Prepare a realistic backup structure
svc_data = self.backup_path / 'service_data'
(svc_data / 'email').mkdir(parents=True)
(svc_data / 'email' / 'maildata.tar.gz').write_bytes(b'fake-archive')
(svc_data / 'calendar').mkdir(parents=True)
(svc_data / 'calendar' / 'radicale_data.tar.gz').write_bytes(b'fake-archive')
def _make_registry_with_manifests(self):
reg = MagicMock()
def get_side_effect(service_id):
manifests = {
'email': {'backup': {'volumes': [
{'container': 'cell-mail', 'path': '/var/mail', 'name': 'maildata'},
]}},
'calendar': {'backup': {'volumes': [
{'container': 'cell-radicale', 'path': '/data', 'name': 'radicale_data'},
]}},
}
return manifests.get(service_id)
reg.get.side_effect = get_side_effect
return reg
@patch('config_manager.subprocess.run')
def test_calls_docker_exec_tar_for_each_archive(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stderr=b'')
registry = self._make_registry_with_manifests()
self.cm._restore_service_volumes(self.backup_path, registry)
commands = [tuple(c.args[0]) for c in mock_run.call_args_list]
self.assertIn(
('docker', 'exec', '-i', '--', 'cell-mail', 'tar', '-C', '/var/mail', '-xzf', '-'),
commands,
)
self.assertIn(
('docker', 'exec', '-i', '--', 'cell-radicale', 'tar', '-C', '/data', '-xzf', '-'),
commands,
)
@patch('config_manager.subprocess.run')
def test_skips_missing_archive(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stderr=b'')
registry = MagicMock()
registry.get.return_value = {'backup': {'volumes': [
{'container': 'cell-mail', 'path': '/var/mail', 'name': 'no_such_archive'},
]}}
self.cm._restore_service_volumes(self.backup_path, registry)
mock_run.assert_not_called()
@patch('config_manager.subprocess.run')
def test_skips_unknown_service(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stderr=b'')
registry = MagicMock()
registry.get.return_value = None
self.cm._restore_service_volumes(self.backup_path, registry)
mock_run.assert_not_called()
@patch('config_manager.subprocess.run')
def test_no_service_data_dir_is_noop(self, mock_run):
empty_backup = self.tmp / 'empty_backup'
empty_backup.mkdir()
registry = self._make_registry_with_manifests()
self.cm._restore_service_volumes(empty_backup, registry)
mock_run.assert_not_called()
@patch('config_manager.subprocess.run', side_effect=subprocess.TimeoutExpired('docker', 300))
def test_timeout_is_handled_gracefully(self, _mock_run):
registry = self._make_registry_with_manifests()
# should not raise
self.cm._restore_service_volumes(self.backup_path, registry)
@patch('config_manager.subprocess.run')
def test_continues_after_docker_exec_failure(self, mock_run):
call_count = [0]
def side_effect(cmd, **kwargs):
call_count[0] += 1
if call_count[0] == 1:
return MagicMock(returncode=1, stderr=b'container not running')
return MagicMock(returncode=0, stderr=b'')
mock_run.side_effect = side_effect
registry = self._make_registry_with_manifests()
self.cm._restore_service_volumes(self.backup_path, registry)
self.assertEqual(call_count[0], 2)
class TestBackupConfigWithRegistry(unittest.TestCase):
def setUp(self):
import tempfile
self.tmp = Path(tempfile.mkdtemp())
self.cm = _make_cm(self.tmp)
@patch.object(ConfigManager, '_backup_service_volumes')
def test_backup_calls_volume_backup_when_registry_given(self, mock_bsv):
registry = _make_registry()
self.cm.backup_config(service_registry=registry)
mock_bsv.assert_called_once()
args = mock_bsv.call_args
self.assertIs(args[0][1], registry)
@patch.object(ConfigManager, '_backup_service_volumes')
def test_backup_skips_volume_backup_when_no_registry(self, mock_bsv):
self.cm.backup_config(service_registry=None)
mock_bsv.assert_not_called()
@patch.object(ConfigManager, '_backup_service_volumes')
def test_manifest_records_includes_service_data_true(self, _mock_bsv):
registry = _make_registry()
backup_id = self.cm.backup_config(service_registry=registry)
manifest = json.loads((self.cm.backup_dir / backup_id / 'manifest.json').read_text())
self.assertTrue(manifest['includes_service_data'])
@patch.object(ConfigManager, '_backup_service_volumes')
def test_manifest_records_includes_service_data_false(self, _mock_bsv):
backup_id = self.cm.backup_config(service_registry=None)
manifest = json.loads((self.cm.backup_dir / backup_id / 'manifest.json').read_text())
self.assertFalse(manifest['includes_service_data'])
class TestRestoreConfigWithRegistry(unittest.TestCase):
def setUp(self):
import tempfile
self.tmp = Path(tempfile.mkdtemp())
self.cm = _make_cm(self.tmp)
# Create a minimal backup
backup_id = 'backup_20260101_000000'
bp = self.cm.backup_dir / backup_id
bp.mkdir(parents=True)
(bp / 'cell_config.json').write_text('{}')
manifest = {'backup_id': backup_id, 'timestamp': '2026-01-01T00:00:00', 'services': []}
(bp / 'manifest.json').write_text(json.dumps(manifest))
self.backup_id = backup_id
@patch.object(ConfigManager, '_restore_service_volumes')
def test_full_restore_calls_volume_restore_when_registry_given(self, mock_rsv):
registry = _make_registry()
self.cm.restore_config(self.backup_id, service_registry=registry)
mock_rsv.assert_called_once()
args = mock_rsv.call_args
self.assertIs(args[0][1], registry)
@patch.object(ConfigManager, '_restore_service_volumes')
def test_full_restore_skips_volume_restore_when_no_registry(self, mock_rsv):
self.cm.restore_config(self.backup_id, service_registry=None)
mock_rsv.assert_not_called()
@patch.object(ConfigManager, '_restore_service_volumes')
def test_selective_restore_never_calls_volume_restore(self, mock_rsv):
"""Volume restore is skipped for selective restores (service list specified)."""
registry = _make_registry()
self.cm.restore_config(self.backup_id, services=['email'], service_registry=registry)
mock_rsv.assert_not_called()
if __name__ == '__main__':
unittest.main()
+508
View File
@@ -0,0 +1,508 @@
"""
Unit tests for ServiceComposer.
All subprocess calls and filesystem writes are mocked no Docker daemon required.
"""
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch, mock_open, call
sys.path.insert(0, str(Path(__file__).parent.parent / 'api'))
from service_composer import ServiceComposer, _SECRET_RE
# ── Helpers ─────────────────────────────────────────────────────────────────
def _make_cm(identity=None, service_config=None) -> MagicMock:
cm = MagicMock()
ident = identity or {'cell_name': 'testcell', 'domain': 'cell.local', 'domain_mode': 'lan'}
cm.get_identity.return_value = ident
cm.get_effective_domain.return_value = ident.get('domain', 'cell.local')
cm.configs = {}
if service_config:
cm.configs.update(service_config)
return cm
def _make_manifest(service_id='myservice', kind='store', schema=None):
return {
'id': service_id,
'kind': kind,
'config_schema': schema or {
'port': {'type': 'integer', 'default': 8080},
'username': {'type': 'string', 'default': 'admin'},
},
'containers': [f'cell-{service_id}'],
}
def _composer(cm=None, data_dir=None):
if data_dir is None:
data_dir = '/fake/data'
return ServiceComposer(config_manager=cm or _make_cm(), data_dir=data_dir)
# ── Template rendering ────────────────────────────────────────────────────────
class TestRenderTemplate(unittest.TestCase):
def setUp(self):
self.cm = _make_cm(service_config={'myservice': {'port': 9090}})
self.composer = _composer(self.cm)
def test_substitutes_pic_cfg_uppercase(self):
manifest = _make_manifest()
template = 'PORT=${PIC_CFG_PORT}'
result = self.composer.render_template('myservice', manifest, template)
self.assertEqual(result, 'PORT=9090')
def test_substitutes_default_when_no_saved_config(self):
cm = _make_cm()
composer = _composer(cm)
manifest = _make_manifest()
template = 'USER=${PIC_CFG_USERNAME}'
result = composer.render_template('myservice', manifest, template)
self.assertEqual(result, 'USER=admin')
def test_pic_domain_substituted(self):
manifest = _make_manifest()
template = 'DOMAIN=${PIC_DOMAIN}'
result = self.composer.render_template('myservice', manifest, template)
self.assertIn('cell.local', result)
def test_pic_cell_name_substituted(self):
manifest = _make_manifest()
template = 'CELL=${PIC_CELL_NAME}'
result = self.composer.render_template('myservice', manifest, template)
self.assertIn('testcell', result)
def test_pic_service_id_substituted(self):
manifest = _make_manifest(service_id='myservice')
template = 'ID=${PIC_SERVICE_ID}'
result = self.composer.render_template('myservice', manifest, template)
self.assertEqual(result, 'ID=myservice')
def test_pic_secret_generated_and_substituted(self):
with tempfile.TemporaryDirectory() as tmpdir:
composer = _composer(data_dir=tmpdir)
manifest = _make_manifest()
template = 'PASS=${PIC_SECRET_DB_PASS}'
result = composer.render_template('myservice', manifest, template)
self.assertNotIn('${PIC_SECRET_DB_PASS}', result)
self.assertNotEqual(result, 'PASS=')
# Secret is a non-empty string
password = result.replace('PASS=', '')
self.assertTrue(len(password) > 8)
def test_pic_secret_stable_across_calls(self):
with tempfile.TemporaryDirectory() as tmpdir:
composer = _composer(data_dir=tmpdir)
manifest = _make_manifest()
template = 'P=${PIC_SECRET_MY_PASS}'
r1 = composer.render_template('myservice', manifest, template)
r2 = composer.render_template('myservice', manifest, template)
self.assertEqual(r1, r2)
def test_pic_secret_different_per_service(self):
with tempfile.TemporaryDirectory() as tmpdir:
composer = _composer(data_dir=tmpdir)
m1 = _make_manifest('svc1')
m2 = _make_manifest('svc2')
t = 'P=${PIC_SECRET_PASS}'
r1 = composer.render_template('svc1', m1, t)
r2 = composer.render_template('svc2', m2, t)
self.assertNotEqual(r1, r2)
def test_multiple_secrets_all_replaced(self):
with tempfile.TemporaryDirectory() as tmpdir:
composer = _composer(data_dir=tmpdir)
manifest = _make_manifest()
template = 'A=${PIC_SECRET_KEY_A}\nB=${PIC_SECRET_KEY_B}'
result = composer.render_template('myservice', manifest, template)
self.assertNotIn('${PIC_SECRET_', result)
def test_no_unknown_vars_left_from_schema(self):
# Use a fresh composer with no saved config so defaults apply
composer = _composer(_make_cm())
manifest = _make_manifest(schema={
'port': {'type': 'integer', 'default': 3000},
})
template = 'PORT=${PIC_CFG_PORT}\nOTHER=${PIC_CFG_UNKNOWN}'
result = composer.render_template('myservice', manifest, template)
# Known var substituted with default, unknown left alone (no crash)
self.assertIn('PORT=3000', result)
self.assertIn('${PIC_CFG_UNKNOWN}', result)
# ── Write compose file ────────────────────────────────────────────────────────
class TestWriteCompose(unittest.TestCase):
def test_writes_rendered_content_to_correct_path(self):
with tempfile.TemporaryDirectory() as tmpdir:
cm = _make_cm()
composer = ServiceComposer(config_manager=cm, data_dir=tmpdir)
manifest = _make_manifest()
template = 'PORT=${PIC_CFG_PORT}'
composer.write_compose('myservice', manifest, template)
expected_path = os.path.join(
tmpdir, 'services', 'myservice', 'docker-compose.yml'
)
self.assertTrue(os.path.exists(expected_path))
with open(expected_path) as f:
content = f.read()
self.assertIn('8080', content)
def test_has_compose_file_false_before_write(self):
with tempfile.TemporaryDirectory() as tmpdir:
composer = _composer(data_dir=tmpdir)
self.assertFalse(composer.has_compose_file('newservice'))
def test_has_compose_file_true_after_write(self):
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir)
manifest = _make_manifest()
composer.write_compose('myservice', manifest, 'content: true')
self.assertTrue(composer.has_compose_file('myservice'))
def test_atomic_write_via_tmp_file(self):
"""If fsync fails, the compose file should not be partially written."""
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir)
manifest = _make_manifest()
# Should not raise even if fsync not available
composer.write_compose('myservice', manifest, 'content: yes')
path = os.path.join(tmpdir, 'services', 'myservice', 'docker-compose.yml')
self.assertTrue(os.path.exists(path))
# ── Secrets ───────────────────────────────────────────────────────────────────
class TestSecrets(unittest.TestCase):
def test_secrets_persisted_to_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
composer = _composer(data_dir=tmpdir)
composer._get_or_create_secret('svc', 'PIC_SECRET_PASS')
secrets_path = os.path.join(tmpdir, 'service_secrets.json')
self.assertTrue(os.path.exists(secrets_path))
with open(secrets_path) as f:
data = json.load(f)
self.assertIn('svc', data)
self.assertIn('PIC_SECRET_PASS', data['svc'])
def test_clear_secrets_removes_service_entry(self):
with tempfile.TemporaryDirectory() as tmpdir:
composer = _composer(data_dir=tmpdir)
composer._get_or_create_secret('svc', 'PIC_SECRET_KEY')
composer._clear_secrets('svc')
secrets = composer._load_secrets()
self.assertNotIn('svc', secrets)
def test_clear_secrets_noop_when_no_secrets_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
composer = _composer(data_dir=tmpdir)
# Should not raise
composer._clear_secrets('nonexistent')
def test_load_secrets_returns_empty_when_file_missing(self):
composer = _composer(data_dir='/nonexistent/path')
self.assertEqual(composer._load_secrets(), {})
# ── Subprocess execution ──────────────────────────────────────────────────────
class TestDockerComposeExecution(unittest.TestCase):
def _composer_with_compose_file(self, tmpdir, service_id='myservice'):
composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir)
svc_dir = os.path.join(tmpdir, 'services', service_id)
os.makedirs(svc_dir)
with open(os.path.join(svc_dir, 'docker-compose.yml'), 'w') as f:
f.write('services:\n app:\n image: nginx\n')
return composer
@patch('service_composer.subprocess.run')
def test_up_calls_docker_compose_up(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
with tempfile.TemporaryDirectory() as tmpdir:
composer = self._composer_with_compose_file(tmpdir)
composer.up('myservice')
cmd = mock_run.call_args[0][0]
self.assertIn('up', cmd)
self.assertIn('-d', cmd)
self.assertIn('--project-name', cmd)
self.assertIn('pic-myservice', cmd)
@patch('service_composer.subprocess.run')
def test_down_calls_docker_compose_down(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
with tempfile.TemporaryDirectory() as tmpdir:
composer = self._composer_with_compose_file(tmpdir)
composer.down('myservice')
cmd = mock_run.call_args[0][0]
self.assertIn('down', cmd)
@patch('service_composer.subprocess.run')
def test_down_with_purge_passes_volumes_flag(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
with tempfile.TemporaryDirectory() as tmpdir:
composer = self._composer_with_compose_file(tmpdir)
composer.down('myservice', remove_volumes=True)
cmd = mock_run.call_args[0][0]
self.assertIn('--volumes', cmd)
@patch('service_composer.subprocess.run')
def test_restart_calls_docker_compose_restart(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
with tempfile.TemporaryDirectory() as tmpdir:
composer = self._composer_with_compose_file(tmpdir)
composer.restart('myservice')
cmd = mock_run.call_args[0][0]
self.assertIn('restart', cmd)
@patch('service_composer.subprocess.run')
def test_status_parses_json_output(self, mock_run):
container_info = {'Name': 'myservice-app', 'State': 'running'}
mock_run.return_value = MagicMock(
returncode=0,
stdout=json.dumps(container_info),
stderr='',
)
with tempfile.TemporaryDirectory() as tmpdir:
composer = self._composer_with_compose_file(tmpdir)
result = composer.status('myservice')
self.assertTrue(result['ok'])
self.assertEqual(len(result['containers']), 1)
self.assertEqual(result['containers'][0]['Name'], 'myservice-app')
@patch('service_composer.subprocess.run')
def test_status_returns_empty_containers_on_bad_json(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='not json', stderr='')
with tempfile.TemporaryDirectory() as tmpdir:
composer = self._composer_with_compose_file(tmpdir)
result = composer.status('myservice')
self.assertEqual(result['containers'], [])
def test_store_cmd_returns_error_when_no_compose_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
composer = _composer(data_dir=tmpdir)
result = composer.up('nonexistent')
self.assertFalse(result['ok'])
self.assertIn('No compose file', result['error'])
@patch('service_composer.subprocess.run')
def test_up_uses_600s_timeout(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
with tempfile.TemporaryDirectory() as tmpdir:
composer = self._composer_with_compose_file(tmpdir)
composer.up('myservice')
_, kwargs = mock_run.call_args
self.assertGreaterEqual(kwargs.get('timeout', 0), 600)
@patch('service_composer.subprocess.run')
def test_run_returns_error_on_timeout(self, mock_run):
import subprocess
mock_run.side_effect = subprocess.TimeoutExpired(cmd=[], timeout=120)
composer = _composer()
result = composer._run(['docker', 'compose', 'up'])
self.assertFalse(result['ok'])
self.assertIn('timed out', result['error'])
@patch('service_composer.subprocess.run')
def test_run_returns_false_on_nonzero_exit(self, mock_run):
mock_run.return_value = MagicMock(returncode=1, stdout='', stderr='error msg')
composer = _composer()
result = composer._run(['docker', 'compose', 'up'])
self.assertFalse(result['ok'])
self.assertEqual(result['stderr'], 'error msg')
# ── Builtin lifecycle ─────────────────────────────────────────────────────────
class TestBuiltinLifecycle(unittest.TestCase):
@patch('service_composer.subprocess.run')
def test_restart_builtin_includes_container_names(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
composer = _composer()
composer.restart_builtin(['cell-radicale'])
cmd = mock_run.call_args[0][0]
self.assertIn('cell-radicale', cmd)
self.assertIn('restart', cmd)
@patch('service_composer.subprocess.run')
def test_status_builtin_includes_container_names(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
composer = _composer()
composer.status_builtin(['cell-mail', 'cell-rainloop'])
cmd = mock_run.call_args[0][0]
self.assertIn('cell-mail', cmd)
self.assertIn('cell-rainloop', cmd)
def test_restart_builtin_empty_list_returns_error(self):
composer = _composer()
result = composer.restart_builtin([])
self.assertFalse(result['ok'])
def test_status_builtin_empty_list_returns_error(self):
composer = _composer()
result = composer.status_builtin([])
self.assertFalse(result['ok'])
# ── Unified dispatch ──────────────────────────────────────────────────────────
class TestUnifiedDispatch(unittest.TestCase):
@patch('service_composer.subprocess.run')
def test_restart_service_builtin_uses_main_compose(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
composer = _composer()
manifest = _make_manifest(kind='builtin')
manifest['containers'] = ['cell-myservice']
composer.restart_service('myservice', manifest)
cmd = mock_run.call_args[0][0]
self.assertIn('cell-myservice', cmd)
# Main compose flag present
self.assertIn('-f', cmd)
@patch('service_composer.subprocess.run')
def test_restart_service_store_uses_per_service_compose(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir)
# Create compose file for the store service
svc_dir = os.path.join(tmpdir, 'services', 'storesvc')
os.makedirs(svc_dir)
with open(os.path.join(svc_dir, 'docker-compose.yml'), 'w') as f:
f.write('services:\n app:\n image: nginx\n')
manifest = _make_manifest('storesvc', kind='store')
composer.restart_service('storesvc', manifest)
cmd = mock_run.call_args[0][0]
self.assertIn('pic-storesvc', cmd)
# ── Remove ────────────────────────────────────────────────────────────────────
class TestServiceIdValidation(unittest.TestCase):
def test_valid_ids_accepted(self):
for sid in ('email', 'my-service', 'svc123', 'a1b2-c3'):
ServiceComposer._validate_service_id(sid) # should not raise
def test_dotdot_rejected(self):
with self.assertRaises(ValueError):
ServiceComposer._validate_service_id('..')
def test_dot_rejected(self):
with self.assertRaises(ValueError):
ServiceComposer._validate_service_id('.')
def test_slash_rejected(self):
with self.assertRaises(ValueError):
ServiceComposer._validate_service_id('evil/path')
def test_uppercase_rejected(self):
with self.assertRaises(ValueError):
ServiceComposer._validate_service_id('MyService')
def test_empty_string_rejected(self):
with self.assertRaises(ValueError):
ServiceComposer._validate_service_id('')
def test_newline_in_config_value_stripped(self):
"""A newline in a config value must not create a new YAML key (injection)."""
cm = _make_cm(service_config={'svc': {'port': '80\nnewline_attack: true'}})
composer = _composer(cm)
manifest = _make_manifest(schema={'port': {'type': 'string', 'default': '80'}})
result = composer.render_template('svc', manifest, 'PORT=${PIC_CFG_PORT}')
# The newline is stripped — 'newline_attack' is concatenated, not a separate YAML key
self.assertNotIn('\n', result)
class TestRemove(unittest.TestCase):
@patch('service_composer.subprocess.run')
def test_remove_deletes_compose_file(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir)
svc_dir = os.path.join(tmpdir, 'services', 'oldsvc')
os.makedirs(svc_dir)
compose_file = os.path.join(svc_dir, 'docker-compose.yml')
with open(compose_file, 'w') as f:
f.write('services: {}')
composer.remove('oldsvc', purge_data=False)
self.assertFalse(os.path.exists(compose_file))
@patch('service_composer.subprocess.run')
def test_remove_purge_deletes_service_directory(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir)
svc_dir = os.path.join(tmpdir, 'services', 'purgesvc')
os.makedirs(svc_dir)
with open(os.path.join(svc_dir, 'docker-compose.yml'), 'w') as f:
f.write('services: {}')
with open(os.path.join(svc_dir, 'data.txt'), 'w') as f:
f.write('important data')
composer.remove('purgesvc', purge_data=True)
self.assertFalse(os.path.exists(svc_dir))
@patch('service_composer.subprocess.run')
def test_remove_purge_clears_secrets(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir)
composer._get_or_create_secret('purgesvc', 'PIC_SECRET_KEY')
svc_dir = os.path.join(tmpdir, 'services', 'purgesvc')
os.makedirs(svc_dir)
with open(os.path.join(svc_dir, 'docker-compose.yml'), 'w') as f:
f.write('services: {}')
composer.remove('purgesvc', purge_data=True)
secrets = composer._load_secrets()
self.assertNotIn('purgesvc', secrets)
# ── Parse ps json ─────────────────────────────────────────────────────────────
class TestParsePsJson(unittest.TestCase):
def test_single_json_object(self):
line = json.dumps({'Name': 'c1', 'State': 'running'})
result = ServiceComposer._parse_ps_json(line)
self.assertEqual(len(result), 1)
self.assertEqual(result[0]['Name'], 'c1')
def test_multiple_json_lines(self):
lines = '\n'.join([
json.dumps({'Name': 'c1'}),
json.dumps({'Name': 'c2'}),
])
result = ServiceComposer._parse_ps_json(lines)
self.assertEqual(len(result), 2)
def test_ignores_blank_lines(self):
lines = '\n'.join([json.dumps({'Name': 'c1'}), '', json.dumps({'Name': 'c2'})])
result = ServiceComposer._parse_ps_json(lines)
self.assertEqual(len(result), 2)
def test_returns_empty_list_for_empty_output(self):
self.assertEqual(ServiceComposer._parse_ps_json(''), [])
def test_bad_json_lines_skipped(self):
lines = '\n'.join(['not json', json.dumps({'Name': 'c1'})])
result = ServiceComposer._parse_ps_json(lines)
self.assertEqual(len(result), 1)
if __name__ == '__main__':
unittest.main()
+347
View File
@@ -0,0 +1,347 @@
"""
Unit tests for ServiceRegistry.
Tests load actual built-in manifests from api/services/builtins/ and verify
that the registry merges config correctly, returns expected routes/backup plans,
and handles missing manifests gracefully.
"""
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
sys.path.insert(0, str(Path(__file__).parent.parent / 'api'))
from service_registry import ServiceRegistry, _BUILTINS_DIR
def _make_cm(configs: dict = None) -> MagicMock:
cm = MagicMock()
cm.configs = configs or {}
cm.get_installed_services.return_value = {}
return cm
class TestBuiltinManifests(unittest.TestCase):
"""Verify the built-in manifest files are valid JSON with required fields."""
def _load(self, service_id: str) -> dict:
path = os.path.join(_BUILTINS_DIR, service_id, 'manifest.json')
self.assertTrue(os.path.exists(path), f'Missing manifest for {service_id}')
with open(path) as f:
return json.load(f)
def _assert_required(self, manifest: dict):
for field in ('schema_version', 'id', 'name', 'kind', 'capabilities'):
self.assertIn(field, manifest, f'Missing required field: {field}')
caps = manifest['capabilities']
for cap in ('has_subdomain', 'has_accounts', 'has_admin_config',
'has_storage', 'has_egress', 'has_api_hooks'):
self.assertIn(cap, caps, f'Missing capability flag: {cap}')
def test_email_manifest_valid(self):
m = self._load('email')
self._assert_required(m)
self.assertEqual(m['id'], 'email')
self.assertEqual(m['kind'], 'builtin')
self.assertIn('mail', [m.get('subdomain')] + (m.get('extra_subdomains') or []))
self.assertIn('webmail', m.get('extra_subdomains', []))
self.assertEqual(m['capabilities']['has_accounts'], True)
def test_calendar_manifest_valid(self):
m = self._load('calendar')
self._assert_required(m)
self.assertEqual(m['id'], 'calendar')
self.assertEqual(m['subdomain'], 'calendar')
def test_files_manifest_valid(self):
m = self._load('files')
self._assert_required(m)
self.assertEqual(m['id'], 'files')
self.assertIn('webdav', m.get('extra_subdomains', []))
def test_all_builtins_have_backup_volumes(self):
for svc_id in ('email', 'calendar', 'files'):
m = self._load(svc_id)
volumes = m.get('backup', {}).get('volumes')
self.assertTrue(volumes, f'{svc_id}: backup.volumes must not be empty')
for vol in volumes:
for field in ('container', 'path', 'name'):
self.assertIn(field, vol,
f'{svc_id}: backup volume entry missing {field!r}')
def test_all_builtins_have_peer_config_template(self):
for svc_id in ('email', 'calendar', 'files'):
m = self._load(svc_id)
self.assertTrue(m.get('peer_config_template'),
f'{svc_id}: peer_config_template must not be empty')
def test_config_schema_defaults_are_correct_types(self):
for svc_id in ('email', 'calendar', 'files'):
m = self._load(svc_id)
for field, spec in (m.get('config_schema') or {}).items():
if 'default' in spec:
if spec['type'] == 'integer':
self.assertIsInstance(
spec['default'], int,
f'{svc_id}.{field}: integer default must be int')
elif spec['type'] == 'string':
self.assertIsInstance(
spec['default'], str,
f'{svc_id}.{field}: string default must be str')
class TestServiceRegistryListAll(unittest.TestCase):
def setUp(self):
self.cm = _make_cm()
self.registry = ServiceRegistry(self.cm)
def test_lists_three_builtins(self):
services = self.registry.list_all()
ids = [s['id'] for s in services]
self.assertIn('email', ids)
self.assertIn('calendar', ids)
self.assertIn('files', ids)
def test_builtins_come_before_store_services(self):
self.cm.get_installed_services.return_value = {
'zstore': {'manifest': {
'id': 'zstore', 'name': 'Z Store', 'kind': 'store',
'capabilities': {}, 'config_schema': {}
}}
}
services = self.registry.list_all()
ids = [s['id'] for s in services]
# builtins (email, calendar, files) should all appear before zstore
for builtin_id in ('email', 'calendar', 'files'):
self.assertLess(ids.index(builtin_id), ids.index('zstore'))
def test_each_service_has_config_key(self):
for svc in self.registry.list_all():
self.assertIn('config', svc, f'{svc["id"]} missing config key')
def test_no_duplicate_ids(self):
services = self.registry.list_all()
ids = [s['id'] for s in services]
self.assertEqual(len(ids), len(set(ids)))
class TestServiceRegistryConfigMerge(unittest.TestCase):
def test_defaults_used_when_no_saved_config(self):
cm = _make_cm({'calendar': {}})
reg = ServiceRegistry(cm)
svc = reg.get('calendar')
self.assertEqual(svc['config']['port'], 5232)
def test_saved_config_overrides_defaults(self):
cm = _make_cm({'calendar': {'port': 9999}})
reg = ServiceRegistry(cm)
svc = reg.get('calendar')
self.assertEqual(svc['config']['port'], 9999)
def test_unknown_saved_keys_excluded(self):
cm = _make_cm({'calendar': {'port': 5232, 'unknown_field': 'x'}})
reg = ServiceRegistry(cm)
svc = reg.get('calendar')
self.assertNotIn('unknown_field', svc['config'])
def test_partial_override_keeps_other_defaults(self):
cm = _make_cm({'email': {'smtp_port': 2525}})
reg = ServiceRegistry(cm)
svc = reg.get('email')
self.assertEqual(svc['config']['smtp_port'], 2525)
self.assertEqual(svc['config']['imap_port'], 993)
class TestServiceRegistryGet(unittest.TestCase):
def setUp(self):
self.cm = _make_cm()
self.registry = ServiceRegistry(self.cm)
def test_returns_none_for_unknown_id(self):
self.assertIsNone(self.registry.get('nonexistent_service'))
def test_returns_builtin_by_id(self):
svc = self.registry.get('email')
self.assertIsNotNone(svc)
self.assertEqual(svc['id'], 'email')
def test_returns_store_service_from_installed(self):
self.cm.get_installed_services.return_value = {
'mywiki': {'manifest': {
'id': 'mywiki', 'name': 'Wiki', 'kind': 'store',
'capabilities': {}, 'config_schema': {}
}}
}
svc = self.registry.get('mywiki')
self.assertIsNotNone(svc)
self.assertEqual(svc['id'], 'mywiki')
class TestServiceRegistryGetCaddyRoutes(unittest.TestCase):
def setUp(self):
self.cm = _make_cm()
self.registry = ServiceRegistry(self.cm)
def test_all_builtins_appear_in_routes(self):
routes = self.registry.get_caddy_routes()
route_ids = [r['service_id'] for r in routes]
for svc_id in ('email', 'calendar', 'files'):
self.assertIn(svc_id, route_ids)
def test_email_route_has_webmail_extra_subdomain(self):
routes = self.registry.get_caddy_routes()
email_route = next(r for r in routes if r['service_id'] == 'email')
self.assertIn('webmail', email_route['extra_subdomains'])
def test_files_route_has_webdav_extra_subdomain(self):
routes = self.registry.get_caddy_routes()
files_route = next(r for r in routes if r['service_id'] == 'files')
self.assertIn('webdav', files_route['extra_subdomains'])
def test_services_without_subdomain_excluded(self):
self.cm.get_installed_services.return_value = {
'nosubdomain': {'manifest': {
'id': 'nosubdomain', 'name': 'NoSub', 'kind': 'store',
'capabilities': {'has_subdomain': False},
'config_schema': {}
}}
}
routes = self.registry.get_caddy_routes()
self.assertNotIn('nosubdomain', [r['service_id'] for r in routes])
class TestServiceRegistryGetBackupPlan(unittest.TestCase):
def setUp(self):
self.cm = _make_cm()
self.registry = ServiceRegistry(self.cm)
def test_all_builtins_in_backup_plan(self):
plan = self.registry.get_backup_plan()
plan_ids = [p['service_id'] for p in plan]
for svc_id in ('email', 'calendar', 'files'):
self.assertIn(svc_id, plan_ids)
def test_email_backup_includes_maildata_volume(self):
plan = self.registry.get_backup_plan()
email_plan = next(p for p in plan if p['service_id'] == 'email')
names = [v['name'] for v in email_plan['volumes']]
self.assertIn('maildata', names)
vol = next(v for v in email_plan['volumes'] if v['name'] == 'maildata')
self.assertEqual(vol['container'], 'cell-mail')
self.assertEqual(vol['path'], '/var/mail')
def test_calendar_backup_includes_radicale_volume(self):
plan = self.registry.get_backup_plan()
cal_plan = next(p for p in plan if p['service_id'] == 'calendar')
names = [v['name'] for v in cal_plan['volumes']]
self.assertIn('radicale_data', names)
vol = next(v for v in cal_plan['volumes'] if v['name'] == 'radicale_data')
self.assertEqual(vol['container'], 'cell-radicale')
self.assertEqual(vol['path'], '/data')
def test_files_backup_includes_both_volumes(self):
plan = self.registry.get_backup_plan()
files_plan = next(p for p in plan if p['service_id'] == 'files')
names = {v['name'] for v in files_plan['volumes']}
self.assertIn('filegator', names)
self.assertIn('files', names)
def test_service_without_storage_excluded(self):
self.cm.get_installed_services.return_value = {
'nostorage': {'manifest': {
'id': 'nostorage', 'name': 'NoStorage', 'kind': 'store',
'capabilities': {'has_storage': False},
'config_schema': {}
}}
}
plan = self.registry.get_backup_plan()
self.assertNotIn('nostorage', [p['service_id'] for p in plan])
class TestServiceRegistryGetPeerServiceInfo(unittest.TestCase):
def setUp(self):
self.cm = _make_cm({'calendar': {}})
self.registry = ServiceRegistry(self.cm)
def test_fills_domain_placeholder(self):
info = self.registry.get_peer_service_info(
'calendar', 'alice', 'example.com', {})
self.assertIn('example.com', info['caldav_url'])
def test_fills_peer_username(self):
info = self.registry.get_peer_service_info(
'calendar', 'bob', 'example.com', {})
self.assertIn('bob', info['caldav_url'])
def test_fills_credentials(self):
info = self.registry.get_peer_service_info(
'calendar', 'alice', 'example.com', {'password': 'secret123'})
self.assertEqual(info['password'], 'secret123')
def test_returns_none_for_unknown_service(self):
result = self.registry.get_peer_service_info(
'unknown_svc', 'alice', 'example.com', {})
self.assertIsNone(result)
def test_username_url_encoded_in_peer_url(self):
info = self.registry.get_peer_service_info(
'calendar', 'alice/../../etc', 'example.com', {})
self.assertNotIn('../', info['caldav_url'])
self.assertIn('alice%2F', info['caldav_url'])
def test_domain_not_altered_by_username(self):
info = self.registry.get_peer_service_info(
'calendar', 'alice@evil.com', 'legit.example.com', {})
self.assertIn('legit.example.com', info['caldav_url'])
class TestServiceRegistryConfigMergeTypeCoercion(unittest.TestCase):
def test_string_in_config_coerced_to_int(self):
cm = _make_cm({'calendar': {'port': '9999'}})
reg = ServiceRegistry(cm)
svc = reg.get('calendar')
self.assertIsInstance(svc['config']['port'], int)
self.assertEqual(svc['config']['port'], 9999)
def test_unconvertible_value_falls_back_to_default(self):
cm = _make_cm({'calendar': {'port': 'not_a_number'}})
reg = ServiceRegistry(cm)
svc = reg.get('calendar')
self.assertEqual(svc['config']['port'], 5232)
class TestServiceRegistryWithBrokenManifest(unittest.TestCase):
"""Registry must not crash when a manifest file is corrupt or missing."""
def test_missing_builtins_dir_returns_empty(self):
with patch('service_registry._BUILTINS_DIR', '/nonexistent/path'):
reg = ServiceRegistry(_make_cm())
self.assertEqual(reg.list_all(), [])
def test_malformed_json_manifest_skipped(self):
with tempfile.TemporaryDirectory() as tmpdir:
bad_dir = os.path.join(tmpdir, 'bad_svc')
os.makedirs(bad_dir)
with open(os.path.join(bad_dir, 'manifest.json'), 'w') as f:
f.write('this is not json {{{')
with patch('service_registry._BUILTINS_DIR', tmpdir):
reg = ServiceRegistry(_make_cm())
# Should not raise; just return empty list
result = reg.list_all()
self.assertEqual(result, [])
if __name__ == '__main__':
unittest.main()