Files
pic/api/service_registry.py
T
roof 18b50d08c1
Unit Tests / test (push) Successful in 11m31s
fix: post-Phase-0 corrections — data-dir bind mounts, reserved subdomains, list_active()
Three related fixes discovered during review of Phase 0 and Phase 1 manifests:

1. validate_rendered_compose(): add allowed_data_dir param. After ${PIC_DATA_DIR}
   substitution, compose templates produce absolute paths; without this the
   validator would reject every service install.  ServiceComposer.write_compose()
   now passes its resolved data_dir so only the designated data directory is
   exempt — /etc, /proc, docker.sock etc. still blocked.

2. _RESERVED_SUBDOMAINS: remove service-level subdomains (mail, calendar, files,
   webdav, webmail). The reserved list should protect PIC infrastructure endpoints
   (api, webui, admin) — not service subdomains that official store services
   (calendar, files, webmail) must be allowed to claim.  Aligns with the
   existing _RESERVED_SUBS in service_registry.py.

3. ServiceRegistry.list_active(): new method returning only installed store
   services (no builtins). This is the forward-looking API that Phase 2 will
   make the primary read path once builtins are deleted. Adding it now unblocks
   the QA agent's test_optional_services_feature.py which was already testing
   the expected Phase 2 behaviour.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 07:35:43 -04:00

233 lines
9.2 KiB
Python

"""
ServiceRegistry — single source of truth for all PIC services.
Merges three layers:
1. Manifest defaults (config_schema.*.default)
2. Admin-saved config from ConfigManager (cell_config.json)
3. Runtime state from installed store records
All consumers (CaddyManager, backup, peer services endpoint) read from here
rather than hardcoding service names or subdomains.
"""
import json
import logging
import os
import re
from typing import Dict, List, Optional
from urllib.parse import quote as _urlquote
logger = logging.getLogger('picell')
# Built-ins are baked into the container image at build time.
# Do not bind-mount this path read-write in docker-compose.
_BUILTINS_DIR = os.path.join(os.path.dirname(__file__), 'services', 'builtins')
_SUBDOMAIN_RE = re.compile(r'^[a-z][a-z0-9-]{0,30}$')
_BACKEND_RE = re.compile(r'^[A-Za-z0-9._-]+:\d{1,5}$')
_RESERVED_SUBS = frozenset({'api', 'webui', 'admin', 'www', 'ns1', 'ns2', 'git', 'registry', 'install'})
class ServiceRegistry:
def __init__(self, config_manager):
self._cm = config_manager
# ── Manifest loading ──────────────────────────────────────────────────
def _load_manifest(self, path: str) -> Optional[Dict]:
try:
with open(path) as f:
return json.load(f)
except Exception as e:
logger.warning('ServiceRegistry: failed to load manifest %s: %s', path, e)
return None
def _builtin_ids(self) -> List[str]:
if not os.path.isdir(_BUILTINS_DIR):
return []
return sorted(
d for d in os.listdir(_BUILTINS_DIR)
if os.path.isfile(os.path.join(_BUILTINS_DIR, d, 'manifest.json'))
)
def _builtin_manifest(self, service_id: str) -> Optional[Dict]:
return self._load_manifest(
os.path.join(_BUILTINS_DIR, service_id, 'manifest.json')
)
# ── Config merging ────────────────────────────────────────────────────
_TYPE_COERCIONS = {'integer': int, 'string': str, 'boolean': bool}
def _merged_config(self, manifest: Dict) -> Dict:
"""Return manifest defaults overridden by admin-saved values, type-coerced."""
svc_id = manifest.get('id', '')
saved = self._cm.configs.get(svc_id, {})
schema = manifest.get('config_schema') or {}
merged = {k: v['default'] for k, v in schema.items() if 'default' in v}
for k, spec in schema.items():
if k not in saved:
continue
raw = saved[k]
coerce = self._TYPE_COERCIONS.get(spec.get('type', ''))
if coerce is not None:
try:
raw = coerce(raw)
except (TypeError, ValueError):
raw = merged.get(k, raw)
merged[k] = raw
return merged
# ── Public API ────────────────────────────────────────────────────────
def get(self, service_id: str) -> Optional[Dict]:
"""Return manifest + merged config for one service, or None if unknown."""
manifest = self._builtin_manifest(service_id)
if manifest is None:
record = self._cm.get_installed_services().get(service_id)
if record:
manifest = record.get('manifest')
if not manifest:
return None
return {**manifest, 'config': self._merged_config(manifest)}
def list_active(self) -> List[Dict]:
"""Return only installed store services, each with merged config.
Unlike list_all(), builtins are excluded. Use this wherever the
intent is "what has the admin chosen to run?" rather than "everything
the registry knows about."
"""
results = []
for _svc_id, record in self._cm.get_installed_services().items():
manifest = record.get('manifest') or {}
if manifest.get('id'):
results.append({**manifest, 'config': self._merged_config(manifest)})
return results
def list_all(self) -> List[Dict]:
"""
Return all services — builtins first, then installed store services —
each with merged config attached as the 'config' key.
"""
results: List[Dict] = []
seen: set = set()
for svc_id in self._builtin_ids():
manifest = self._builtin_manifest(svc_id)
if manifest:
results.append({**manifest, 'config': self._merged_config(manifest)})
seen.add(svc_id)
for svc_id, record in self._cm.get_installed_services().items():
if svc_id in seen:
continue
manifest = record.get('manifest') or {}
if manifest.get('id'):
results.append({**manifest, 'config': self._merged_config(manifest)})
return results
def get_caddy_routes(self) -> List[Dict]:
"""
Return routing info for all services that have a subdomain.
Used by CaddyManager to build service blocks without hardcoding.
Values are validated here as a chokepoint so Caddyfile/DNS builders
can safely interpolate them regardless of how manifests reached disk.
"""
routes = []
for svc in self.list_all():
caps = svc.get('capabilities') or {}
if not caps.get('has_subdomain'):
continue
sub = svc.get('subdomain', '')
bknd = svc.get('backend', '')
if not sub or not bknd:
continue
svc_id = svc.get('id', '?')
if not _SUBDOMAIN_RE.match(sub) or sub in _RESERVED_SUBS:
logger.warning('ServiceRegistry: skipping %s — invalid/reserved subdomain %r', svc_id, sub)
continue
if not _BACKEND_RE.match(bknd):
logger.warning('ServiceRegistry: skipping %s — invalid backend %r', svc_id, bknd)
continue
extra_subs = [
s for s in (svc.get('extra_subdomains') or [])
if isinstance(s, str) and _SUBDOMAIN_RE.match(s) and s not in _RESERVED_SUBS
]
extra_backends = {
k: v for k, v in (svc.get('extra_backends') or {}).items()
if (isinstance(k, str) and _SUBDOMAIN_RE.match(k) and k not in _RESERVED_SUBS
and isinstance(v, str) and _BACKEND_RE.match(v))
}
routes.append({
'service_id': svc_id,
'subdomain': sub,
'backend': bknd,
'extra_subdomains': extra_subs,
'extra_backends': extra_backends,
})
return routes
def get_backup_plan(self) -> List[Dict]:
"""
Return backup declarations for all services that have storage.
Used by the backup system instead of hardcoded file lists.
Each entry:
service_id — service identifier
volumes — list of {container, path, name} for docker-exec streaming
config_paths — host-relative paths copied directly (config files)
"""
plan = []
for svc in self.list_all():
caps = svc.get('capabilities') or {}
if not caps.get('has_storage'):
continue
backup = svc.get('backup') or {}
volumes = backup.get('volumes') or []
config_paths = backup.get('config_paths') or []
if not volumes and not config_paths:
continue
plan.append({
'service_id': svc['id'],
'volumes': volumes,
'config_paths': config_paths,
})
return plan
def get_peer_service_info(self, service_id: str, peer_username: str,
domain: str, credentials: Dict) -> Optional[Dict]:
"""
Fill peer_config_template for one service+peer combination.
credentials: dict of {field_name: value} for that peer+service.
Returns None if service unknown or has no peer template.
"""
svc = self.get(service_id)
if not svc:
return None
template = svc.get('peer_config_template')
if not template:
return None
# URL-safe peer username (safe='') — prevents path traversal in CalDAV/WebDAV URLs
safe_username = _urlquote(peer_username, safe='')
result = {}
for key, raw in template.items():
val = raw
val = val.replace('{domain}', domain)
val = val.replace('{peer.username}', safe_username)
for field, cred_val in credentials.items():
val = val.replace(
'{peer.service_credentials.' + service_id + '.' + field + '}',
str(cred_val) if cred_val is not None else '',
)
cfg = svc.get('config') or {}
for cfg_key, cfg_val in cfg.items():
val = val.replace('{config.' + cfg_key + '}', str(cfg_val) if cfg_val is not None else '')
result[key] = val
return result