Files
pic/api/service_store_manager.py
T
roof 16fb362df7
Unit Tests / test (push) Failing after 11s
feat: replace hardcoded service names with ServiceRegistry-driven Caddy and CoreDNS config
Previously, CaddyManager and NetworkManager contained hardcoded lists of
service names (calendar, files, mail, webdav, etc.), meaning every new
service required a code change to appear in Caddy routes and DNS records.
Now both managers accept a service_registry parameter and derive their
service lists dynamically from the registry at runtime.

- CaddyManager: new _build_registry_service_routes() and
  _http01_service_pairs() methods pull routes from the registry
- NetworkManager: new _get_service_subdomains() method returns registry
  subdomains with a hardcoded fallback when no registry is wired in;
  _build_dns_records, stale-record detection, and service name sets all
  use the registry
- managers.py: service_registry constructed before network_manager so it
  can be injected into both CaddyManager and NetworkManager
- service_registry.py: validation chokepoint in get_caddy_routes() rejects
  invalid subdomain/backend values and reserved service names
- service_store_manager.py: _validate_manifest now validates top-level
  subdomain, backend, extra_subdomains, and extra_backends fields
- tests: 24 new tests covering registry-driven routing and DNS subdomain
  generation (test_caddy_registry_integration.py)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 18:27:52 -04:00

565 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Service Store Manager for Personal Internet Cell.
Manages installation, removal, and lifecycle of third-party services from the
PIC service store index. Each installed service runs as a Docker container
declared in a compose override file and has:
- An allocated IP in the service pool (172.20.0.20-172.20.0.254 by default)
- Optional iptables FORWARD rules declared in its manifest
- Optional Caddy reverse-proxy route declared in its manifest
"""
import logging
import os
import re
import threading
import subprocess
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import requests
import yaml
from base_service_manager import BaseServiceManager
from ip_utils import CONTAINER_OFFSETS
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Inclusive range of host offsets (relative to the cell network base address)
# from which service IPs are allocated; offsets listed in CONTAINER_OFFSETS
# are skipped by the allocator.
SERVICE_POOL_START = 20
SERVICE_POOL_END = 254
INDEX_URL_DEFAULT = (
    'https://git.pic.ngo/roof/pic-services/raw/branch/main/index.json'
)
MANIFEST_URL_TPL = (
    'https://git.pic.ngo/roof/pic-services/raw/branch/main/services/{id}/manifest.json'
)
# NOTE: every validation pattern below is anchored with ``\Z`` rather than
# ``$``. With ``re.match``, ``$`` also matches just before a trailing newline,
# so e.g. "files\n" would otherwise validate and smuggle a newline into the
# generated configuration.
IMAGE_ALLOWLIST_RE = re.compile(
    r'^git\.pic\.ngo/roof/[a-z0-9._/-]+(:[a-zA-Z0-9._-]+)?\Z'
)
# Container-side mount targets a manifest volume may not use.
FORBIDDEN_MOUNTS = frozenset([
    '/', '/etc', '/var', '/proc', '/sys', '/dev', '/app', '/run', '/boot',
])
# Subdomains a service manifest may not claim.
RESERVED_SUBDOMAINS = frozenset([
    'api', 'webui', 'admin', 'www', 'mail', 'ns1', 'ns2',
    'git', 'registry', 'install',
])
ENV_VALUE_RE = re.compile(r'^[A-Za-z0-9._@:/+\-= ]*\Z')
SUBDOMAIN_RE = re.compile(r'^[a-z][a-z0-9-]{0,30}\Z')
BACKEND_RE = re.compile(r'^[A-Za-z0-9._-]+:\d{1,5}\Z')
# ---------------------------------------------------------------------------
# ServiceStoreManager
# ---------------------------------------------------------------------------
class ServiceStoreManager(BaseServiceManager):
    """Manages service store: install, remove, and list available/installed services.

    Installed-service records are persisted through ``config_manager``; each
    service's allocated IP is additionally recorded in the identity field
    ``service_ips``. Records are rendered into a docker-compose override file
    and wired into iptables (``firewall_manager``) and Caddy
    (``caddy_manager``) on a best-effort basis.
    """

    # Docker's naming rule for containers and named volumes. Enforcing it on
    # manifest values stops them from being parsed as CLI options (e.g. "-f")
    # and, for volumes, from being a compose short-syntax source that starts
    # with "/", "." or "~", which compose treats as a host bind-mount path.
    _DOCKER_NAME_RE = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9_.-]+\Z')
    # Service IDs are interpolated into the manifest URL path, so slashes and
    # leading dots (path traversal) are rejected.
    _SERVICE_ID_RE = re.compile(r'^[A-Za-z0-9][A-Za-z0-9._-]*\Z')

    def __init__(self, config_manager, caddy_manager, container_manager,
                 data_dir: str = '', config_dir: str = ''):
        super().__init__('service_store', data_dir, config_dir)
        self.config_manager = config_manager
        self.caddy_manager = caddy_manager
        self.container_manager = container_manager
        # Compose override holding one service definition per installed service.
        self.compose_override = os.environ.get(
            'COMPOSE_SERVICES_PATH', '/app/docker-compose.services.yml'
        )
        self.index_url = os.environ.get('PIC_STORE_INDEX_URL', INDEX_URL_DEFAULT)
        # Serializes install() and remove().
        self._lock = threading.Lock()
        self._index_cache: Optional[list] = None
        # time.monotonic() of the last successful index fetch.
        self._index_cache_time: float = 0
        self._cache_ttl: int = 300  # 5 min

    # ── BaseServiceManager required ───────────────────────────────────────
    def get_status(self) -> Dict[str, Any]:
        """Report this manager as running together with the installed-service count."""
        installed = self.config_manager.get_installed_services()
        return {
            'service': self.service_name,
            'running': True,
            'installed_count': len(installed),
        }

    def test_connectivity(self) -> Dict[str, Any]:
        """Probe the store index URL; success means it answered HTTP 200."""
        try:
            resp = requests.get(self.index_url, timeout=5)
            return {'success': resp.status_code == 200}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    # ── Manifest validation ───────────────────────────────────────────────
    @staticmethod
    def _validate_manifest(m: dict) -> Tuple[bool, List[str]]:
        """Validate a service manifest. Returns (ok, [errors]).

        Every problem is collected rather than stopping at the first one, and
        structurally malformed input (wrong JSON types) is reported as errors
        instead of raising, so ``install`` can always return them to the caller.

        Args:
            m: The parsed manifest JSON.

        Returns:
            ``(True, [])`` when the manifest is acceptable, otherwise
            ``(False, errors)`` with one human-readable message per problem.
        """
        import posixpath

        if not isinstance(m, dict):
            return (False, [f'manifest must be a JSON object, got: {type(m).__name__}'])
        errors: List[str] = []

        def check_subdomain(value: Any, label: str) -> None:
            # Shared rules for every manifest field that becomes a subdomain.
            if not isinstance(value, str):
                errors.append(f'{label} must be a string, got: {value!r}')
            elif value in RESERVED_SUBDOMAINS:
                errors.append(f'{label} is reserved: {value}')
            elif not SUBDOMAIN_RE.fullmatch(value):
                errors.append(
                    f'{label} must match ^[a-z][a-z0-9-]{{0,30}}$, got: {value}'
                )

        # Required top-level fields
        for field in ('id', 'name', 'version', 'author', 'image', 'container_name'):
            if not m.get(field):
                errors.append(f'Missing required field: {field}')

        # Image allowlist (fullmatch: a trailing newline must not pass)
        image = m.get('image', '')
        if image and (not isinstance(image, str)
                      or not IMAGE_ALLOWLIST_RE.fullmatch(image)):
            errors.append(
                f'image must match git.pic.ngo/roof/* pattern, got: {image}'
            )

        # container_name is the compose service key and a docker CLI argument.
        container_name = m.get('container_name', '')
        if container_name and (
                not isinstance(container_name, str)
                or not ServiceStoreManager._DOCKER_NAME_RE.fullmatch(container_name)):
            errors.append(
                'container_name must match [a-zA-Z0-9][a-zA-Z0-9_.-]+, '
                f'got: {container_name!r}'
            )

        # Volume safety
        volumes = m.get('volumes') or []
        if not isinstance(volumes, list):
            errors.append('volumes must be a list')
            volumes = []
        for vol in volumes:
            if not isinstance(vol, dict):
                errors.append('volumes entries must be objects')
                continue
            vol_name = vol.get('name', '')
            if vol_name and (
                    not isinstance(vol_name, str)
                    or not ServiceStoreManager._DOCKER_NAME_RE.fullmatch(vol_name)):
                # The name is rendered as the compose source in "name:mount";
                # a path here would bind-mount the host filesystem.
                errors.append(
                    'volumes[].name must match [a-zA-Z0-9][a-zA-Z0-9_.-]+, '
                    f'got: {vol_name!r}'
                )
            mount = vol.get('mount', '')
            if not isinstance(mount, str):
                errors.append(f'volumes[].mount must be a string, got: {mount!r}')
                continue
            # Normalise so "/etc/", "//etc" or "/x/../etc" cannot slip past the
            # exact-match denylist.
            norm = mount
            if mount.startswith('/'):
                norm = '/' + posixpath.normpath(mount).strip('/')
            if norm in FORBIDDEN_MOUNTS:
                errors.append(f'Forbidden volume mount: {mount}')
            elif norm.startswith('/home/roof/pic'):
                errors.append(f'Volume mount cannot be a prefix of /home/roof/pic: {mount}')

        # iptables rules
        rules = m.get('iptables_rules') or []
        if not isinstance(rules, list):
            errors.append('iptables_rules must be a list')
            rules = []
        for rule in rules:
            if not isinstance(rule, dict):
                errors.append('iptables_rules entries must be objects')
                continue
            if rule.get('type') != 'ACCEPT':
                errors.append(
                    f'iptables_rules[].type must be ACCEPT, got: {rule.get("type")}'
                )
            if rule.get('dest_ip') != '${SERVICE_IP}':
                errors.append(
                    f'iptables_rules[].dest_ip must be exactly ${{SERVICE_IP}}, '
                    f'got: {rule.get("dest_ip")}'
                )
            port = rule.get('dest_port')
            # bool is a subclass of int, so True/False must be rejected explicitly.
            if (isinstance(port, bool) or not isinstance(port, int)
                    or not (1 <= port <= 65535)):
                errors.append(
                    f'iptables_rules[].dest_port must be an integer 1-65535, got: {port}'
                )
            proto = rule.get('proto', 'tcp')
            if proto not in ('tcp', 'udp'):
                errors.append(
                    f'iptables_rules[].proto must be tcp or udp, got: {proto}'
                )

        # Legacy caddy_route dict subdomain (for store manifests using the old
        # format). install() stores caddy_route verbatim and hands it to
        # caddy_manager, so anything other than a dict is rejected.
        caddy_route = m.get('caddy_route')
        legacy_sub: Any = ''
        if isinstance(caddy_route, dict):
            legacy_sub = caddy_route.get('subdomain', '')
        elif caddy_route:
            errors.append(f'caddy_route must be an object, got: {caddy_route!r}')
        if legacy_sub:
            check_subdomain(legacy_sub, 'caddy_route.subdomain')

        # Top-level subdomain + backend (consumed by ServiceRegistry.get_caddy_routes)
        subdomain = m.get('subdomain', '')
        if subdomain:
            check_subdomain(subdomain, 'subdomain')
        backend = m.get('backend', '')
        if backend and (not isinstance(backend, str)
                        or not BACKEND_RE.fullmatch(backend)):
            errors.append(f'backend must be host:port (e.g. cell-foo:8080), got: {backend}')

        extra_subdomains = m.get('extra_subdomains') or []
        if not isinstance(extra_subdomains, list):
            # A bare string would otherwise be iterated character by character.
            errors.append('extra_subdomains must be a list')
            extra_subdomains = []
        for sub in extra_subdomains:
            if not isinstance(sub, str):
                errors.append('extra_subdomains entries must be strings')
            else:
                check_subdomain(sub, 'extra_subdomains entry')

        extra_backends = m.get('extra_backends') or {}
        if not isinstance(extra_backends, dict):
            errors.append('extra_backends must be an object')
            extra_backends = {}
        for sub, bknd in extra_backends.items():
            if not isinstance(sub, str) or not SUBDOMAIN_RE.fullmatch(sub):
                errors.append(
                    f'extra_backends key must match ^[a-z][a-z0-9-]{{0,30}}$, got: {sub!r}'
                )
            elif sub in RESERVED_SUBDOMAINS:
                errors.append(f'extra_backends key is reserved: {sub}')
            if not isinstance(bknd, str) or not BACKEND_RE.fullmatch(bknd):
                errors.append(
                    f'extra_backends[{sub!r}] value must be host:port, got: {bknd!r}'
                )

        # Env value safety
        env = m.get('env') or []
        if not isinstance(env, list):
            errors.append('env must be a list')
            env = []
        for env_entry in env:
            if not isinstance(env_entry, dict):
                errors.append('env entries must be objects')
                continue
            val = str(env_entry.get('value', ''))
            if not ENV_VALUE_RE.fullmatch(val):
                errors.append(
                    f'env[].value contains disallowed characters: {val!r}'
                )
        return (len(errors) == 0, errors)

    # ── IP allocation ─────────────────────────────────────────────────────
    def _allocate_service_ip(self, service_id: str) -> str:
        """Return the lowest free IP in the service pool.

        Walks offsets SERVICE_POOL_START..SERVICE_POOL_END from the base of the
        identity's ``ip_range``. It skips offsets reserved for named containers
        (``CONTAINER_OFFSETS``) and IPs already recorded in ``service_ips``.

        Args:
            service_id: The service being installed (not used in the choice).

        Raises:
            RuntimeError: when no pool address is free.
        """
        import ipaddress
        identity = self.config_manager.get_identity()
        ip_range = identity.get('ip_range', '172.20.0.0/16')
        network = ipaddress.IPv4Network(ip_range, strict=False)
        base = int(network.network_address)
        # Highest usable host address; never hand out the broadcast address.
        last_host = int(network.broadcast_address) - 1
        # IPs already assigned to named containers
        reserved_offsets = set(CONTAINER_OFFSETS.values())
        # IPs already assigned to installed services
        taken_ips = set((identity.get('service_ips') or {}).values())
        for offset in range(SERVICE_POOL_START, SERVICE_POOL_END + 1):
            if offset in reserved_offsets:
                continue
            if base + offset > last_host:
                break  # network smaller than the pool: later offsets are out of range
            candidate = str(ipaddress.IPv4Address(base + offset))
            if candidate not in taken_ips:
                return candidate
        raise RuntimeError('Service IP pool exhausted (offsets 20-254 all taken)')

    # ── Compose override ──────────────────────────────────────────────────
    def _render_compose_override(self, installed_records: dict) -> str:
        """Render the docker-compose override YAML for all installed services.

        Each record becomes a service keyed by its container name, attached to
        the external ``cell-network`` at its ``service_ip``. Every named volume
        referenced by a manifest is also declared at the top level.
        """
        services: Dict[str, Any] = {}
        named_volumes: Dict[str, Any] = {}
        for svc_id, record in installed_records.items():
            manifest = record.get('manifest') or {}
            container_name = record.get('container_name', svc_id)
            image = manifest.get('image', record.get('image', ''))
            service_ip = record.get('service_ip', '')
            # Volumes
            volumes = []
            for vol in manifest.get('volumes') or []:
                vol_name = vol.get('name', '')
                mount = vol.get('mount', '')
                if vol_name:
                    named_volumes[vol_name] = None  # Docker default driver
                    if mount:
                        volumes.append(f'{vol_name}:{mount}')
            # Environment
            environment: Dict[str, str] = {}
            for env_entry in manifest.get('env') or []:
                k = env_entry.get('key', '')
                if k:
                    environment[k] = str(env_entry.get('value', ''))
            svc_def: Dict[str, Any] = {
                'image': image,
                'container_name': container_name,
                'restart': 'unless-stopped',
                'logging': {
                    'driver': 'json-file',
                    'options': {
                        'max-size': '10m',
                        'max-file': '5',
                    },
                },
                'networks': {
                    'cell-network': {
                        'ipv4_address': service_ip,
                    }
                },
            }
            if volumes:
                svc_def['volumes'] = volumes
            if environment:
                svc_def['environment'] = environment
            services[container_name] = svc_def
        doc: Dict[str, Any] = {
            'version': '3.8',
            'services': services,
            'networks': {
                'cell-network': {
                    'external': True,
                }
            },
        }
        if named_volumes:
            doc['volumes'] = named_volumes
        return yaml.dump(doc, default_flow_style=False, allow_unicode=True)

    def _write_compose_override(self, content: str) -> None:
        """Atomically replace the compose override file with ``content``.

        The content is written to a sibling ``.tmp`` file and moved into place
        with ``os.replace``. The temp file is removed if anything fails.
        """
        tmp_path = self.compose_override + '.tmp'
        try:
            os.makedirs(os.path.dirname(os.path.abspath(self.compose_override)),
                        exist_ok=True)
        except OSError:
            # Best effort: the directory normally exists already; a real
            # problem surfaces from open() below.
            pass
        try:
            # UTF-8 explicitly: the YAML is dumped with allow_unicode=True.
            with open(tmp_path, 'w', encoding='utf-8') as f:
                f.write(content)
                f.flush()
                try:
                    os.fsync(f.fileno())
                except OSError:
                    pass
            os.replace(tmp_path, self.compose_override)
        except BaseException:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise

    # ── Index / manifest fetching ─────────────────────────────────────────
    def fetch_index(self) -> list:
        """Fetch the service index, cached for ``_cache_ttl`` seconds.

        On fetch failure the last cached index (or an empty list) is returned.
        """
        import time
        # Monotonic clock: wall-clock jumps must not expire or pin the cache.
        now = time.monotonic()
        if self._index_cache is not None and (now - self._index_cache_time) < self._cache_ttl:
            return self._index_cache
        try:
            resp = requests.get(self.index_url, timeout=10)
            resp.raise_for_status()
            data = resp.json()
            self._index_cache = data if isinstance(data, list) else data.get('services', [])
            self._index_cache_time = now
            return self._index_cache
        except Exception as e:
            logger.warning(f'fetch_index failed: {e}')
            return self._index_cache or []

    def _fetch_manifest(self, service_id: str) -> dict:
        """Fetch a service manifest by ID (raises on HTTP or JSON errors)."""
        url = MANIFEST_URL_TPL.format(id=service_id)
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        return resp.json()

    @staticmethod
    def _collect_caddy_routes(records: dict) -> list:
        """Return the truthy legacy ``caddy_route`` values of ``records``."""
        return [r.get('caddy_route') for r in records.values() if r.get('caddy_route')]

    # ── Core operations ───────────────────────────────────────────────────
    def install(self, service_id: str) -> dict:
        """Install a service from the store.

        Fetches and validates the manifest, allocates an IP and persists the
        record. It then rewrites the compose override, applies iptables rules,
        regenerates Caddy and starts the container. The steps after
        persistence are best-effort: their failures are logged, not returned.

        Returns:
            ``{'ok': True, 'service_ip': ..., 'container_name': ...}`` on
            success, ``{'ok': True, 'already_installed': True}`` if present,
            or ``{'ok': False, 'error': ...}`` / ``{'ok': False, 'errors': [...]}``.
        """
        from datetime import timezone
        from firewall_manager import apply_service_rules
        if not isinstance(service_id, str) or not self._SERVICE_ID_RE.fullmatch(service_id):
            return {'ok': False, 'error': f'Invalid service id: {service_id!r}'}
        with self._lock:
            # Already installed?
            installed = self.config_manager.get_installed_services()
            if service_id in installed:
                return {'ok': True, 'already_installed': True}
            # Fetch and validate manifest
            try:
                manifest = self._fetch_manifest(service_id)
            except Exception as e:
                return {'ok': False, 'error': f'Failed to fetch manifest: {e}'}
            ok, errs = self._validate_manifest(manifest)
            if not ok:
                return {'ok': False, 'errors': errs}
            container_name = manifest['container_name']
            # Compose services are keyed by container name (see
            # _render_compose_override); a duplicate would silently replace
            # another installed service's definition.
            for other_id, other in installed.items():
                if other.get('container_name', other_id) == container_name:
                    return {
                        'ok': False,
                        'error': (f'container_name {container_name!r} is already '
                                  f'used by installed service {other_id}'),
                    }
            rules = manifest.get('iptables_rules') or []
            # Allocate IP
            try:
                ip = self._allocate_service_ip(service_id)
            except RuntimeError as e:
                return {'ok': False, 'error': str(e)}
            # Build install record
            record = {
                'id': service_id,
                'name': manifest.get('name', service_id),
                'container_name': container_name,
                'image': manifest.get('image', ''),
                'service_ip': ip,
                'caddy_route': manifest.get('caddy_route'),
                'iptables_rules': rules,
                'manifest': manifest,
                # Naive UTC in the same format datetime.utcnow() produced
                # (utcnow is deprecated since Python 3.12).
                'installed_at': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            }
            # Persist to config
            self.config_manager.set_installed_service(service_id, record)
            identity = self.config_manager.get_identity()
            service_ips = dict(identity.get('service_ips') or {})
            service_ips[service_id] = ip
            self.config_manager.set_identity_field('service_ips', service_ips)
            # Write compose override
            all_installed = self.config_manager.get_installed_services()
            try:
                self._write_compose_override(self._render_compose_override(all_installed))
            except Exception as e:
                logger.error(f'Failed to write compose override: {e}')
            # Apply iptables rules (best-effort)
            try:
                apply_service_rules(service_id, ip, rules)
            except Exception as e:
                logger.warning(f'apply_service_rules for {service_id} failed (non-fatal): {e}')
            # Regenerate Caddyfile
            try:
                self.caddy_manager.regenerate_with_installed(
                    self._collect_caddy_routes(all_installed)
                )
            except Exception as e:
                logger.warning(f'caddy regenerate for {service_id} failed (non-fatal): {e}')
            # Start the container via docker compose
            base_compose = os.environ.get('COMPOSE_FILE', '/app/docker-compose.yml')
            try:
                result = subprocess.run(
                    ['docker', 'compose',
                     '-f', base_compose,
                     '-f', self.compose_override,
                     'up', '-d', container_name],
                    capture_output=True, text=True, timeout=120,
                )
                if result.returncode != 0:
                    logger.warning(
                        f'docker compose up for {service_id} failed: {result.stderr.strip()}'
                    )
            except Exception as e:
                logger.warning(f'docker compose up for {service_id} failed (non-fatal): {e}')
            return {
                'ok': True,
                'service_ip': ip,
                'container_name': container_name,
            }

    def remove(self, service_id: str, purge_data: bool = False) -> dict:
        """Remove an installed service.

        Stops and removes the container, clears its iptables rules, drops the
        config record and its ``service_ips`` reservation, and regenerates
        compose + Caddy. When ``purge_data`` is set, it also deletes the
        service's named volumes. Docker, iptables and Caddy steps are
        best-effort.

        Returns:
            ``{'ok': True}``, or ``{'ok': False, 'error': ...}`` if the service
            is not installed.
        """
        from firewall_manager import clear_service_rules
        with self._lock:
            installed = self.config_manager.get_installed_services()
            record = installed.get(service_id)
            if not record:
                return {'ok': False, 'error': f'Service {service_id} is not installed'}
            container_name = record.get('container_name', service_id)
            manifest = record.get('manifest') or {}
            base_compose = os.environ.get('COMPOSE_FILE', '/app/docker-compose.yml')
            # Stop and remove container
            try:
                subprocess.run(
                    ['docker', 'compose',
                     '-f', base_compose,
                     '-f', self.compose_override,
                     'stop', container_name],
                    capture_output=True, text=True, timeout=60,
                )
            except Exception as e:
                logger.warning(f'docker compose stop for {service_id} failed (non-fatal): {e}')
            try:
                subprocess.run(
                    ['docker', 'rm', '-f', container_name],
                    capture_output=True, text=True, timeout=30,
                )
            except Exception as e:
                logger.warning(f'docker rm for {service_id} failed (non-fatal): {e}')
            # Clear iptables rules
            try:
                clear_service_rules(service_id)
            except Exception as e:
                logger.warning(f'clear_service_rules for {service_id} failed (non-fatal): {e}')
            # Remove from config, regenerate compose + caddy
            self.config_manager.remove_installed_service(service_id)
            # Release the IP reservation install() recorded in service_ips;
            # otherwise _allocate_service_ip treats it as taken forever.
            try:
                identity = self.config_manager.get_identity()
                service_ips = dict(identity.get('service_ips') or {})
                if service_id in service_ips:
                    del service_ips[service_id]
                    self.config_manager.set_identity_field('service_ips', service_ips)
            except Exception as e:
                logger.warning(f'releasing service IP for {service_id} failed (non-fatal): {e}')
            remaining = self.config_manager.get_installed_services()
            try:
                self._write_compose_override(self._render_compose_override(remaining))
            except Exception as e:
                logger.error(f'Failed to write compose override after remove: {e}')
            try:
                self.caddy_manager.regenerate_with_installed(
                    self._collect_caddy_routes(remaining)
                )
            except Exception as e:
                logger.warning(f'caddy regenerate after remove failed (non-fatal): {e}')
            # Purge named volumes if requested
            if purge_data:
                for vol in manifest.get('volumes') or []:
                    vol_name = vol.get('name', '')
                    if not vol_name:
                        continue
                    # Records installed before volume names were validated
                    # could hold values docker would parse as options.
                    if not isinstance(vol_name, str) or not self._DOCKER_NAME_RE.fullmatch(vol_name):
                        logger.warning(
                            f'refusing to remove volume with unsafe name {vol_name!r} '
                            f'for {service_id}'
                        )
                        continue
                    try:
                        subprocess.run(
                            ['docker', 'volume', 'rm', vol_name],
                            capture_output=True, text=True, timeout=30,
                        )
                    except Exception as e:
                        logger.warning(
                            f'docker volume rm {vol_name} failed (non-fatal): {e}'
                        )
            return {'ok': True}

    def list_services(self) -> dict:
        """Return available (from index) and installed services."""
        available = self.fetch_index()
        installed = self.config_manager.get_installed_services()
        return {'available': available, 'installed': installed}

    def reapply_on_startup(self) -> None:
        """Re-apply compose override, firewall and Caddy rules for installed services.

        Each step is best-effort; failures are logged and the next step runs.
        """
        from firewall_manager import apply_service_rules
        installed = self.config_manager.get_installed_services()
        if not installed:
            return
        # Regenerate compose override in case it was deleted
        try:
            self._write_compose_override(self._render_compose_override(installed))
        except Exception as e:
            logger.warning(f'reapply_on_startup: compose override write failed: {e}')
        # Re-apply iptables rules
        for svc_id, record in installed.items():
            ip = record.get('service_ip', '')
            rules = record.get('iptables_rules', [])
            try:
                apply_service_rules(svc_id, ip, rules)
            except Exception as e:
                logger.warning(f'reapply_on_startup: apply_service_rules({svc_id}) failed: {e}')
        # Regenerate Caddyfile
        try:
            self.caddy_manager.regenerate_with_installed(
                self._collect_caddy_routes(installed)
            )
        except Exception as e:
            logger.warning(f'reapply_on_startup: caddy regenerate failed: {e}')