#!/usr/bin/env python3
"""
Service Store Manager for Personal Internet Cell.

Manages installation, removal, and lifecycle of third-party services
from the PIC service store index.

Each installed service runs as a Docker container declared in a
compose override file and has:
  - An allocated IP in the service pool (172.20.0.20–254 by default)
  - Optional iptables FORWARD rules declared in its manifest
  - Optional Caddy reverse-proxy route declared in its manifest
"""

import ipaddress
import logging
import os
import posixpath
import re
import subprocess
import threading
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote

import requests
import yaml

from base_service_manager import BaseServiceManager
from ip_utils import CONTAINER_OFFSETS

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Inclusive range of host offsets (relative to the network address) that may
# be handed out to installed services.
SERVICE_POOL_START = 20
SERVICE_POOL_END = 254

INDEX_URL_DEFAULT = (
    'https://git.pic.ngo/roof/pic-services/raw/branch/main/index.json'
)
MANIFEST_URL_TPL = (
    'https://git.pic.ngo/roof/pic-services/raw/branch/main/services/{id}/manifest.json'
)

IMAGE_ALLOWLIST_RE = re.compile(
    r'^git\.pic\.ngo/roof/[a-z0-9._/-]+(:[a-zA-Z0-9._-]+)?$'
)

FORBIDDEN_MOUNTS = frozenset([
    '/', '/etc', '/var', '/proc', '/sys', '/dev', '/app', '/run', '/boot',
])

RESERVED_SUBDOMAINS = frozenset([
    'api', 'webui', 'admin', 'www', 'mail', 'ns1', 'ns2', 'git', 'registry', 'install',
])

ENV_VALUE_RE = re.compile(r'^[A-Za-z0-9._@:/+\-= ]*$')

SUBDOMAIN_RE = re.compile(r'^[a-z][a-z0-9-]{0,30}$')

# Names that are safe to use as a container name / named volume. Anything
# else in the "source" half of a compose volume entry (a leading '/', '.',
# '~' or an embedded ':') would turn the entry into a host bind mount, and a
# leading '-' in a container name would be parsed as an option by the docker
# CLI.
DOCKER_NAME_RE = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9_.-]*$')


def _manifest_entries(manifest: dict, key: str, errors: List[str]) -> List[dict]:
    """Return the dict entries of the list stored under ``manifest[key]``.

    A missing or empty value yields ``[]``. A non-list value, or a list entry
    that is not a dict, is reported in *errors* instead of raising, so that a
    malformed manifest produces validation errors rather than a crash.
    """
    value = manifest.get(key)
    if not value:
        return []
    if not isinstance(value, list):
        errors.append(f'{key} must be a list, got: {type(value).__name__}')
        return []
    entries: List[dict] = []
    for entry in value:
        if isinstance(entry, dict):
            entries.append(entry)
        else:
            errors.append(f'{key}[] entries must be objects, got: {entry!r}')
    return entries


def _normalize_mount(mount: str) -> str:
    """Collapse redundant separators and dot segments in a mount path.

    Used so that spellings such as ``/etc/`` or ``//etc`` are compared
    against FORBIDDEN_MOUNTS in the same form as ``/etc``.
    """
    if not mount:
        return ''
    normalized = posixpath.normpath(mount)
    # posixpath.normpath deliberately preserves exactly two leading slashes.
    if normalized.startswith('//'):
        normalized = '/' + normalized.lstrip('/')
    return normalized


# ---------------------------------------------------------------------------
# ServiceStoreManager
# ---------------------------------------------------------------------------

class ServiceStoreManager(BaseServiceManager):
    """Manages service store: install, remove, and list available/installed services."""

    def __init__(self, config_manager, caddy_manager, container_manager,
                 data_dir: str = '', config_dir: str = ''):
        """Store collaborators and read the store configuration from the environment.

        ``COMPOSE_SERVICES_PATH`` overrides the compose override file location
        and ``PIC_STORE_INDEX_URL`` overrides the index URL.
        """
        super().__init__('service_store', data_dir, config_dir)
        self.config_manager = config_manager
        self.caddy_manager = caddy_manager
        self.container_manager = container_manager
        self.compose_override = os.environ.get(
            'COMPOSE_SERVICES_PATH', '/app/docker-compose.services.yml'
        )
        self.index_url = os.environ.get('PIC_STORE_INDEX_URL', INDEX_URL_DEFAULT)
        # Serialises install() and remove().
        self._lock = threading.Lock()
        self._index_cache: Optional[list] = None
        self._index_cache_time: float = 0
        self._cache_ttl: int = 300  # 5 min

    # ── BaseServiceManager required ───────────────────────────────────────

    def get_status(self) -> Dict[str, Any]:
        """Return the service name, a constant running flag and the installed count."""
        installed = self.config_manager.get_installed_services()
        return {
            'service': self.service_name,
            'running': True,
            'installed_count': len(installed),
        }

    def test_connectivity(self) -> Dict[str, Any]:
        """Report whether the index URL answers with HTTP 200 within 5 seconds."""
        try:
            resp = requests.get(self.index_url, timeout=5)
            return {'success': resp.status_code == 200}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    # ── Manifest validation ───────────────────────────────────────────────

    @staticmethod
    def _validate_manifest(m: dict) -> Tuple[bool, List[str]]:
        """Validate a service manifest.

        Returns (ok, [errors]). Malformed structures (wrong types, non-dict
        list entries) are reported as errors rather than raising.
        """
        if not isinstance(m, dict):
            return (False, ['Manifest must be a JSON object'])

        errors: List[str] = []

        # Required top-level fields
        for field in ('id', 'name', 'version', 'author', 'image', 'container_name'):
            if not m.get(field):
                errors.append(f'Missing required field: {field}')

        # Image allowlist. fullmatch() is used throughout because `$` with
        # match() also accepts a value that ends in a newline.
        image = m.get('image', '')
        if image and not (isinstance(image, str) and IMAGE_ALLOWLIST_RE.fullmatch(image)):
            errors.append(
                f'image must match git.pic.ngo/roof/* pattern, got: {image}'
            )

        # Container name: it is passed as a positional argument to the docker
        # CLI, so it must not be able to look like an option.
        container_name = m.get('container_name')
        if container_name and not (
            isinstance(container_name, str) and DOCKER_NAME_RE.fullmatch(container_name)
        ):
            errors.append(
                f'container_name must match [a-zA-Z0-9][a-zA-Z0-9_.-]*, '
                f'got: {container_name}'
            )

        # Volume safety
        for vol in _manifest_entries(m, 'volumes', errors):
            # The name becomes the source half of "name:mount" in the compose
            # file; only plain named volumes are allowed there.
            vol_name = vol.get('name', '')
            if vol_name and not (
                isinstance(vol_name, str) and DOCKER_NAME_RE.fullmatch(vol_name)
            ):
                errors.append(
                    f'volumes[].name must match [a-zA-Z0-9][a-zA-Z0-9_.-]*, '
                    f'got: {vol_name}'
                )

            mount = vol.get('mount', '')
            if not isinstance(mount, str):
                errors.append(f'volumes[].mount must be a string, got: {mount!r}')
                continue
            normalized = _normalize_mount(mount)
            if normalized in FORBIDDEN_MOUNTS:
                errors.append(f'Forbidden volume mount: {mount}')
            elif normalized.startswith('/home/roof/pic'):
                errors.append(f'Volume mount cannot be a prefix of /home/roof/pic: {mount}')

        # iptables rules
        for rule in _manifest_entries(m, 'iptables_rules', errors):
            if rule.get('type') != 'ACCEPT':
                errors.append(
                    f'iptables_rules[].type must be ACCEPT, got: {rule.get("type")}'
                )
            if rule.get('dest_ip') != '${SERVICE_IP}':
                errors.append(
                    f'iptables_rules[].dest_ip must be exactly ${{SERVICE_IP}}, '
                    f'got: {rule.get("dest_ip")}'
                )
            port = rule.get('dest_port')
            # bool is a subclass of int, so exclude it explicitly.
            if (isinstance(port, bool) or not isinstance(port, int)
                    or not (1 <= port <= 65535)):
                errors.append(
                    f'iptables_rules[].dest_port must be an integer 1-65535, got: {port}'
                )
            proto = rule.get('proto', 'tcp')
            if proto not in ('tcp', 'udp'):
                errors.append(
                    f'iptables_rules[].proto must be tcp or udp, got: {proto}'
                )

        # Caddy route subdomain
        caddy_route = m.get('caddy_route') or {}
        subdomain = caddy_route.get('subdomain', '') if isinstance(caddy_route, dict) else ''
        if subdomain:
            if isinstance(subdomain, str) and subdomain in RESERVED_SUBDOMAINS:
                errors.append(f'caddy_route.subdomain is reserved: {subdomain}')
            elif not (isinstance(subdomain, str) and SUBDOMAIN_RE.fullmatch(subdomain)):
                errors.append(
                    f'caddy_route.subdomain must match ^[a-z][a-z0-9-]{{0,30}}$, '
                    f'got: {subdomain}'
                )

        # Env value safety
        for env_entry in _manifest_entries(m, 'env', errors):
            val = str(env_entry.get('value', ''))
            if not ENV_VALUE_RE.fullmatch(val):
                errors.append(
                    f'env[].value contains disallowed characters: {val!r}'
                )

        return (not errors, errors)

    # ── IP allocation ─────────────────────────────────────────────────────

    def _allocate_service_ip(self, service_id: str) -> str:
        """Allocate the next free IP from the service pool.

        Skips offsets used by CONTAINER_OFFSETS and addresses already listed
        in the identity's ``service_ips``. Raises RuntimeError when no address
        is free, and ValueError (from ``ipaddress``) when the configured
        ``ip_range`` is not a valid IPv4 network. *service_id* is currently
        unused and kept for interface compatibility.
        """
        identity = self.config_manager.get_identity()
        ip_range = identity.get('ip_range', '172.20.0.0/16')

        network = ipaddress.IPv4Network(ip_range, strict=False)
        base = int(network.network_address)
        # Never hand out the broadcast address or anything beyond the network.
        last_usable = int(network.broadcast_address) - 1

        # IPs already assigned to named containers
        reserved_offsets = set(CONTAINER_OFFSETS.values())

        # IPs already assigned to installed services
        service_ips: Dict[str, str] = identity.get('service_ips') or {}
        taken_ips = set(service_ips.values())

        for offset in range(SERVICE_POOL_START, SERVICE_POOL_END + 1):
            if base + offset > last_usable:
                break
            if offset in reserved_offsets:
                continue
            candidate = str(ipaddress.IPv4Address(base + offset))
            if candidate not in taken_ips:
                return candidate

        raise RuntimeError(
            f'Service IP pool exhausted (offsets {SERVICE_POOL_START}-'
            f'{SERVICE_POOL_END} all taken)'
        )

    def _release_service_ip(self, service_id: str) -> None:
        """Drop *service_id* from the identity's ``service_ips`` map (best-effort).

        install() records the allocated address there; without releasing it
        the address would stay marked as taken after the service is removed.
        A no-op when the entry is already gone.
        """
        try:
            identity = self.config_manager.get_identity()
            service_ips = dict(identity.get('service_ips') or {})
            if service_id in service_ips:
                del service_ips[service_id]
                self.config_manager.set_identity_field('service_ips', service_ips)
        except Exception as e:
            logger.warning(
                f'releasing service IP for {service_id} failed (non-fatal): {e}'
            )

    # ── Compose override ──────────────────────────────────────────────────

    def _render_compose_override(self, installed_records: dict) -> str:
        """Generate docker-compose YAML override for all installed services."""
        services: Dict[str, Any] = {}
        named_volumes: Dict[str, Any] = {}

        for svc_id, record in installed_records.items():
            manifest = record.get('manifest') or {}
            container_name = record.get('container_name', svc_id)
            image = manifest.get('image', record.get('image', ''))
            service_ip = record.get('service_ip', '')

            # Volumes
            volumes = []
            for vol in manifest.get('volumes') or []:
                vol_name = vol.get('name', '')
                mount = vol.get('mount', '')
                if vol_name:
                    named_volumes[vol_name] = None  # Docker default driver
                    if mount:
                        volumes.append(f'{vol_name}:{mount}')

            # Environment
            environment: Dict[str, str] = {}
            for env_entry in manifest.get('env') or []:
                k = env_entry.get('key', '')
                if k:
                    environment[k] = str(env_entry.get('value', ''))

            svc_def: Dict[str, Any] = {
                'image': image,
                'container_name': container_name,
                'restart': 'unless-stopped',
                'logging': {
                    'driver': 'json-file',
                    'options': {
                        'max-size': '10m',
                        'max-file': '5',
                    },
                },
                'networks': {
                    'cell-network': {
                        'ipv4_address': service_ip,
                    }
                },
            }
            if volumes:
                svc_def['volumes'] = volumes
            if environment:
                svc_def['environment'] = environment

            services[container_name] = svc_def

        doc: Dict[str, Any] = {
            'version': '3.8',
            'services': services,
            'networks': {
                'cell-network': {
                    'external': True,
                }
            },
        }
        if named_volumes:
            doc['volumes'] = named_volumes

        return yaml.dump(doc, default_flow_style=False, allow_unicode=True)

    def _write_compose_override(self, content: str) -> None:
        """Atomic write of the compose override file.

        Writes to ``<path>.tmp`` and renames it over the target. Raises the
        underlying OSError when the file cannot be written; the temporary
        file is removed in that case.
        """
        tmp_path = self.compose_override + '.tmp'
        parent = os.path.dirname(os.path.abspath(self.compose_override))
        try:
            os.makedirs(parent, exist_ok=True)
        except OSError as e:
            # Not fatal here: if the directory is really unusable, open()
            # below raises with a precise error.
            logger.debug(f'could not create {parent}: {e}')

        try:
            # The YAML is rendered with allow_unicode=True, so write UTF-8
            # explicitly instead of relying on the locale encoding.
            with open(tmp_path, 'w', encoding='utf-8') as f:
                f.write(content)
                f.flush()
                try:
                    os.fsync(f.fileno())
                except OSError as e:
                    logger.debug(f'fsync of {tmp_path} failed: {e}')
            os.replace(tmp_path, self.compose_override)
        except Exception:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass  # temp file was never created or is already gone
            raise

    def _sync_compose_override(self, installed: dict) -> None:
        """Render and write the compose override for *installed*; may raise."""
        self._write_compose_override(self._render_compose_override(installed))

    @staticmethod
    def _collect_caddy_routes(installed: dict) -> list:
        """Return the non-empty ``caddy_route`` entries of all install records."""
        return [
            r.get('caddy_route') for r in installed.values()
            if r.get('caddy_route')
        ]

    # ── Index / manifest fetching ─────────────────────────────────────────

    def fetch_index(self) -> list:
        """Fetch and cache the service index.

        Returns the cached list while it is younger than the cache TTL. On a
        fetch failure the previous cache (or an empty list) is returned.
        """
        now = time.time()
        if self._index_cache is not None and (now - self._index_cache_time) < self._cache_ttl:
            return self._index_cache
        try:
            resp = requests.get(self.index_url, timeout=10)
            resp.raise_for_status()
            data = resp.json()
            # `or []` keeps the cache a list even when "services" is null.
            self._index_cache = data if isinstance(data, list) else (data.get('services') or [])
            self._index_cache_time = now
            return self._index_cache
        except Exception as e:
            logger.warning(f'fetch_index failed: {e}')
            return self._index_cache or []

    def _fetch_manifest(self, service_id: str) -> dict:
        """Fetch a service manifest by ID.

        Raises whatever ``requests`` raises on network/HTTP/JSON errors.
        """
        # Percent-encode the ID so it stays a single path segment and cannot
        # redirect the request to another path on the store host.
        url = MANIFEST_URL_TPL.format(id=quote(str(service_id), safe=''))
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        return resp.json()

    # ── Core operations ───────────────────────────────────────────────────

    def install(self, service_id: str) -> dict:
        """Install a service from the store.

        Returns ``{'ok': False, ...}`` when the manifest cannot be fetched or
        validated or no IP is available. Once the install record is
        persisted, the compose/iptables/Caddy/docker steps are best-effort:
        failures are logged and the result is still ``{'ok': True, ...}``.
        """
        from firewall_manager import apply_service_rules

        with self._lock:
            # Already installed?
            installed = self.config_manager.get_installed_services()
            if service_id in installed:
                return {'ok': True, 'already_installed': True}

            # Fetch and validate manifest
            try:
                manifest = self._fetch_manifest(service_id)
            except Exception as e:
                return {'ok': False, 'error': f'Failed to fetch manifest: {e}'}

            ok, errs = self._validate_manifest(manifest)
            if not ok:
                return {'ok': False, 'errors': errs}

            # Allocate IP (ValueError: the configured ip_range is invalid)
            try:
                ip = self._allocate_service_ip(service_id)
            except (RuntimeError, ValueError) as e:
                return {'ok': False, 'error': str(e)}

            container_name = manifest['container_name']
            iptables_rules = manifest.get('iptables_rules') or []

            # Build install record
            record = {
                'id': service_id,
                'name': manifest.get('name', service_id),
                'container_name': container_name,
                'image': manifest.get('image', ''),
                'service_ip': ip,
                'caddy_route': manifest.get('caddy_route'),
                'iptables_rules': iptables_rules,
                'manifest': manifest,
                # Naive UTC timestamp, same format datetime.utcnow() produced.
                'installed_at': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            }

            # Persist to config
            self.config_manager.set_installed_service(service_id, record)
            identity = self.config_manager.get_identity()
            service_ips = dict(identity.get('service_ips') or {})
            service_ips[service_id] = ip
            self.config_manager.set_identity_field('service_ips', service_ips)

            # Write compose override
            all_installed = self.config_manager.get_installed_services()
            try:
                self._sync_compose_override(all_installed)
            except Exception as e:
                logger.error(f'Failed to write compose override: {e}')

            # Apply iptables rules (best-effort)
            try:
                apply_service_rules(service_id, ip, iptables_rules)
            except Exception as e:
                logger.warning(f'apply_service_rules for {service_id} failed (non-fatal): {e}')

            # Regenerate Caddyfile
            try:
                self.caddy_manager.regenerate_with_installed(
                    self._collect_caddy_routes(all_installed)
                )
            except Exception as e:
                logger.warning(f'caddy regenerate for {service_id} failed (non-fatal): {e}')

            # Start the container via docker compose
            base_compose = os.environ.get('COMPOSE_FILE', '/app/docker-compose.yml')
            try:
                result = subprocess.run(
                    ['docker', 'compose',
                     '-f', base_compose,
                     '-f', self.compose_override,
                     'up', '-d', container_name],
                    capture_output=True, text=True, timeout=120,
                )
                if result.returncode != 0:
                    logger.warning(
                        f'docker compose up for {service_id} failed: {result.stderr.strip()}'
                    )
            except Exception as e:
                logger.warning(f'docker compose up for {service_id} failed (non-fatal): {e}')

            return {
                'ok': True,
                'service_ip': ip,
                'container_name': container_name,
            }

    def remove(self, service_id: str, purge_data: bool = False) -> dict:
        """Remove an installed service.

        Stops and removes the container, clears its firewall rules, drops the
        install record and its allocated IP, regenerates the compose override
        and Caddy config, and (when *purge_data* is true) removes the named
        volumes listed in the stored manifest. Every step after the
        "is it installed" check is best-effort.
        """
        from firewall_manager import clear_service_rules

        with self._lock:
            installed = self.config_manager.get_installed_services()
            record = installed.get(service_id)
            if not record:
                return {'ok': False, 'error': f'Service {service_id} is not installed'}

            container_name = record.get('container_name', service_id)
            manifest = record.get('manifest') or {}
            base_compose = os.environ.get('COMPOSE_FILE', '/app/docker-compose.yml')

            # Stop and remove container
            try:
                subprocess.run(
                    ['docker', 'compose',
                     '-f', base_compose,
                     '-f', self.compose_override,
                     'stop', container_name],
                    capture_output=True, text=True, timeout=60,
                )
            except Exception as e:
                logger.warning(f'docker compose stop for {service_id} failed (non-fatal): {e}')

            try:
                subprocess.run(
                    ['docker', 'rm', '-f', container_name],
                    capture_output=True, text=True, timeout=30,
                )
            except Exception as e:
                logger.warning(f'docker rm for {service_id} failed (non-fatal): {e}')

            # Clear iptables rules
            try:
                clear_service_rules(service_id)
            except Exception as e:
                logger.warning(f'clear_service_rules for {service_id} failed (non-fatal): {e}')

            # Remove from config, regenerate compose + caddy
            self.config_manager.remove_installed_service(service_id)
            # Give the address back to the pool so it can be re-allocated.
            self._release_service_ip(service_id)
            remaining = self.config_manager.get_installed_services()

            try:
                self._sync_compose_override(remaining)
            except Exception as e:
                logger.error(f'Failed to write compose override after remove: {e}')

            try:
                self.caddy_manager.regenerate_with_installed(
                    self._collect_caddy_routes(remaining)
                )
            except Exception as e:
                logger.warning(f'caddy regenerate after remove failed (non-fatal): {e}')

            # Purge named volumes if requested
            if purge_data:
                for vol in manifest.get('volumes') or []:
                    vol_name = vol.get('name', '')
                    if not vol_name:
                        continue
                    try:
                        subprocess.run(
                            ['docker', 'volume', 'rm', vol_name],
                            capture_output=True, text=True, timeout=30,
                        )
                    except Exception as e:
                        logger.warning(
                            f'docker volume rm {vol_name} failed (non-fatal): {e}'
                        )

            return {'ok': True}

    def list_services(self) -> dict:
        """Return available (from index) and installed services."""
        available = self.fetch_index()
        installed = self.config_manager.get_installed_services()
        return {'available': available, 'installed': installed}

    def reapply_on_startup(self) -> None:
        """Re-apply firewall and Caddy rules for all installed services on startup."""
        from firewall_manager import apply_service_rules

        installed = self.config_manager.get_installed_services()
        if not installed:
            return

        # Regenerate compose override in case it was deleted
        try:
            self._sync_compose_override(installed)
        except Exception as e:
            logger.warning(f'reapply_on_startup: compose override write failed: {e}')

        # Re-apply iptables rules
        for svc_id, record in installed.items():
            ip = record.get('service_ip', '')
            rules = record.get('iptables_rules', [])
            try:
                apply_service_rules(svc_id, ip, rules)
            except Exception as e:
                logger.warning(f'reapply_on_startup: apply_service_rules({svc_id}) failed: {e}')

        # Regenerate Caddyfile
        try:
            self.caddy_manager.regenerate_with_installed(
                self._collect_caddy_routes(installed)
            )
        except Exception as e:
            logger.warning(f'reapply_on_startup: caddy regenerate failed: {e}')