Files
pic/api/service_store_manager.py
T
roof f7bb2cc962
Unit Tests / test (push) Successful in 11m25s
fix: allow first-party store service subdomains and registry images
Two manifest validation bugs blocked all store service installs:

1. service_store_manager.RESERVED_SUBDOMAINS included 'mail', which
   prevented the email service from using its required subdomain.
   Removed mail/calendar/files/webmail — they belong to official PIC
   store services and must be claimable by them.

2. manifest_validator required @sha256 digest pins on ALL images,
   including first-party git.pic.ngo/roof/* images that the PIC team
   builds and controls. service_store_manager._validate_manifest already
   only warned for first-party images; the secondary validator was
   stricter than intended, causing a hard reject on :latest tags.
   Aligned to warn-not-reject for first-party; malformed digests (when
   provided) are still a hard error.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-30 03:09:41 -04:00

439 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Service Store Manager for Personal Internet Cell.
Manages installation, removal, and lifecycle of third-party services from the
PIC service store index. Each installed service runs as a Docker container
declared in a compose override file and has:
- An allocated IP in the service pool (172.20.0.20-254 by default)
- Optional iptables FORWARD rules declared in its manifest
- Optional Caddy reverse-proxy route declared in its manifest
"""
import logging
import os
import re
import threading
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import json
import requests
from base_service_manager import BaseServiceManager
from manifest_validator import validate_manifest, validate_provision_hook
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Default location of the store index (JSON) listing installable services.
INDEX_URL_DEFAULT = 'https://git.pic.ngo/roof/pic-services/raw/branch/main/index.json'

# Per-service manifest URL; ``{id}`` is filled in with the service id.
MANIFEST_URL_TPL = 'https://git.pic.ngo/roof/pic-services/raw/branch/main/services/{id}/manifest.json'

# Per-service compose template URL; ``{id}`` is filled in with the service id.
TEMPLATE_URL_TPL = 'https://git.pic.ngo/roof/pic-services/raw/branch/main/services/{id}/compose-template.yml'

# Only images hosted under git.pic.ngo/roof/ are accepted; the tag and the
# @sha256 digest (64 lowercase hex characters) are each optional.
IMAGE_ALLOWLIST_RE = re.compile(
    r'^git\.pic\.ngo/roof/[a-z0-9._/-]+(:[a-zA-Z0-9._-]+)?(@sha256:[a-f0-9]{64})?$'
)

# Images from well-known vendors that pre-date digest pinning in PIC.
# These are allowed to ship without a @sha256 digest; all others require one
# or must come from git.pic.ngo/roof/*.
TRUSTED_IMAGES_NO_DIGEST = frozenset([
    'mailserver/docker-mailserver',
    'tomsquest/docker-radicale',
    'bytemark/webdav',
    'filegator/filegator',
    'hardware/rainloop',
])

# Mount points a store service may never claim (compared by exact string).
FORBIDDEN_MOUNTS = frozenset({
    '/',
    '/etc',
    '/var',
    '/proc',
    '/sys',
    '/dev',
    '/app',
    '/run',
    '/boot',
})

# Subdomains that store services may not claim.
# mail, calendar, files, webmail are intentionally absent:
# they are claimed by official PIC store services.
RESERVED_SUBDOMAINS = frozenset({
    'api',
    'webui',
    'admin',
    'www',
    'ns1',
    'ns2',
    'git',
    'registry',
    'install',
})

# Characters permitted in a manifest env value.
ENV_VALUE_RE = re.compile(r'^[A-Za-z0-9._@:/+\-= ]*$')

# A subdomain label: a lowercase letter followed by up to 30 of [a-z0-9-].
SUBDOMAIN_RE = re.compile(r'^[a-z][a-z0-9-]{0,30}$')

# A reverse-proxy backend written as host:port.
BACKEND_RE = re.compile(r'^[A-Za-z0-9._-]+:\d{1,5}$')
# ---------------------------------------------------------------------------
# ServiceStoreManager
# ---------------------------------------------------------------------------
class ServiceStoreManager(BaseServiceManager):
    """Manages service store: install, remove, and list available/installed services.

    Manifests and compose templates are downloaded from the store URLs,
    validated, and handed to the optional ``service_composer``; the record of
    what is installed is read and written through ``config_manager``.
    """

    # Largest response body (bytes) accepted from the store for the index,
    # a manifest, or a compose template.
    _MAX_FETCH_BYTES = 256 * 1024

    def __init__(self, config_manager, caddy_manager, container_manager,
                 data_dir: str = '', config_dir: str = '',
                 service_composer=None, egress_manager=None):
        """Wire up collaborators and read environment overrides.

        Args:
            config_manager: Provides ``get_installed_services``,
                ``set_installed_service`` and ``remove_installed_service``.
            caddy_manager: Provides ``regenerate_with_installed``.
            container_manager: Stored on the instance; not used by the
                methods of this class.
            data_dir: Forwarded to ``BaseServiceManager``.
            config_dir: Forwarded to ``BaseServiceManager``.
            service_composer: Optional; when ``None`` no containers are
                started/stopped and dependency checks are skipped.
            egress_manager: Optional; when falsy, egress rules are not touched.
        """
        super().__init__('service_store', data_dir, config_dir)
        self.config_manager = config_manager
        self.caddy_manager = caddy_manager
        self.container_manager = container_manager
        self.service_composer = service_composer
        self.egress_manager = egress_manager
        self.compose_override = os.environ.get(
            'COMPOSE_SERVICES_PATH', '/app/docker-compose.services.yml'
        )
        self.index_url = os.environ.get('PIC_STORE_INDEX_URL', INDEX_URL_DEFAULT)
        # Serialises install() and remove() against each other.
        self._lock = threading.Lock()
        self._index_cache: Optional[list] = None
        self._index_cache_time: float = 0
        self._cache_ttl: int = 300  # 5 min

    # ── BaseServiceManager required ───────────────────────────────────────

    def get_status(self) -> Dict[str, Any]:
        """Return the service name, a constant ``running`` flag and the install count."""
        installed = self.config_manager.get_installed_services()
        return {
            # NOTE(review): service_name is presumably set by BaseServiceManager.
            'service': self.service_name,
            'running': True,
            'installed_count': len(installed),
        }

    def test_connectivity(self) -> Dict[str, Any]:
        """Probe the index URL; success means the server answered HTTP 200.

        Any exception is reported as ``{'success': False, 'error': ...}``
        instead of being raised.
        """
        try:
            resp = requests.get(self.index_url, timeout=5)
            return {'success': resp.status_code == 200}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    # ── Manifest validation ───────────────────────────────────────────────

    @staticmethod
    def _normalize_mount(mount: str) -> str:
        """Collapse redundant slashes and ``.``/``..`` segments in a mount path."""
        import posixpath

        if not mount:
            return ''
        normalized = posixpath.normpath(mount)
        # posixpath.normpath deliberately preserves exactly two leading slashes.
        if normalized.startswith('//'):
            normalized = '/' + normalized.lstrip('/')
        return normalized

    @staticmethod
    def _dict_entries(manifest: dict, key: str, errors: List[str]) -> List[dict]:
        """Return the dict entries of the list stored under ``key``.

        A missing or null value counts as an empty list. A non-list value, or
        any entry that is not a dict, is recorded in ``errors`` and skipped so
        that the caller never has to guard ``.get`` calls.
        """
        raw = manifest.get(key) or []
        if not isinstance(raw, list):
            errors.append(f'{key} must be a list')
            return []
        entries: List[dict] = []
        for entry in raw:
            if isinstance(entry, dict):
                entries.append(entry)
            else:
                errors.append(f'{key}[] entries must be objects')
        return entries

    @staticmethod
    def _validate_manifest(m: dict) -> Tuple[bool, List[str]]:
        """Validate a service manifest. Returns (ok, [errors]).

        Malformed input (wrong JSON types) is reported as validation errors
        rather than raised, because ``install`` calls this outside any
        ``try`` block. All regex checks use ``fullmatch``: the patterns end
        in ``$``, which with ``match`` would also accept a value carrying a
        trailing newline.
        """
        if not isinstance(m, dict):
            return (False, ['Manifest must be a JSON object'])

        errors: List[str] = []

        # Required top-level fields
        for field in ('id', 'name', 'version', 'author', 'image', 'container_name'):
            if not m.get(field):
                errors.append(f'Missing required field: {field}')

        # Image allowlist
        image = m.get('image', '')
        if image and not isinstance(image, str):
            errors.append(f'image must be a string, got: {image!r}')
        elif image and not IMAGE_ALLOWLIST_RE.fullmatch(image):
            errors.append(
                f'image must match git.pic.ngo/roof/* pattern, got: {image}'
            )
        elif image:
            # Warn when a digest pin is absent so operators know exact-version
            # tracking is not guaranteed. Images in TRUSTED_IMAGES_NO_DIGEST
            # and images from our own git.pic.ngo/roof/* registry (which we
            # build and tag) get warnings rather than hard errors; any other
            # image that somehow passes the allowlist gets a hard error.
            # NOTE(review): with the current allowlist every image reaching
            # this point is under git.pic.ngo/roof/, so the else branch is a
            # defensive guard only.
            if '@sha256:' not in image:
                image_base = image.split(':')[0].split('@')[0]
                is_own_registry = image_base.startswith('git.pic.ngo/roof/')
                if image_base in TRUSTED_IMAGES_NO_DIGEST or is_own_registry:
                    logger.warning('image %s has no digest pin', image)
                else:
                    errors.append(
                        f'image {image!r} must include a @sha256:<digest> pin'
                    )

        # Volume mount safety
        for vol in ServiceStoreManager._dict_entries(m, 'volumes', errors):
            mount = vol.get('mount', '')
            if not isinstance(mount, str):
                errors.append(f'volumes[].mount must be a string, got: {mount!r}')
                continue
            # Check the normalised form as well, so that '/etc/', '//etc' or
            # '/etc/.' cannot slip past the exact-match list.
            normalized = ServiceStoreManager._normalize_mount(mount)
            if mount in FORBIDDEN_MOUNTS or normalized in FORBIDDEN_MOUNTS:
                errors.append(f'Forbidden volume mount: {mount}')
            elif (mount.startswith('/home/roof/pic')
                  or normalized.startswith('/home/roof/pic')):
                # NOTE(review): the message says "prefix of", but the check
                # rejects mounts located under /home/roof/pic; ancestors such
                # as /home are not rejected here. Confirm the intent.
                errors.append(f'Volume mount cannot be a prefix of /home/roof/pic: {mount}')

        # iptables rules
        for rule in ServiceStoreManager._dict_entries(m, 'iptables_rules', errors):
            if rule.get('type') != 'ACCEPT':
                errors.append(
                    f'iptables_rules[].type must be ACCEPT, got: {rule.get("type")}'
                )
            if rule.get('dest_ip') != '${SERVICE_IP}':
                errors.append(
                    f'iptables_rules[].dest_ip must be exactly ${{SERVICE_IP}}, '
                    f'got: {rule.get("dest_ip")}'
                )
            port = rule.get('dest_port')
            # bool is a subclass of int; JSON `true` must not count as port 1.
            port_ok = (
                isinstance(port, int)
                and not isinstance(port, bool)
                and 1 <= port <= 65535
            )
            if not port_ok:
                errors.append(
                    f'iptables_rules[].dest_port must be an integer 1-65535, got: {port}'
                )
            proto = rule.get('proto', 'tcp')
            if proto not in ('tcp', 'udp'):
                errors.append(
                    f'iptables_rules[].proto must be tcp or udp, got: {proto}'
                )

        # Legacy caddy_route dict subdomain (for store manifests using the old format)
        caddy_route = m.get('caddy_route') or {}
        legacy_sub = caddy_route.get('subdomain', '') if isinstance(caddy_route, dict) else ''
        if legacy_sub:
            if not isinstance(legacy_sub, str):
                errors.append(
                    f'caddy_route.subdomain must be a string, got: {legacy_sub!r}'
                )
            elif legacy_sub in RESERVED_SUBDOMAINS:
                errors.append(f'caddy_route.subdomain is reserved: {legacy_sub}')
            elif not SUBDOMAIN_RE.fullmatch(legacy_sub):
                errors.append(
                    f'caddy_route.subdomain must match ^[a-z][a-z0-9-]{{0,30}}$, '
                    f'got: {legacy_sub}'
                )

        # Top-level subdomain + backend (consumed by ServiceRegistry.get_caddy_routes)
        subdomain = m.get('subdomain', '')
        if subdomain:
            if not isinstance(subdomain, str):
                errors.append(f'subdomain must be a string, got: {subdomain!r}')
            elif subdomain in RESERVED_SUBDOMAINS:
                errors.append(f'subdomain is reserved: {subdomain}')
            elif not SUBDOMAIN_RE.fullmatch(subdomain):
                errors.append(
                    f'subdomain must match ^[a-z][a-z0-9-]{{0,30}}$, got: {subdomain}'
                )

        backend = m.get('backend', '')
        if backend and not (isinstance(backend, str) and BACKEND_RE.fullmatch(backend)):
            errors.append(f'backend must be host:port (e.g. cell-foo:8080), got: {backend}')

        extra_subdomains = m.get('extra_subdomains') or []
        if not isinstance(extra_subdomains, (list, tuple)):
            # A bare string would otherwise be validated character by character.
            errors.append('extra_subdomains must be a list')
            extra_subdomains = []
        for sub in extra_subdomains:
            if not isinstance(sub, str):
                errors.append('extra_subdomains entries must be strings')
            elif sub in RESERVED_SUBDOMAINS:
                errors.append(f'extra_subdomains entry is reserved: {sub}')
            elif not SUBDOMAIN_RE.fullmatch(sub):
                errors.append(
                    f'extra_subdomains entry must match ^[a-z][a-z0-9-]{{0,30}}$, got: {sub}'
                )

        extra_backends = m.get('extra_backends') or {}
        if not isinstance(extra_backends, dict):
            errors.append('extra_backends must be an object mapping subdomain to host:port')
            extra_backends = {}
        for sub, bknd in extra_backends.items():
            if not isinstance(sub, str) or not SUBDOMAIN_RE.fullmatch(sub):
                errors.append(
                    f'extra_backends key must match ^[a-z][a-z0-9-]{{0,30}}$, got: {sub!r}'
                )
            elif sub in RESERVED_SUBDOMAINS:
                errors.append(f'extra_backends key is reserved: {sub}')
            if not isinstance(bknd, str) or not BACKEND_RE.fullmatch(bknd):
                errors.append(
                    f'extra_backends[{sub!r}] value must be host:port, got: {bknd!r}'
                )

        # Env value safety
        for env_entry in ServiceStoreManager._dict_entries(m, 'env', errors):
            val = str(env_entry.get('value', ''))
            if not ENV_VALUE_RE.fullmatch(val):
                errors.append(
                    f'env[].value contains disallowed characters: {val!r}'
                )

        # Security layer: delegate to manifest_validator for cap_add, backend
        # denylist, provision_hook, reserved container names, and kind guard.
        ok, sec_errs = validate_manifest(m)
        if not ok:
            errors.extend(sec_errs)

        return (not errors, errors)

    # ── Index / manifest fetching ─────────────────────────────────────────

    @staticmethod
    def _download_capped(url: str, limit: int, too_large_msg: str) -> bytes:
        """GET ``url`` and return at most ``limit`` bytes of the body.

        Reads ``limit + 1`` bytes from the raw stream so that an oversized
        body is detected without downloading it in full.

        Raises:
            ValueError: with ``too_large_msg`` if the body exceeds ``limit``.
            Exception: whatever ``requests.get`` / ``raise_for_status`` raise.
        """
        resp = requests.get(url, timeout=10, stream=True)
        try:
            resp.raise_for_status()
            content = resp.raw.read(limit + 1, decode_content=True)
        finally:
            # A streamed response is not returned to the connection pool
            # unless it is fully consumed or explicitly closed.
            resp.close()
        if len(content) > limit:
            raise ValueError(too_large_msg)
        return content

    def fetch_index(self) -> list:
        """Fetch and cache the service index.

        A cached index younger than ``_cache_ttl`` seconds is returned without
        any network access. On failure the stale cache (or ``[]`` when there is
        none) is returned and a warning is logged; nothing is raised.
        """
        import time

        now = time.time()
        if self._index_cache is not None and (now - self._index_cache_time) < self._cache_ttl:
            return self._index_cache
        try:
            content = self._download_capped(
                self.index_url,
                self._MAX_FETCH_BYTES,
                'Index response exceeds 256 KB limit',
            )
            data = json.loads(content)
            # The index is either a bare list or an object with a 'services' list.
            self._index_cache = data if isinstance(data, list) else data.get('services', [])
            self._index_cache_time = now
            return self._index_cache
        except Exception as e:
            logger.warning('fetch_index failed: %s', e)
            return self._index_cache or []

    def _fetch_manifest(self, service_id: str) -> dict:
        """Fetch a service manifest by ID.

        Raises on network/HTTP errors, invalid JSON, or an oversized response.
        """
        content = self._download_capped(
            MANIFEST_URL_TPL.format(id=service_id),
            self._MAX_FETCH_BYTES,
            f'Manifest response for {service_id} exceeds 256 KB limit',
        )
        return json.loads(content)

    def _fetch_template(self, service_id: str, manifest: dict) -> str:
        """Fetch the compose template for a service as UTF-8 text.

        ``manifest`` is accepted for interface compatibility and is not used.
        Raises on network/HTTP errors, an oversized response, or invalid UTF-8.
        """
        content = self._download_capped(
            TEMPLATE_URL_TPL.format(id=service_id),
            self._MAX_FETCH_BYTES,
            f'Compose template for {service_id} exceeds 256 KB limit',
        )
        return content.decode('utf-8')

    # ── Core operations ───────────────────────────────────────────────────

    def install(self, service_id: str) -> dict:
        """Install a service from the store.

        Returns:
            ``{'ok': True}`` on success (plus ``'already_installed': True``
            when nothing had to be done); otherwise ``{'ok': False, ...}`` with
            either ``'error'`` (a string) or ``'errors'`` (validation messages).
            Caddy and egress failures after a successful install are logged
            and do not change the result.
        """
        from datetime import timezone

        with self._lock:
            installed = self.config_manager.get_installed_services()
            if service_id in installed:
                return {'ok': True, 'already_installed': True}

            # Fetch and validate manifest
            try:
                manifest = self._fetch_manifest(service_id)
            except Exception as e:
                return {'ok': False, 'error': f'Failed to fetch manifest: {e}'}

            ok, errs = self._validate_manifest(manifest)
            if not ok:
                return {'ok': False, 'errors': errs}
            # NOTE(review): _validate_manifest already delegates to
            # validate_manifest; this second call is kept as-is and looks
            # redundant unless _validate_manifest is overridden or patched.
            ok2, errs2 = validate_manifest(manifest)
            if not ok2:
                return {'ok': False, 'errors': errs2}

            # Dependency check
            if self.service_composer is not None:
                err = self.service_composer._resolve_requires(manifest, installed)
                if err:
                    return {'ok': False, 'error': err}

            # Fetch compose template
            try:
                template_content = self._fetch_template(service_id, manifest)
            except Exception as e:
                return {'ok': False, 'error': f'Failed to fetch compose template: {e}'}

            # Write compose file and start containers (validation inside write_compose)
            if self.service_composer is not None:
                try:
                    result = self.service_composer.install(service_id, manifest, template_content)
                except ValueError as e:
                    return {'ok': False, 'error': str(e)}
                except Exception as e:
                    return {'ok': False, 'error': f'Failed to start service: {e}'}
                if not result.get('ok'):
                    # Fall through to the default text when stderr is present but empty.
                    return {
                        'ok': False,
                        'error': (result.get('error')
                                  or result.get('stderr')
                                  or 'docker up failed'),
                    }

            # Persist minimal install record. The timestamp stays a naive UTC
            # ISO string (same format as the deprecated datetime.utcnow()).
            record = {
                'id': service_id,
                'manifest': manifest,
                'installed_at': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            }
            self.config_manager.set_installed_service(service_id, record)

            # Regenerate Caddy (registry now drives routes, no caddy_routes list needed)
            try:
                self.caddy_manager.regenerate_with_installed([])
            except Exception as e:
                logger.warning('install: caddy regenerate failed for %s (non-fatal): %s', service_id, e)

            if self.egress_manager:
                try:
                    self.egress_manager.apply_service(service_id)
                except Exception as exc:
                    logger.warning('Egress apply failed for %s (non-fatal): %s', service_id, exc)

            return {'ok': True}

    def remove(self, service_id: str, purge_data: bool = False) -> dict:
        """Remove an installed service.

        Refuses when the service is not installed or when another installed
        service depends on it. Egress, composer and Caddy failures are logged
        and do not prevent the install record from being removed.

        Args:
            service_id: ID of the installed service.
            purge_data: Forwarded to ``service_composer.remove``.
        """
        with self._lock:
            installed = self.config_manager.get_installed_services()
            if service_id not in installed:
                return {'ok': False, 'error': f'Service {service_id} is not installed'}

            # Prevent removing a service that others depend on
            if self.service_composer is not None:
                dependents = self.service_composer._resolve_dependents(service_id, installed)
                if dependents:
                    return {
                        'ok': False,
                        'error': f'Cannot remove {service_id}: required by {", ".join(sorted(dependents))}',
                    }

            if self.egress_manager:
                try:
                    self.egress_manager.clear_service(service_id)
                except Exception as exc:
                    logger.warning('Egress clear failed for %s (non-fatal): %s', service_id, exc)

            # Stop and remove containers (best-effort)
            if self.service_composer is not None:
                try:
                    self.service_composer.remove(service_id, purge_data=purge_data)
                except Exception as e:
                    logger.warning('remove: composer.remove failed for %s (non-fatal): %s', service_id, e)

            # Remove from config
            self.config_manager.remove_installed_service(service_id)

            # Regenerate Caddy
            try:
                self.caddy_manager.regenerate_with_installed([])
            except Exception as e:
                logger.warning('remove: caddy regenerate failed for %s (non-fatal): %s', service_id, e)

            return {'ok': True}

    def list_services(self) -> dict:
        """Return available (from index) and installed services."""
        available = self.fetch_index()
        installed = self.config_manager.get_installed_services()
        return {'available': available, 'installed': installed}

    def reapply_on_startup(self) -> None:
        """Re-apply firewall and Caddy rules for all installed services on startup.

        Every step is best-effort: failures are logged as warnings and the
        remaining steps still run. Does nothing when no service is installed.
        """
        from firewall_manager import apply_service_rules

        installed = self.config_manager.get_installed_services()
        if not installed:
            return

        # Re-apply iptables rules
        # NOTE(review): records written by install() hold only 'id',
        # 'manifest' and 'installed_at', so 'service_ip', 'iptables_rules' and
        # 'caddy_route' below fall back to their defaults for such records.
        # Presumably these keys exist on legacy records; confirm.
        for svc_id, record in installed.items():
            ip = record.get('service_ip', '')
            rules = record.get('iptables_rules', [])
            try:
                apply_service_rules(svc_id, ip, rules)
            except Exception as e:
                logger.warning('reapply_on_startup: apply_service_rules(%s) failed: %s', svc_id, e)

        # Regenerate Caddyfile
        try:
            caddy_routes = [
                r.get('caddy_route')
                for r in installed.values()
                if r.get('caddy_route')
            ]
            self.caddy_manager.regenerate_with_installed(caddy_routes)
        except Exception as e:
            logger.warning('reapply_on_startup: caddy regenerate failed: %s', e)

        # Bring up per-service compose stacks
        if self.service_composer is not None:
            try:
                self.service_composer.reapply_active_services()
            except Exception as e:
                logger.warning('reapply_on_startup: reapply_active_services failed: %s', e)