Files
pic/api/service_store_manager.py
T
roof 6bc1d625bf fix: unblock instanceable connectivity store-service install + clean up on delete
Live verification on pic1 of the connectivity v2 multi-instance feature
surfaced four integration bugs that prevented installing any published
connectivity store service (proxy/wireguard-ext/openvpn-client/sshuttle)
and left stale host routing state behind. All four are fixed here:

1. manifest_validator rejected the CI-published `name:tag@sha256:<digest>`
   image form (it required digest-only), while service_store_manager already
   accepted it — so every published store image failed validation. Allow an
   optional tag before the digest, matching service_store_manager.

2. The cell-api image shipped the docker CLI but not the Compose v2 plugin,
   so every `docker compose` command that ServiceComposer runs (pull/up/down
   for store services) failed with "'compose' is not a docker command". Copy the
   compose plugin binary from the docker-cli stage.

3. service_store_manager.install ran the base compose up for instanceable
   services, whose template still contains ${INSTANCE_ID}/${REDIRECT_PORT}
   (there is no base container — one runs per connection instance). It now
   verifies the image signature but defers the container to connection
   creation for instanceable manifests.

4. delete_connection freed the record/secrets/container but never removed the
   connection's individually-managed `ip rule fwmark->table` or its FORWARD
   kill-switch (apply_routes only flushes the PIC_CONNECTIVITY chains and
   re-adds rules for surviving connections), leaking stale host routing state.
   It now tears both down; added _remove_killswitch.

Verified end-to-end on pic1: two proxy instances allocate distinct
marks/tables/ports (skipping in-use resources), render distinct per-instance
containers, two peers route through distinct instances (per-peer MARK +
REDIRECT), delete is rejected (409) while the connection is still referenced,
and a successful delete removes its ip rule.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-15 08:45:32 -04:00

478 lines
21 KiB
Python

#!/usr/bin/env python3
"""
Service Store Manager for Personal Internet Cell.
Manages installation, removal, and lifecycle of third-party services from the
PIC service store index. Each installed service runs as a Docker container
declared in a compose override file and has:
- An allocated IP in the service pool (172.20.0.20–254 by default)
- Optional iptables FORWARD rules declared in its manifest
- Optional Caddy reverse-proxy route declared in its manifest
"""
import logging
import os
import re
import threading
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import json
import requests
from base_service_manager import BaseServiceManager
from constants import RESERVED_SUBDOMAINS
from manifest_validator import validate_manifest, validate_provision_hook
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Raw-file URLs in the pic-services repository (main branch).
INDEX_URL_DEFAULT = (
    'https://git.pic.ngo/roof/pic-services/raw/branch/main/index.json'
)
MANIFEST_URL_TPL = (
    'https://git.pic.ngo/roof/pic-services/raw/branch/main/services/{id}/manifest.json'
)
TEMPLATE_URL_TPL = (
    'https://git.pic.ngo/roof/pic-services/raw/branch/main/services/{id}/compose-template.yml'
)
# NOTE: the validation patterns below are anchored with ``\Z`` rather than
# ``$``. In Python ``$`` also matches immediately before a trailing newline,
# so e.g. 'foo\n' would pass a ``$``-anchored pattern under ``.match()``.
# ``\Z`` only matches at the true end of the string.
#
# Store images: git.pic.ngo/roof/<path>[:tag][@sha256:<64 lowercase hex>]
IMAGE_ALLOWLIST_RE = re.compile(
    r'^git\.pic\.ngo/roof/[a-z0-9._/-]+(:[a-zA-Z0-9._-]+)?(@sha256:[a-f0-9]{64})?\Z'
)
# Images from well-known vendors that pre-date digest pinning in PIC.
# These are allowed to ship without a @sha256 digest; all others require one
# or must come from git.pic.ngo/roof/*.
# NOTE(review): IMAGE_ALLOWLIST_RE only admits git.pic.ngo/roof/* images, so
# these vendor names cannot currently pass the allowlist — confirm intent.
TRUSTED_IMAGES_NO_DIGEST = frozenset({
    'mailserver/docker-mailserver',
    'tomsquest/docker-radicale',
    'bytemark/webdav',
    'filegator/filegator',
    'hardware/rainloop',
})
# Volume mount paths that a store manifest may never declare.
FORBIDDEN_MOUNTS = frozenset([
    '/', '/etc', '/var', '/proc', '/sys', '/dev', '/app', '/run', '/boot',
])
# Allowed characters in manifest env values (no quotes, $, newlines, ...).
ENV_VALUE_RE = re.compile(r'^[A-Za-z0-9._@:/+\-= ]*\Z')
# A single DNS label: lowercase letter first, at most 31 characters.
SUBDOMAIN_RE = re.compile(r'^[a-z][a-z0-9-]{0,30}\Z')
# host:port; the 1-5 port digits are not range-checked by the pattern itself.
BACKEND_RE = re.compile(r'^[A-Za-z0-9._-]+:\d{1,5}\Z')
# ---------------------------------------------------------------------------
# ServiceStoreManager
# ---------------------------------------------------------------------------
class ServiceStoreManager(BaseServiceManager):
    """Manages service store: install, remove, and list available/installed services."""

    # Upper bound on any single document fetched from the store repository
    # (index, manifest, compose template).
    _FETCH_SIZE_LIMIT = 256 * 1024
    # Host directory that store-service volume mounts may neither live inside
    # nor contain (an ancestor mount would expose it as well).
    _PROTECTED_MOUNT_ROOT = '/home/roof/pic'

    def __init__(self, config_manager, caddy_manager, container_manager,
                 data_dir: str = '', config_dir: str = '',
                 service_composer=None, egress_manager=None):
        super().__init__('service_store', data_dir, config_dir)
        self.config_manager = config_manager
        self.caddy_manager = caddy_manager
        self.container_manager = container_manager
        self.service_composer = service_composer
        self.egress_manager = egress_manager
        self.compose_override = os.environ.get(
            'COMPOSE_SERVICES_PATH', '/app/docker-compose.services.yml'
        )
        self.index_url = os.environ.get('PIC_STORE_INDEX_URL', INDEX_URL_DEFAULT)
        # Held by install() and remove() so they never interleave.
        self._lock = threading.Lock()
        self._index_cache: Optional[list] = None
        self._index_cache_time: float = 0
        self._cache_ttl: int = 300  # 5 min

    # ── BaseServiceManager required ───────────────────────────────────────
    def get_status(self) -> Dict[str, Any]:
        """Report that the store manager is running and how many services are installed."""
        installed = self.config_manager.get_installed_services() or {}
        return {
            'service': self.service_name,
            'running': True,
            'installed_count': len(installed),
        }

    def test_connectivity(self) -> Dict[str, Any]:
        """Return {'success': bool[, 'error': str]} for a GET of the store index URL."""
        try:
            resp = requests.get(self.index_url, timeout=5)
            return {'success': resp.status_code == 200}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    # ── Manifest validation ───────────────────────────────────────────────
    @staticmethod
    def _dict_entries(m: dict, key: str, errors: List[str]) -> List[dict]:
        """Return ``m[key]`` as a list of dicts, recording structural errors.

        A missing or empty value yields []. A non-list value, or entries that
        are not objects, are reported in *errors* and skipped so the remaining
        checks can still run.
        """
        value = m.get(key) or []
        if not isinstance(value, list):
            errors.append(f'{key} must be a list')
            return []
        entries = [entry for entry in value if isinstance(entry, dict)]
        if len(entries) != len(value):
            errors.append(f'{key} entries must be objects')
        return entries

    @staticmethod
    def _normalize_mount(mount: str) -> str:
        """Lexically normalise a mount path ('/etc/' and '/x/../etc' -> '/etc').

        Empty strings are returned unchanged. posixpath.normpath preserves a
        leading '//', so that is collapsed to a single '/' as well.
        """
        import posixpath
        if not mount:
            return mount
        norm = posixpath.normpath(mount)
        if norm.startswith('//'):
            norm = '/' + norm.lstrip('/')
        return norm

    @staticmethod
    def _check_subdomain(value: Any, label: str, errors: List[str]) -> None:
        """Append an error when *value* is not a usable, non-reserved subdomain."""
        if not isinstance(value, str):
            errors.append(f'{label} must be a string, got: {value!r}')
        elif value in RESERVED_SUBDOMAINS:
            errors.append(f'{label} is reserved: {value}')
        elif not SUBDOMAIN_RE.fullmatch(value):
            errors.append(
                f'{label} must match ^[a-z][a-z0-9-]{{0,30}}$, got: {value}'
            )

    @staticmethod
    def _is_valid_backend(value: Any) -> bool:
        """True when *value* is a ``host:port`` string with a port in 1-65535."""
        if not isinstance(value, str) or not BACKEND_RE.fullmatch(value):
            return False
        port = int(value.rsplit(':', 1)[1])
        return 1 <= port <= 65535

    @staticmethod
    def _validate_manifest(m: dict) -> Tuple[bool, List[str]]:
        """Validate a service manifest. Returns (ok, [errors]).

        The manifest comes from the network, so malformed structure is
        reported as errors instead of raising. ``ok`` is True only when no
        errors were found, including those from
        manifest_validator.validate_manifest (run last).
        """
        if not isinstance(m, dict):
            return (False, [f'Manifest must be a JSON object, got {type(m).__name__}'])
        errors: List[str] = []
        cls = ServiceStoreManager

        # Required top-level fields
        for field in ('id', 'name', 'version', 'author', 'image', 'container_name'):
            if not m.get(field):
                errors.append(f'Missing required field: {field}')

        # Image allowlist + digest pin. fullmatch() so a trailing newline
        # cannot satisfy a '$'-anchored pattern.
        image = m.get('image', '')
        if image and not isinstance(image, str):
            errors.append(f'image must be a string, got: {image!r}')
        elif image and not IMAGE_ALLOWLIST_RE.fullmatch(image):
            errors.append(
                f'image must match git.pic.ngo/roof/* pattern, got: {image}'
            )
        elif image and '@sha256:' not in image:
            # Warn when a digest pin is absent so operators know exact-version
            # tracking is not guaranteed. Trusted vendor images and our own
            # git.pic.ngo/roof/* registry get a warning; anything else that
            # somehow passes the allowlist is a hard error.
            image_base = image.split(':')[0].split('@')[0]
            is_own_registry = image_base.startswith('git.pic.ngo/roof/')
            if image_base in TRUSTED_IMAGES_NO_DIGEST or is_own_registry:
                logger.warning('image %s has no digest pin', image)
            else:
                errors.append(
                    f'image {image!r} must include a @sha256:<digest> pin'
                )

        # Volume mount safety. Paths are normalised first so '/etc/' or
        # '/x/../etc' cannot sidestep the exact-match denylist, and mounts that
        # contain the protected root (e.g. '/home') are rejected too.
        # NOTE(review): FORBIDDEN_MOUNTS is matched exactly, so sub-paths such
        # as /var/run/docker.sock are not rejected here — confirm whether
        # manifest_validator covers them.
        root = cls._PROTECTED_MOUNT_ROOT
        for vol in cls._dict_entries(m, 'volumes', errors):
            mount = vol.get('mount', '')
            if not isinstance(mount, str):
                errors.append(f'volumes[].mount must be a string, got: {mount!r}')
                continue
            norm = cls._normalize_mount(mount)
            is_ancestor = norm.startswith('/') and root.startswith(norm.rstrip('/') + '/')
            if norm in FORBIDDEN_MOUNTS:
                errors.append(f'Forbidden volume mount: {mount}')
            elif norm.startswith(root) or is_ancestor:
                errors.append(f'Volume mount cannot be a prefix of /home/roof/pic: {mount}')

        # iptables rules
        for rule in cls._dict_entries(m, 'iptables_rules', errors):
            if rule.get('type') != 'ACCEPT':
                errors.append(
                    f'iptables_rules[].type must be ACCEPT, got: {rule.get("type")}'
                )
            if rule.get('dest_ip') != '${SERVICE_IP}':
                errors.append(
                    f'iptables_rules[].dest_ip must be exactly ${{SERVICE_IP}}, '
                    f'got: {rule.get("dest_ip")}'
                )
            port = rule.get('dest_port')
            # bool is an int subclass; True must not pass as port 1.
            if isinstance(port, bool) or not isinstance(port, int) or not (1 <= port <= 65535):
                errors.append(
                    f'iptables_rules[].dest_port must be an integer 1-65535, got: {port}'
                )
            proto = rule.get('proto', 'tcp')
            if proto not in ('tcp', 'udp'):
                errors.append(
                    f'iptables_rules[].proto must be tcp or udp, got: {proto}'
                )

        # Legacy caddy_route dict subdomain (for store manifests using the old format)
        caddy_route = m.get('caddy_route') or {}
        legacy_sub = caddy_route.get('subdomain', '') if isinstance(caddy_route, dict) else ''
        if legacy_sub:
            cls._check_subdomain(legacy_sub, 'caddy_route.subdomain', errors)

        # Top-level subdomain + backend (consumed by ServiceRegistry.get_caddy_routes)
        subdomain = m.get('subdomain', '')
        if subdomain:
            cls._check_subdomain(subdomain, 'subdomain', errors)
        backend = m.get('backend', '')
        if backend and not cls._is_valid_backend(backend):
            errors.append(f'backend must be host:port (e.g. cell-foo:8080), got: {backend}')

        extra_subs = m.get('extra_subdomains') or []
        if not isinstance(extra_subs, list):
            # A bare string would otherwise be iterated character by character.
            errors.append('extra_subdomains must be a list')
            extra_subs = []
        for sub in extra_subs:
            if not isinstance(sub, str):
                errors.append('extra_subdomains entries must be strings')
            else:
                cls._check_subdomain(sub, 'extra_subdomains entry', errors)

        extra_backends = m.get('extra_backends') or {}
        if not isinstance(extra_backends, dict):
            errors.append('extra_backends must be an object mapping subdomain to host:port')
            extra_backends = {}
        for sub, bknd in extra_backends.items():
            if not isinstance(sub, str) or not SUBDOMAIN_RE.fullmatch(sub):
                errors.append(
                    f'extra_backends key must match ^[a-z][a-z0-9-]{{0,30}}$, got: {sub!r}'
                )
            elif sub in RESERVED_SUBDOMAINS:
                errors.append(f'extra_backends key is reserved: {sub}')
            if not cls._is_valid_backend(bknd):
                errors.append(
                    f'extra_backends[{sub!r}] value must be host:port, got: {bknd!r}'
                )

        # Env value safety
        for env_entry in cls._dict_entries(m, 'env', errors):
            val = str(env_entry.get('value', ''))
            if not ENV_VALUE_RE.fullmatch(val):
                errors.append(
                    f'env[].value contains disallowed characters: {val!r}'
                )

        # Security layer: delegate to manifest_validator for cap_add, backend
        # denylist, provision_hook, reserved container names, and kind guard.
        ok, sec_errs = validate_manifest(m)
        if not ok:
            errors.extend(sec_errs)
        return (len(errors) == 0, errors)

    # ── Index / manifest fetching ─────────────────────────────────────────
    def _fetch_limited(self, url: str, timeout: float, too_large_msg: str) -> bytes:
        """GET *url* and return its (decoded) body, capped at _FETCH_SIZE_LIMIT.

        Raises requests.HTTPError on a non-2xx status and ValueError with
        *too_large_msg* when the body exceeds the limit. The streamed response
        is always closed so its pooled connection is released even though the
        body may be only partially read.
        """
        limit = self._FETCH_SIZE_LIMIT
        resp = requests.get(url, timeout=timeout, stream=True)
        try:
            resp.raise_for_status()
            content = resp.raw.read(limit + 1, decode_content=True)
        finally:
            resp.close()
        if len(content) > limit:
            raise ValueError(too_large_msg)
        return content

    def fetch_index(self) -> list:
        """Fetch and cache the service index.

        A cached index younger than _cache_ttl seconds is returned directly.
        On any fetch/parse failure the failure is logged and the previous
        cache (or []) is returned without refreshing the cache timestamp, so
        the next call retries.
        """
        import time
        now = time.time()
        if self._index_cache is not None and (now - self._index_cache_time) < self._cache_ttl:
            return self._index_cache
        try:
            content = self._fetch_limited(
                self.index_url, 10, 'Index response exceeds 256 KB limit'
            )
            data = json.loads(content)
            # The index is either a bare list or {"services": [...]}.
            services = data if isinstance(data, list) else data.get('services', [])
            if not isinstance(services, list):
                raise ValueError('Index "services" is not a list')
            self._index_cache = services
            self._index_cache_time = now
            return self._index_cache
        except Exception as e:
            logger.warning('fetch_index failed: %s', e)
            return self._index_cache or []

    def _fetch_manifest(self, service_id: str) -> dict:
        """Fetch a service manifest by ID and return the parsed JSON."""
        content = self._fetch_limited(
            MANIFEST_URL_TPL.format(id=service_id), 10,
            f'Manifest response for {service_id} exceeds 256 KB limit',
        )
        return json.loads(content)

    def _fetch_template(self, service_id: str, manifest: dict) -> str:
        """Fetch the compose template for a service as UTF-8 text.

        *manifest* is accepted for interface compatibility and is not used.
        """
        content = self._fetch_limited(
            TEMPLATE_URL_TPL.format(id=service_id), 10,
            f'Compose template for {service_id} exceeds 256 KB limit',
        )
        return content.decode('utf-8')

    # ── Core operations ───────────────────────────────────────────────────
    def _deploy_container(self, service_id: str, manifest: dict,
                          template_content: str) -> Optional[str]:
        """Start the service container, or only verify its image if instanceable.

        Returns None on success (or when no service_composer is configured)
        and a non-None error message on failure.
        """
        if self.service_composer is None:
            return None
        if manifest.get('instanceable'):
            # Instanceable connectivity services back one container PER
            # connection instance, rendered later by ConnectivityManager with
            # a concrete ${INSTANCE_ID}/${REDIRECT_PORT}. Their base template
            # still contains those placeholders, so there is no base container
            # to bring up now. Verify the image signature (the enforce gate
            # still applies) and defer the container to connection creation.
            try:
                verify = self.service_composer.verify_image(service_id, manifest)
            except Exception as e:
                return f'image verification failed: {e}'
            if not verify.get('ok'):
                return verify.get('error') or 'image verification failed'
            return None
        # Write compose file and start containers (validation inside write_compose).
        try:
            result = self.service_composer.install(service_id, manifest, template_content)
        except ValueError as e:
            return str(e)
        except Exception as e:
            return f'Failed to start service: {e}'
        if not result.get('ok'):
            return result.get('error') or result.get('stderr') or 'docker up failed'
        return None

    def install(self, service_id: str) -> dict:
        """Install a service from the store.

        Under self._lock: fetch and validate the manifest, enforce digest
        pinning in "enforce" verification mode, check dependencies, fetch the
        compose template, deploy (or verify) the container, persist the
        install record, then regenerate Caddy and apply egress rules
        (both best-effort).

        Returns:
            {'ok': True}, {'ok': True, 'already_installed': True}, or
            {'ok': False, 'error': str} / {'ok': False, 'errors': [str]}.
        """
        with self._lock:
            installed = self.config_manager.get_installed_services() or {}
            if service_id in installed:
                return {'ok': True, 'already_installed': True}
            try:
                manifest = self._fetch_manifest(service_id)
            except Exception as e:
                return {'ok': False, 'error': f'Failed to fetch manifest: {e}'}
            # _validate_manifest already runs manifest_validator.validate_manifest.
            ok, errs = self._validate_manifest(manifest)
            if not ok:
                return {'ok': False, 'errors': errs}
            # Digest-pin requirement is mode-dependent: the static validators
            # above only warn on a missing @sha256: pin (so installs keep
            # working until the publish pipeline writes digests). Under
            # enforce, a store image without a digest pin is fatal.
            mode = self.config_manager.get_image_verification_mode()
            image = manifest.get('image', '')
            if mode == 'enforce' and image and '@sha256:' not in image:
                return {
                    'ok': False,
                    'error': (
                        f'image {image!r} must be digest-pinned (@sha256:) '
                        'under image_verification mode "enforce"'
                    ),
                }
            # Dependency check
            if self.service_composer is not None:
                err = self.service_composer._resolve_requires(manifest, installed)
                if err:
                    return {'ok': False, 'error': err}
            try:
                template_content = self._fetch_template(service_id, manifest)
            except Exception as e:
                return {'ok': False, 'error': f'Failed to fetch compose template: {e}'}
            deploy_err = self._deploy_container(service_id, manifest, template_content)
            if deploy_err is not None:
                return {'ok': False, 'error': deploy_err}
            # Persist minimal install record. For instanceable connectivity
            # services the raw compose template is stored so ConnectivityManager
            # can render one container per connection instance without re-fetching.
            record = {
                'id': service_id,
                'manifest': manifest,
                'installed_at': datetime.utcnow().isoformat(),  # naive UTC
            }
            if manifest.get('instanceable'):
                record['compose_template'] = template_content
            self.config_manager.set_installed_service(service_id, record)
            # Regenerate Caddy (registry now drives routes, no caddy_routes list needed)
            try:
                self.caddy_manager.regenerate_with_installed([])
            except Exception as e:
                logger.warning('install: caddy regenerate failed for %s (non-fatal): %s', service_id, e)
            if self.egress_manager:
                try:
                    self.egress_manager.apply_service(service_id)
                except Exception as exc:
                    logger.warning('Egress apply failed for %s (non-fatal): %s', service_id, exc)
            return {'ok': True}

    def remove(self, service_id: str, purge_data: bool = False) -> dict:
        """Remove an installed service.

        Refused (ok=False) when the service is not installed or when other
        installed services depend on it. Otherwise egress rules are cleared,
        containers stopped/removed (optionally purging data), the install
        record dropped and Caddy regenerated; the egress, composer and Caddy
        steps are best-effort.
        """
        with self._lock:
            installed = self.config_manager.get_installed_services() or {}
            if service_id not in installed:
                return {'ok': False, 'error': f'Service {service_id} is not installed'}
            # Prevent removing a service that others depend on
            if self.service_composer is not None:
                dependents = self.service_composer._resolve_dependents(service_id, installed)
                if dependents:
                    return {
                        'ok': False,
                        'error': f'Cannot remove {service_id}: required by {", ".join(sorted(dependents))}',
                    }
            if self.egress_manager:
                try:
                    self.egress_manager.clear_service(service_id)
                except Exception as exc:
                    logger.warning('Egress clear failed for %s (non-fatal): %s', service_id, exc)
            # Stop and remove containers (best-effort)
            if self.service_composer is not None:
                try:
                    self.service_composer.remove(service_id, purge_data=purge_data)
                except Exception as e:
                    logger.warning('remove: composer.remove failed for %s (non-fatal): %s', service_id, e)
            self.config_manager.remove_installed_service(service_id)
            try:
                self.caddy_manager.regenerate_with_installed([])
            except Exception as e:
                logger.warning('remove: caddy regenerate failed for %s (non-fatal): %s', service_id, e)
            return {'ok': True}

    def list_services(self) -> dict:
        """Return available (from index) and installed services."""
        available = self.fetch_index()
        installed = self.config_manager.get_installed_services()
        return {'available': available, 'installed': installed}

    def reapply_on_startup(self) -> None:
        """Re-apply firewall and Caddy rules for all installed services on startup.

        Caddy is always regenerated. When services are installed, their
        iptables rules, compose stacks and egress rules are re-applied too.
        Failures in those individual steps are logged, not raised.
        """
        from firewall_manager import apply_service_rules
        installed = self.config_manager.get_installed_services()
        # Always regenerate the Caddyfile so a cell rename or fresh install
        # produces the correct domain even when no store services are installed.
        try:
            caddy_routes = [
                r.get('caddy_route')
                for r in (installed or {}).values()
                if r.get('caddy_route')
            ]
            self.caddy_manager.regenerate_with_installed(caddy_routes)
        except Exception as e:
            logger.warning('reapply_on_startup: caddy regenerate failed: %s', e)
        if not installed:
            return
        # Re-apply iptables rules
        for svc_id, record in installed.items():
            ip = record.get('service_ip', '')
            rules = record.get('iptables_rules', [])
            try:
                apply_service_rules(svc_id, ip, rules)
            except Exception as e:
                logger.warning('reapply_on_startup: apply_service_rules(%s) failed: %s', svc_id, e)
        # Bring up per-service compose stacks
        if self.service_composer is not None:
            try:
                self.service_composer.reapply_active_services()
            except Exception as e:
                logger.warning('reapply_on_startup: reapply_active_services failed: %s', e)
        # Re-apply egress fwmark rules
        if self.egress_manager is not None:
            try:
                self.egress_manager.apply_all()
            except Exception as e:
                logger.warning('reapply_on_startup: egress apply_all failed: %s', e)