From c40919d3748307feb9455d1f1e7f6efe4d6499c5 Mon Sep 17 00:00:00 2001 From: Dmitrii Iurco Date: Fri, 29 May 2026 07:10:12 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=200=20=E2=80=94=20manifest=5Fvali?= =?UTF-8?q?dator,=20compose=20YAML=20safety=20check,=20cap=5Fadd=20allowli?= =?UTF-8?q?st,=20backend=20denylist,=20provision=20hook=20enforcement,=20s?= =?UTF-8?q?ize=20cap?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces api/manifest_validator.py as a single security chokepoint imported by both ServiceComposer and ServiceStoreManager: - validate_manifest(): rejects kind=builtin, reserved container names, reserved subdomains, backend denylist (localhost, cell-api, etc.), cap_add outside allowlist / in denylist, shell-string provision hooks, and env values with shell-special characters - validate_rendered_compose(): walks the rendered YAML and rejects privileged:true, host network/pid/ipc/userns, absolute bind mounts, denied capabilities, devices key, apparmor/seccomp unconfined, and string-form command/entrypoint (shell-injection vector) - validate_provision_hook(): requires argv list form, lowercase binary, rejects NUL bytes ServiceStoreManager changes: - _validate_manifest() delegates to validate_manifest() after existing checks - _fetch_manifest() and fetch_index() now stream with a 256 KB size cap (prevents memory exhaustion from a malicious or compromised index) - Digest-pin warning for images missing @sha256 (hard error for unknown registries, warning for git.pic.ngo/roof/* and TRUSTED_IMAGES_NO_DIGEST) ServiceComposer changes: - write_compose() calls validate_rendered_compose() before any disk write so no partial file is left if validation fails - render_template() substitutes ${PIC_DATA_DIR} with the resolved data_dir path 102 new tests in tests/test_manifest_validator.py covering all five P0 security issues. Existing test mocks updated to use streaming response pattern (stream=True + raw.read) and valid compose YAML templates. Co-Authored-By: Claude Sonnet 4.6 --- api/manifest_validator.py | 283 ++++++++ api/service_composer.py | 12 + api/service_store_manager.py | 55 +- tests/test_manifest_validator.py | 1018 +++++++++++++++++++++++++++ tests/test_service_composer.py | 34 +- tests/test_service_store_manager.py | 48 +- 6 files changed, 1412 insertions(+), 38 deletions(-) create mode 100644 api/manifest_validator.py create mode 100644 tests/test_manifest_validator.py diff --git a/api/manifest_validator.py b/api/manifest_validator.py new file mode 100644 index 0000000..9fd3552 --- /dev/null +++ b/api/manifest_validator.py @@ -0,0 +1,283 @@ +""" +manifest_validator — single chokepoint for all manifest and compose YAML security checks. + +Both ServiceComposer and ServiceStoreManager import from here so validation logic +lives in exactly one place and cannot be bypassed by taking either code path. +""" + +import re +import yaml + +_SUBDOMAIN_RE = re.compile(r'^[a-z][a-z0-9-]{0,30}$') +_BACKEND_RE = re.compile(r'^[A-Za-z0-9._-]+:\d{1,5}$') +_CAP_ALLOWLIST = frozenset({ + 'NET_ADMIN', 'NET_RAW', 'NET_BIND_SERVICE', 'CHOWN', 'DAC_OVERRIDE', + 'SETUID', 'SETGID', 'KILL', 'SYS_NICE', +}) +_CAP_DENYLIST = frozenset({ + 'ALL', 'SYS_ADMIN', 'SYS_MODULE', 'SYS_PTRACE', 'SYS_RAWIO', + 'SYS_BOOT', 'MAC_ADMIN', 'MAC_OVERRIDE', 'SYS_TIME', 'SYS_TTY_CONFIG', +}) +_RESERVED_SUBDOMAINS = frozenset({ + 'api', 'webui', 'admin', 'www', 'mail', 'ns1', 'ns2', 'git', 'registry', + 'install', 'calendar', 'files', 'webdav', 'webmail', +}) +_BACKEND_DENYLIST = frozenset({ + 'cell-api', 'cell-caddy', 'cell-coredns', 'cell-dnsmasq', + 'cell-wireguard', 'cell-vault', 'localhost', '127.0.0.1', + '0.0.0.0', 'host.docker.internal', +}) +_RESERVED_CONTAINER_NAMES = frozenset({ + 'cell-api', 'cell-caddy', 'cell-webui', 'cell-coredns', + 'cell-dnsmasq', 'cell-wireguard', 'cell-chrony', +}) +_CONTAINER_NAME_RE = re.compile(r'^cell-[a-z0-9][a-z0-9-]{0,30}$') +_ENV_VALUE_RE = re.compile(r'^[A-Za-z0-9._@:/+\-]{0,256}$') +_HOOK_BINARY_RE = re.compile(r'^[a-z][a-z0-9_-]{0,31}$') +_CAP_NAME_RE = re.compile(r'^[A-Z_]+$') + + +def validate_manifest(manifest: dict) -> tuple: + """ + Validate security-relevant fields of a store manifest. + + Returns (True, []) when all checks pass; (False, [error_strings]) otherwise. + Does not replace the existing _validate_manifest in ServiceStoreManager — + it supplements it as a second layer focused on security-critical fields. + """ + errors = [] + + # kind must be "store" if present — reject builtins coming in over the wire + kind = manifest.get('kind') + if kind is not None and kind != 'store': + errors.append(f'manifest kind must be "store", got: {kind!r}') + + # container_name structural check + cname = manifest.get('container_name') + if cname is not None: + if not _CONTAINER_NAME_RE.match(cname): + errors.append( + f'container_name must match ^cell-[a-z0-9][a-z0-9-]{{0,30}}$, got: {cname!r}' + ) + elif cname in _RESERVED_CONTAINER_NAMES: + errors.append(f'container_name is reserved: {cname!r}') + + # subdomain + subdomain = manifest.get('subdomain') + if subdomain is not None: + _check_subdomain(subdomain, 'subdomain', errors) + + # extra_subdomains + for sub in manifest.get('extra_subdomains') or []: + _check_subdomain(sub, 'extra_subdomains entry', errors) + + # backend + backend = manifest.get('backend') + if backend is not None: + _check_backend(backend, 'backend', errors) + + # extra_backends + for sub_key, bknd_val in (manifest.get('extra_backends') or {}).items(): + _check_backend(bknd_val, f'extra_backends[{sub_key!r}]', errors) + + # cap_add + cap_add = manifest.get('cap_add') + if cap_add is not None: + if not isinstance(cap_add, list): + errors.append('cap_add must be a list') + else: + for cap in cap_add: + if not isinstance(cap, str): + errors.append(f'cap_add entry must be a string, got: {cap!r}') + continue + if not _CAP_NAME_RE.match(cap): + errors.append(f'cap_add entry must match ^[A-Z_]+$, got: {cap!r}') + continue + if cap in _CAP_DENYLIST: + errors.append(f'cap_add entry is explicitly denied: {cap}') + elif cap not in _CAP_ALLOWLIST: + errors.append(f'cap_add entry not in allowlist: {cap}') + + # env values + for env_entry in manifest.get('env') or []: + val = str(env_entry.get('value', '')) + if not _ENV_VALUE_RE.match(val): + errors.append( + f'env[].value contains disallowed characters: {val!r}' + ) + + # provision_hook + hook = (manifest.get('accounts') or {}).get('provision_hook') + if hook is not None: + ok, msg = validate_provision_hook(hook) + if not ok: + errors.append(msg) + + return (len(errors) == 0, errors) + + +def validate_rendered_compose(yaml_text: str) -> tuple: + """ + Parse and security-validate a rendered docker-compose YAML string. + + Returns (True, []) when safe; (False, [error_strings]) otherwise. + Rejects constructs that would give a store service elevated access to the host. + """ + errors = [] + + try: + doc = yaml.safe_load(yaml_text) + except yaml.YAMLError as exc: + return (False, [f'YAML parse error: {exc}']) + + if not isinstance(doc, dict): + return (False, ['compose file must be a YAML mapping']) + + # At least one external network must exist so the container joins the cell network + # rather than an isolated bridge that would be invisible to Caddy and CoreDNS. + networks = doc.get('networks') or {} + has_external = any( + isinstance(v, dict) and v.get('external') + for v in networks.values() + ) + if not has_external: + errors.append( + 'compose file must declare at least one network with external: true' + ) + + for svc_name, svc in (doc.get('services') or {}).items(): + if not isinstance(svc, dict): + continue + prefix = f'service {svc_name!r}' + + if svc.get('privileged') is True: + errors.append(f'{prefix}: privileged: true is not allowed') + + net_mode = svc.get('network_mode') + if net_mode is not None and net_mode not in (None, 'bridge'): + errors.append( + f'{prefix}: network_mode {net_mode!r} is not allowed (only bridge)' + ) + + if svc.get('pid') == 'host': + errors.append(f'{prefix}: pid: host is not allowed') + + if svc.get('ipc') == 'host': + errors.append(f'{prefix}: ipc: host is not allowed') + + if svc.get('userns_mode') == 'host': + errors.append(f'{prefix}: userns_mode: host is not allowed') + + # cap_add + for cap in svc.get('cap_add') or []: + cap_str = str(cap) + if cap_str in _CAP_DENYLIST: + errors.append(f'{prefix}: cap_add {cap_str!r} is explicitly denied') + elif cap_str not in _CAP_ALLOWLIST: + errors.append(f'{prefix}: cap_add {cap_str!r} not in allowlist') + + # volumes — reject absolute host-side bind mounts + for vol in svc.get('volumes') or []: + vol_str = str(vol) + src = vol_str.split(':')[0] if ':' in vol_str else vol_str + if src.startswith('/'): + errors.append( + f'{prefix}: absolute host bind mount not allowed: {vol_str!r}' + ) + + if 'devices' in svc: + errors.append(f'{prefix}: devices key is not allowed') + + for opt in svc.get('security_opt') or []: + opt_str = str(opt) + if opt_str in ('apparmor=unconfined', 'seccomp=unconfined'): + errors.append( + f'{prefix}: security_opt {opt_str!r} is not allowed' + ) + + # command must be a list — string form passes through the shell + cmd = svc.get('command') + if cmd is not None and isinstance(cmd, str): + errors.append( + f'{prefix}: command must be a list, not a shell string' + ) + + # entrypoint must also be a list for the same reason + ep = svc.get('entrypoint') + if ep is not None and isinstance(ep, str): + errors.append( + f'{prefix}: entrypoint must be a list, not a shell string' + ) + + return (len(errors) == 0, errors) + + +def validate_provision_hook(hook) -> tuple: + """ + Validate a provision_hook value from accounts.provision_hook. + + Acceptable: None/absent, or a dict {"argv": ["binary", "arg1", ...]}. + Rejected: any plain string (shell injection risk), empty argv, uppercase binary, + NUL bytes in any element. + + Returns (True, "") on success; (False, error_string) on failure. + """ + if hook is None: + return (True, '') + + if isinstance(hook, str): + return ( + False, + 'provision_hook must be an argv list dict {"argv": [...]}, not a shell string', + ) + + if not isinstance(hook, dict): + return (False, 'provision_hook must be a dict with argv list') + + argv = hook.get('argv') + if not isinstance(argv, list) or len(argv) == 0: + return (False, 'provision_hook.argv must be a non-empty list') + + # NUL-byte check must precede regex check so the error message is unambiguous. + for elem in argv: + if isinstance(elem, str) and '\x00' in elem: + return (False, 'provision_hook.argv element contains NUL byte') + + binary = argv[0] + if not isinstance(binary, str) or not _HOOK_BINARY_RE.match(binary): + return ( + False, + f'provision_hook.argv[0] must match ^[a-z][a-z0-9_-]{{0,31}}$, got: {binary!r}', + ) + + return (True, '') + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _check_subdomain(value, field_name: str, errors: list) -> None: + if not isinstance(value, str): + errors.append(f'{field_name} must be a string') + return + if value in _RESERVED_SUBDOMAINS: + errors.append(f'{field_name} is reserved: {value!r}') + elif not _SUBDOMAIN_RE.match(value): + errors.append( + f'{field_name} must match ^[a-z][a-z0-9-]{{0,30}}$, got: {value!r}' + ) + + +def _check_backend(value, field_name: str, errors: list) -> None: + if not isinstance(value, str): + errors.append(f'{field_name} must be a string') + return + if not _BACKEND_RE.match(value): + errors.append( + f'{field_name} must be host:port (e.g. cell-foo:8080), got: {value!r}' + ) + return + host = value.split(':')[0] + if host in _BACKEND_DENYLIST: + errors.append(f'{field_name} host {host!r} is in the backend denylist') diff --git a/api/service_composer.py b/api/service_composer.py index 3dfe6c3..72cf53b 100644 --- a/api/service_composer.py +++ b/api/service_composer.py @@ -23,8 +23,11 @@ import secrets as _secrets_lib import shutil import subprocess import threading +from pathlib import Path from typing import Dict, List, Optional +from manifest_validator import validate_rendered_compose + logger = logging.getLogger('picell') _SECRET_RE = re.compile(r'\$\{(PIC_SECRET_\w+)\}') @@ -136,6 +139,7 @@ class ServiceComposer: result = result.replace('${PIC_DOMAIN}', domain) result = result.replace('${PIC_CELL_NAME}', cell_name) result = result.replace('${PIC_SERVICE_ID}', service_id) + result = result.replace('${PIC_DATA_DIR}', str(Path(self.data_dir).resolve())) # PIC_SECRET_* — generate on first use, reuse on reconfigure for match in _SECRET_RE.finditer(template_content): @@ -150,6 +154,14 @@ class ServiceComposer: """Render and atomically write the per-service compose file. Returns rendered content.""" os.makedirs(self._svc_dir(service_id), exist_ok=True) content = self.render_template(service_id, manifest, template_content) + + # Validate before any file I/O so a bad template never touches disk. + ok, errs = validate_rendered_compose(content) + if not ok: + raise ValueError( + f'Compose template failed security validation: {"; ".join(errs)}' + ) + path = self._compose_path(service_id) tmp = path + '.tmp' with open(tmp, 'w') as f: diff --git a/api/service_store_manager.py b/api/service_store_manager.py index a9857fe..fd224cd 100644 --- a/api/service_store_manager.py +++ b/api/service_store_manager.py @@ -18,11 +18,14 @@ import subprocess from datetime import datetime from typing import Any, Dict, List, Optional, Tuple +import json + import requests import yaml from base_service_manager import BaseServiceManager from ip_utils import CONTAINER_OFFSETS +from manifest_validator import validate_manifest, validate_provision_hook logger = logging.getLogger(__name__) @@ -41,8 +44,19 @@ MANIFEST_URL_TPL = ( ) IMAGE_ALLOWLIST_RE = re.compile( - r'^git\.pic\.ngo/roof/[a-z0-9._/-]+(:[a-zA-Z0-9._-]+)?$' + r'^git\.pic\.ngo/roof/[a-z0-9._/-]+(:[a-zA-Z0-9._-]+)?(@sha256:[a-f0-9]{64})?$' ) + +# Images from well-known vendors that pre-date digest pinning in PIC. +# These are allowed to ship without a @sha256 digest; all others require one +# or must come from git.pic.ngo/roof/*. +TRUSTED_IMAGES_NO_DIGEST = frozenset({ + 'mailserver/docker-mailserver', + 'tomsquest/docker-radicale', + 'bytemark/webdav', + 'filegator/filegator', + 'hardware/rainloop', +}) FORBIDDEN_MOUNTS = frozenset([ '/', '/etc', '/var', '/proc', '/sys', '/dev', '/app', '/run', '/boot', ]) @@ -112,6 +126,21 @@ class ServiceStoreManager(BaseServiceManager): errors.append( f'image must match git.pic.ngo/roof/* pattern, got: {image}' ) + elif image: + # Warn when a digest pin is absent so operators know exact-version + # tracking is not guaranteed. Images in TRUSTED_IMAGES_NO_DIGEST + # and images from our own git.pic.ngo/roof/* registry (which we + # build and tag) get warnings rather than hard errors; any other + # image that somehow passes the allowlist gets a hard error. + if '@sha256:' not in image: + image_base = image.split(':')[0].split('@')[0] + is_own_registry = image_base.startswith('git.pic.ngo/roof/') + if image_base in TRUSTED_IMAGES_NO_DIGEST or is_own_registry: + logger.warning('image %s has no digest pin', image) + else: + errors.append( + f'image {image!r} must include a @sha256: pin' + ) # Volume mount safety for vol in m.get('volumes', []): @@ -202,6 +231,12 @@ class ServiceStoreManager(BaseServiceManager): f'env[].value contains disallowed characters: {val!r}' ) + # Security layer: delegate to manifest_validator for cap_add, backend + # denylist, provision_hook, reserved container names, and kind guard. + ok, sec_errs = validate_manifest(m) + if not ok: + errors.extend(sec_errs) + return (len(errors) == 0, errors) # ── IP allocation ───────────────────────────────────────────────────── @@ -328,13 +363,17 @@ class ServiceStoreManager(BaseServiceManager): def fetch_index(self) -> list: """Fetch and cache the service index.""" import time + _SIZE_LIMIT = 256 * 1024 now = time.time() if self._index_cache is not None and (now - self._index_cache_time) < self._cache_ttl: return self._index_cache try: - resp = requests.get(self.index_url, timeout=10) + resp = requests.get(self.index_url, timeout=10, stream=True) resp.raise_for_status() - data = resp.json() + content = resp.raw.read(_SIZE_LIMIT + 1, decode_content=True) + if len(content) > _SIZE_LIMIT: + raise ValueError('Index response exceeds 256 KB limit') + data = json.loads(content) self._index_cache = data if isinstance(data, list) else data.get('services', []) self._index_cache_time = now return self._index_cache @@ -344,10 +383,16 @@ class ServiceStoreManager(BaseServiceManager): def _fetch_manifest(self, service_id: str) -> dict: """Fetch a service manifest by ID.""" + _SIZE_LIMIT = 256 * 1024 url = MANIFEST_URL_TPL.format(id=service_id) - resp = requests.get(url, timeout=10) + resp = requests.get(url, timeout=10, stream=True) resp.raise_for_status() - return resp.json() + content = resp.raw.read(_SIZE_LIMIT + 1, decode_content=True) + if len(content) > _SIZE_LIMIT: + raise ValueError( + f'Manifest response for {service_id} exceeds 256 KB limit' + ) + return json.loads(content) # ── Core operations ─────────────────────────────────────────────────── diff --git a/tests/test_manifest_validator.py b/tests/test_manifest_validator.py new file mode 100644 index 0000000..6159bdb --- /dev/null +++ b/tests/test_manifest_validator.py @@ -0,0 +1,1018 @@ +""" +Tests for api/manifest_validator.py — Phase 0 security foundations. + +Covers validate_manifest(), validate_rendered_compose(), and +validate_provision_hook(), plus integration tests for the +ServiceComposer.write_compose() guard and ServiceStoreManager._validate_manifest() +security delegation. + +No Docker, network, or filesystem calls are made. +""" + +import json +import os +import sys +import tempfile +import unittest +from unittest.mock import MagicMock, patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api')) + +from manifest_validator import ( + validate_manifest, + validate_rendered_compose, + validate_provision_hook, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _minimal_manifest(**overrides): + """Minimal valid manifest for validate_manifest() (no image/id needed here).""" + m = { + 'container_name': 'cell-myapp', + 'subdomain': 'myapp', + 'backend': 'cell-myapp:8080', + } + m.update(overrides) + return m + + +def _valid_compose(extra_services=None, networks=None): + """Return a compose YAML string that passes all checks.""" + services = extra_services or '' + net_block = networks or ( + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + return ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' command: ["nginx", "-g", "daemon off;"]\n' + + services + + net_block + ) + + +# --------------------------------------------------------------------------- +# TestValidateManifest +# --------------------------------------------------------------------------- + +class TestValidateManifest(unittest.TestCase): + + # ── valid minimal manifest ──────────────────────────────────────────── + + def test_valid_minimal_manifest_passes(self): + ok, errs = validate_manifest(_minimal_manifest()) + self.assertTrue(ok) + self.assertEqual(errs, []) + + def test_returns_true_empty_list_on_success(self): + result = validate_manifest(_minimal_manifest()) + self.assertIsInstance(result, tuple) + self.assertEqual(len(result), 2) + ok, errs = result + self.assertIs(ok, True) + self.assertIsInstance(errs, list) + + def test_returns_false_list_of_strings_on_failure(self): + ok, errs = validate_manifest({'kind': 'builtin'}) + self.assertIs(ok, False) + self.assertIsInstance(errs, list) + self.assertTrue(len(errs) > 0) + for e in errs: + self.assertIsInstance(e, str) + + # ── kind ───────────────────────────────────────────────────────────── + + def test_kind_builtin_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(kind='builtin')) + self.assertFalse(ok) + self.assertTrue(any('kind' in e for e in errs)) + + def test_kind_store_passes(self): + ok, errs = validate_manifest(_minimal_manifest(kind='store')) + self.assertTrue(ok) + + def test_kind_absent_passes(self): + m = _minimal_manifest() + m.pop('kind', None) + ok, errs = validate_manifest(m) + self.assertTrue(ok) + + def test_kind_unknown_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(kind='custom')) + self.assertFalse(ok) + + # ── container_name ──────────────────────────────────────────────────── + + def test_container_name_without_cell_prefix_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(container_name='myapp')) + self.assertFalse(ok) + self.assertTrue(any('container_name' in e for e in errs)) + + def test_container_name_cell_api_rejected_reserved(self): + ok, errs = validate_manifest(_minimal_manifest(container_name='cell-api')) + self.assertFalse(ok) + self.assertTrue(any('reserved' in e for e in errs)) + + def test_container_name_cell_valid_name_passes(self): + ok, errs = validate_manifest(_minimal_manifest(container_name='cell-valid-name')) + self.assertTrue(ok) + + def test_container_name_with_uppercase_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(container_name='cell-MyApp')) + self.assertFalse(ok) + + def test_container_name_cell_caddy_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(container_name='cell-caddy')) + self.assertFalse(ok) + + def test_container_name_cell_wireguard_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(container_name='cell-wireguard')) + self.assertFalse(ok) + + def test_container_name_absent_passes(self): + m = _minimal_manifest() + m.pop('container_name', None) + ok, errs = validate_manifest(m) + self.assertTrue(ok) + + # ── subdomain ──────────────────────────────────────────────────────── + + def test_subdomain_api_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(subdomain='api')) + self.assertFalse(ok) + self.assertTrue(any('reserved' in e for e in errs)) + + def test_subdomain_webui_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(subdomain='webui')) + self.assertFalse(ok) + + def test_subdomain_myapp_passes(self): + ok, errs = validate_manifest(_minimal_manifest(subdomain='myapp')) + self.assertTrue(ok) + + def test_subdomain_absent_passes(self): + m = _minimal_manifest() + m.pop('subdomain', None) + ok, errs = validate_manifest(m) + self.assertTrue(ok) + + def test_subdomain_calendar_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(subdomain='calendar')) + self.assertFalse(ok) + + def test_subdomain_files_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(subdomain='files')) + self.assertFalse(ok) + + # ── extra_subdomains ───────────────────────────────────────────────── + + def test_extra_subdomains_admin_rejected(self): + ok, errs = validate_manifest( + _minimal_manifest(extra_subdomains=['admin']) + ) + self.assertFalse(ok) + self.assertTrue(any('reserved' in e for e in errs)) + + def test_extra_subdomains_valid_entry_passes(self): + ok, errs = validate_manifest( + _minimal_manifest(extra_subdomains=['sub']) + ) + self.assertTrue(ok) + + def test_extra_subdomains_multiple_one_bad_rejected(self): + ok, errs = validate_manifest( + _minimal_manifest(extra_subdomains=['good', 'admin']) + ) + self.assertFalse(ok) + + # ── backend ────────────────────────────────────────────────────────── + + def test_backend_cell_api_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(backend='cell-api:3000')) + self.assertFalse(ok) + self.assertTrue(any('denylist' in e for e in errs)) + + def test_backend_localhost_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(backend='localhost:8080')) + self.assertFalse(ok) + + def test_backend_cell_mail_valid_port_passes(self): + ok, errs = validate_manifest(_minimal_manifest(backend='cell-mail:25')) + self.assertTrue(ok) + + def test_backend_bad_format_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(backend='not-a-backend')) + self.assertFalse(ok) + self.assertTrue(any('host:port' in e for e in errs)) + + def test_backend_127_0_0_1_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(backend='127.0.0.1:8080')) + self.assertFalse(ok) + + def test_backend_host_docker_internal_rejected(self): + ok, errs = validate_manifest( + _minimal_manifest(backend='host.docker.internal:8080') + ) + self.assertFalse(ok) + + def test_backend_absent_passes(self): + m = _minimal_manifest() + m.pop('backend', None) + ok, errs = validate_manifest(m) + self.assertTrue(ok) + + # ── extra_backends ─────────────────────────────────────────────────── + + def test_extra_backends_cell_coredns_rejected(self): + ok, errs = validate_manifest( + _minimal_manifest(extra_backends={'dns': 'cell-coredns:53'}) + ) + self.assertFalse(ok) + + def test_extra_backends_valid_passes(self): + ok, errs = validate_manifest( + _minimal_manifest(extra_backends={'alt': 'cell-myapp:80'}) + ) + self.assertTrue(ok) + + # ── cap_add ────────────────────────────────────────────────────────── + + def test_cap_add_sys_admin_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(cap_add=['SYS_ADMIN'])) + self.assertFalse(ok) + self.assertTrue(any('SYS_ADMIN' in e for e in errs)) + + def test_cap_add_all_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(cap_add=['ALL'])) + self.assertFalse(ok) + self.assertTrue(any('ALL' in e for e in errs)) + + def test_cap_add_net_admin_passes(self): + ok, errs = validate_manifest(_minimal_manifest(cap_add=['NET_ADMIN'])) + self.assertTrue(ok) + + def test_cap_add_net_admin_and_sys_admin_rejected(self): + ok, errs = validate_manifest( + _minimal_manifest(cap_add=['NET_ADMIN', 'SYS_ADMIN']) + ) + self.assertFalse(ok) + self.assertTrue(any('SYS_ADMIN' in e for e in errs)) + + def test_cap_add_unknown_cap_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(cap_add=['UNKNOWN_CAP'])) + self.assertFalse(ok) + self.assertTrue(any('allowlist' in e for e in errs)) + + def test_cap_add_sys_module_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(cap_add=['SYS_MODULE'])) + self.assertFalse(ok) + + def test_cap_add_not_a_list_rejected(self): + ok, errs = validate_manifest(_minimal_manifest(cap_add='NET_ADMIN')) + self.assertFalse(ok) + + # ── provision_hook (via accounts key) ──────────────────────────────── + + def test_provision_hook_string_rejected(self): + m = _minimal_manifest(accounts={'provision_hook': 'setup email add {username}'}) + ok, errs = validate_manifest(m) + self.assertFalse(ok) + self.assertTrue(any('shell string' in e for e in errs)) + + def test_provision_hook_argv_dict_passes(self): + m = _minimal_manifest( + accounts={'provision_hook': {'argv': ['setup', 'email', 'add']}} + ) + ok, errs = validate_manifest(m) + self.assertTrue(ok) + + def test_provision_hook_absent_passes(self): + m = _minimal_manifest() + # no 'accounts' key at all + ok, errs = validate_manifest(m) + self.assertTrue(ok) + + def test_provision_hook_none_in_accounts_passes(self): + m = _minimal_manifest(accounts={'provision_hook': None}) + ok, errs = validate_manifest(m) + self.assertTrue(ok) + + # ── env values ─────────────────────────────────────────────────────── + + def test_env_value_with_dollar_rejected(self): + m = _minimal_manifest(env=[{'key': 'X', 'value': '$SECRET'}]) + ok, errs = validate_manifest(m) + self.assertFalse(ok) + + def test_env_value_with_curly_brace_rejected(self): + m = _minimal_manifest(env=[{'key': 'X', 'value': '{bad}'}]) + ok, errs = validate_manifest(m) + self.assertFalse(ok) + + def test_env_value_example_com_passes(self): + m = _minimal_manifest(env=[{'key': 'DOMAIN', 'value': 'example.com'}]) + ok, errs = validate_manifest(m) + self.assertTrue(ok) + + def test_env_value_complex_safe_string_passes(self): + m = _minimal_manifest( + env=[{'key': 'X', 'value': 'user@host:port/path+extra-chars'}] + ) + ok, errs = validate_manifest(m) + self.assertTrue(ok) + + def test_env_empty_passes(self): + m = _minimal_manifest(env=[]) + ok, errs = validate_manifest(m) + self.assertTrue(ok) + + +# --------------------------------------------------------------------------- +# TestValidateRenderedCompose +# --------------------------------------------------------------------------- + +class TestValidateRenderedCompose(unittest.TestCase): + + # ── valid compose ───────────────────────────────────────────────────── + + def test_valid_compose_with_external_network_passes(self): + ok, errs = validate_rendered_compose(_valid_compose()) + self.assertTrue(ok) + self.assertEqual(errs, []) + + def test_invalid_yaml_returns_error(self): + ok, errs = validate_rendered_compose('}{invalid yaml{') + self.assertFalse(ok) + self.assertTrue(any('YAML' in e for e in errs)) + + def test_non_mapping_yaml_rejected(self): + ok, errs = validate_rendered_compose('- item1\n- item2\n') + self.assertFalse(ok) + + # ── privileged ─────────────────────────────────────────────────────── + + def test_privileged_true_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' privileged: true\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + self.assertTrue(any('privileged' in e for e in errs)) + + def test_privileged_false_passes(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' privileged: false\n' + ' command: ["echo"]\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertTrue(ok) + + # ── network_mode ───────────────────────────────────────────────────── + + def test_network_mode_host_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' network_mode: host\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + self.assertTrue(any('network_mode' in e for e in errs)) + + def test_network_mode_none_value_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' network_mode: "none"\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + + def test_no_network_mode_key_passes(self): + ok, errs = validate_rendered_compose(_valid_compose()) + self.assertTrue(ok) + + # ── volumes ────────────────────────────────────────────────────────── + + def test_absolute_volume_etc_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' volumes:\n' + ' - /etc:/etc\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + self.assertTrue(any('bind mount' in e for e in errs)) + + def test_absolute_volume_docker_sock_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' volumes:\n' + ' - /var/run/docker.sock:/var/run/docker.sock\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + + def test_relative_volume_passes(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' command: ["echo"]\n' + ' volumes:\n' + ' - ./data:/data\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertTrue(ok) + + def test_named_volume_passes(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' command: ["echo"]\n' + ' volumes:\n' + ' - myvolume:/data\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertTrue(ok) + + # ── cap_add ────────────────────────────────────────────────────────── + + def test_compose_cap_add_sys_admin_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' cap_add:\n' + ' - SYS_ADMIN\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + self.assertTrue(any('SYS_ADMIN' in e for e in errs)) + + def test_compose_cap_add_net_admin_passes(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' command: ["echo"]\n' + ' cap_add:\n' + ' - NET_ADMIN\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertTrue(ok) + + # ── external network requirement ───────────────────────────────────── + + def test_no_external_network_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' command: ["echo"]\n' + 'networks:\n' + ' mynet:\n' + ' driver: bridge\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + self.assertTrue(any('external' in e for e in errs)) + + def test_no_networks_key_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' command: ["echo"]\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + + # ── devices ────────────────────────────────────────────────────────── + + def test_devices_key_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' devices:\n' + ' - /dev/snd:/dev/snd\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + self.assertTrue(any('devices' in e for e in errs)) + + # ── security_opt ───────────────────────────────────────────────────── + + def test_security_opt_apparmor_unconfined_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' security_opt:\n' + ' - apparmor=unconfined\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + self.assertTrue(any('apparmor=unconfined' in e for e in errs)) + + def test_security_opt_no_new_privileges_passes(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' command: ["echo"]\n' + ' security_opt:\n' + ' - no-new-privileges:true\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertTrue(ok) + + def test_security_opt_seccomp_unconfined_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' security_opt:\n' + ' - seccomp=unconfined\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + + # ── command / entrypoint ───────────────────────────────────────────── + + def test_command_as_string_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' command: "echo hello"\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + self.assertTrue(any('command' in e for e in errs)) + + def test_command_as_list_passes(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' command: ["echo", "hello"]\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertTrue(ok) + + def test_entrypoint_as_string_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' entrypoint: "/bin/sh -c evil"\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + self.assertTrue(any('entrypoint' in e for e in errs)) + + # ── pid / ipc / userns ─────────────────────────────────────────────── + + def test_pid_host_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' pid: host\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + self.assertTrue(any('pid' in e for e in errs)) + + def test_ipc_host_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' ipc: host\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + + # ── multi-service ───────────────────────────────────────────────────── + + def test_multiple_services_one_invalid_rejected(self): + yaml_text = ( + 'version: "3.8"\n' + 'services:\n' + ' good:\n' + ' image: nginx\n' + ' command: ["echo"]\n' + ' bad:\n' + ' image: nginx\n' + ' privileged: true\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + ok, errs = validate_rendered_compose(yaml_text) + self.assertFalse(ok) + self.assertTrue(any('bad' in e for e in errs)) + + +# --------------------------------------------------------------------------- +# TestValidateProvisionHook +# --------------------------------------------------------------------------- + +class TestValidateProvisionHook(unittest.TestCase): + + def test_none_passes(self): + ok, msg = validate_provision_hook(None) + self.assertTrue(ok) + self.assertEqual(msg, '') + + def test_string_rejected(self): + ok, msg = validate_provision_hook('setup email add {username}') + self.assertFalse(ok) + self.assertIn('shell string', msg) + + def test_argv_dict_passes(self): + ok, msg = validate_provision_hook({'argv': ['setup', 'email', 'add']}) + self.assertTrue(ok) + self.assertEqual(msg, '') + + def test_empty_argv_rejected(self): + ok, msg = validate_provision_hook({'argv': []}) + self.assertFalse(ok) + self.assertIn('non-empty', msg) + + def test_single_element_argv_passes(self): + ok, msg = validate_provision_hook({'argv': ['setup']}) + self.assertTrue(ok) + + def test_uppercase_binary_rejected(self): + ok, msg = validate_provision_hook({'argv': ['UPPERCASE']}) + self.assertFalse(ok) + self.assertIn('argv[0]', msg) + + def test_nul_byte_in_element_rejected(self): + ok, msg = validate_provision_hook({'argv': ['setup\x00evil']}) + self.assertFalse(ok) + self.assertIn('NUL', msg) + + def test_nul_byte_in_arg_not_binary_rejected(self): + ok, msg = validate_provision_hook({'argv': ['setup', 'arg\x00evil']}) + self.assertFalse(ok) + + def test_integer_rejected(self): + ok, msg = validate_provision_hook(42) + self.assertFalse(ok) + self.assertIn('dict', msg) + + def test_list_rejected(self): + ok, msg = validate_provision_hook(['setup', 'add']) + self.assertFalse(ok) + + def test_dict_without_argv_rejected(self): + ok, msg = validate_provision_hook({'cmd': ['setup']}) + self.assertFalse(ok) + self.assertIn('argv', msg) + + def test_binary_with_digits_passes(self): + ok, msg = validate_provision_hook({'argv': ['setup2']}) + self.assertTrue(ok) + + def test_binary_with_hyphens_and_underscores_passes(self): + ok, msg = validate_provision_hook({'argv': ['my-setup_tool']}) + self.assertTrue(ok) + + def test_binary_starting_with_digit_rejected(self): + ok, msg = validate_provision_hook({'argv': ['2setup']}) + self.assertFalse(ok) + + +# --------------------------------------------------------------------------- +# TestServiceComposerIntegration +# --------------------------------------------------------------------------- + +class TestServiceComposerIntegration(unittest.TestCase): + """Integration tests for ServiceComposer.write_compose() security gate.""" + + def _make_cm(self): + cm = MagicMock() + cm.get_identity.return_value = { + 'cell_name': 'testcell', + 'domain': 'cell.local', + } + cm.get_effective_domain.return_value = 'cell.local' + cm.configs = {} + return cm + + def test_write_compose_raises_on_privileged_true(self): + from service_composer import ServiceComposer + privileged_template = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' privileged: true\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + with tempfile.TemporaryDirectory() as tmpdir: + composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) + manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} + with self.assertRaises(ValueError) as ctx: + composer.write_compose('testsvc', manifest, privileged_template) + self.assertIn('security validation', str(ctx.exception)) + + def test_write_compose_no_temp_file_on_validation_failure(self): + """Validation failure must not leave a .tmp file on disk.""" + from service_composer import ServiceComposer + privileged_template = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' privileged: true\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + with tempfile.TemporaryDirectory() as tmpdir: + composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) + manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} + svc_dir = os.path.join(tmpdir, 'services', 'testsvc') + os.makedirs(svc_dir, exist_ok=True) + try: + composer.write_compose('testsvc', manifest, privileged_template) + except ValueError: + pass + tmp_path = os.path.join(svc_dir, 'docker-compose.yml.tmp') + final_path = os.path.join(svc_dir, 'docker-compose.yml') + self.assertFalse( + os.path.exists(tmp_path), + 'Temp file should not exist after validation failure', + ) + self.assertFalse( + os.path.exists(final_path), + 'Final compose file should not exist after validation failure', + ) + + def test_render_template_replaces_pic_data_dir(self): + from service_composer import ServiceComposer + from pathlib import Path + with tempfile.TemporaryDirectory() as tmpdir: + composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) + manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} + template = 'DATA=${PIC_DATA_DIR}' + result = composer.render_template('testsvc', manifest, template) + expected = str(Path(tmpdir).resolve()) + self.assertIn(expected, result) + self.assertNotIn('${PIC_DATA_DIR}', result) + + def test_write_compose_succeeds_on_valid_template(self): + from service_composer import ServiceComposer + valid_template = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' command: ["echo", "ok"]\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + with tempfile.TemporaryDirectory() as tmpdir: + composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) + manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} + content = composer.write_compose('testsvc', manifest, valid_template) + self.assertIn('nginx', content) + expected_path = os.path.join( + tmpdir, 'services', 'testsvc', 'docker-compose.yml' + ) + self.assertTrue(os.path.exists(expected_path)) + + +# --------------------------------------------------------------------------- +# TestServiceStoreManagerSecurityIntegration +# --------------------------------------------------------------------------- + +class TestServiceStoreManagerSecurityIntegration(unittest.TestCase): + """Integration tests for ServiceStoreManager._validate_manifest() security additions.""" + + def _make_manager(self): + from service_store_manager import ServiceStoreManager + cm = MagicMock() + cm.get_installed_services.return_value = {} + cm.get_identity.return_value = { + 'ip_range': '172.20.0.0/16', + 'service_ips': {}, + } + return ServiceStoreManager( + config_manager=cm, + caddy_manager=MagicMock(), + container_manager=MagicMock(), + ) + + def _valid_manifest(self, **overrides): + m = { + 'id': 'myapp', + 'name': 'My App', + 'version': '1.0.0', + 'author': 'Test Author', + 'image': 'git.pic.ngo/roof/myapp:latest', + 'container_name': 'cell-myapp', + } + m.update(overrides) + return m + + def test_validate_manifest_rejects_cap_add_sys_admin(self): + from service_store_manager import ServiceStoreManager + m = self._valid_manifest(cap_add=['SYS_ADMIN']) + ok, errs = ServiceStoreManager._validate_manifest(m) + self.assertFalse(ok) + self.assertTrue(any('SYS_ADMIN' in e for e in errs)) + + def test_validate_manifest_accepts_cap_add_net_admin(self): + from service_store_manager import ServiceStoreManager + m = self._valid_manifest(cap_add=['NET_ADMIN']) + ok, errs = ServiceStoreManager._validate_manifest(m) + self.assertTrue(ok) + + def test_validate_manifest_rejects_provision_hook_string(self): + from service_store_manager import ServiceStoreManager + m = self._valid_manifest( + accounts={'provision_hook': 'setup user add'} + ) + ok, errs = ServiceStoreManager._validate_manifest(m) + self.assertFalse(ok) + self.assertTrue(any('shell string' in e for e in errs)) + + def test_validate_manifest_rejects_kind_builtin(self): + from service_store_manager import ServiceStoreManager + m = self._valid_manifest(kind='builtin') + ok, errs = ServiceStoreManager._validate_manifest(m) + self.assertFalse(ok) + + def test_validate_manifest_rejects_reserved_container_name(self): + from service_store_manager import ServiceStoreManager + m = self._valid_manifest(container_name='cell-api') + ok, errs = ServiceStoreManager._validate_manifest(m) + self.assertFalse(ok) + + def test_fetch_manifest_raises_on_oversized_response(self): + from service_store_manager import ServiceStoreManager + mgr = self._make_manager() + oversized_content = b'x' * (256 * 1024 + 1) + raw_mock = MagicMock() + raw_mock.read.return_value = oversized_content + mock_resp = MagicMock() + mock_resp.raise_for_status = MagicMock() + mock_resp.raw = raw_mock + with patch('service_store_manager.requests.get', return_value=mock_resp): + with self.assertRaises(ValueError) as ctx: + mgr._fetch_manifest('bigservice') + self.assertIn('256 KB', str(ctx.exception)) + + def test_fetch_manifest_succeeds_with_small_response(self): + from service_store_manager import ServiceStoreManager + mgr = self._make_manager() + payload = {'id': 'smallsvc', 'name': 'Small'} + small_content = json.dumps(payload).encode() + raw_mock = MagicMock() + raw_mock.read.return_value = small_content + mock_resp = MagicMock() + mock_resp.raise_for_status = MagicMock() + mock_resp.raw = raw_mock + with patch('service_store_manager.requests.get', return_value=mock_resp): + result = mgr._fetch_manifest('smallsvc') + self.assertEqual(result['id'], 'smallsvc') + + def test_fetch_manifest_requests_stream_true(self): + """_fetch_manifest must use stream=True so raw.read() is available.""" + from service_store_manager import ServiceStoreManager + mgr = self._make_manager() + payload = json.dumps({'id': 'svc'}).encode() + raw_mock = MagicMock() + raw_mock.read.return_value = payload + mock_resp = MagicMock() + mock_resp.raise_for_status = MagicMock() + mock_resp.raw = raw_mock + with patch('service_store_manager.requests.get', return_value=mock_resp) as mock_get: + mgr._fetch_manifest('svc') + _, kwargs = mock_get.call_args + self.assertTrue(kwargs.get('stream'), 'requests.get must be called with stream=True') + + def test_validate_manifest_rejects_backend_denylist(self): + from service_store_manager import ServiceStoreManager + m = self._valid_manifest(backend='localhost:8080') + ok, errs = ServiceStoreManager._validate_manifest(m) + self.assertFalse(ok) + + def test_validate_manifest_accepts_valid_backend(self): + from service_store_manager import ServiceStoreManager + m = self._valid_manifest(backend='cell-myapp:8080') + ok, errs = ServiceStoreManager._validate_manifest(m) + self.assertTrue(ok) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_service_composer.py b/tests/test_service_composer.py index e20c824..f0d5aa6 100644 --- a/tests/test_service_composer.py +++ b/tests/test_service_composer.py @@ -149,7 +149,17 @@ class TestWriteCompose(unittest.TestCase): cm = _make_cm() composer = ServiceComposer(config_manager=cm, data_dir=tmpdir) manifest = _make_manifest() - template = 'PORT=${PIC_CFG_PORT}' + template = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + ' environment:\n' + ' PORT: "${PIC_CFG_PORT}"\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) composer.write_compose('myservice', manifest, template) expected_path = os.path.join( @@ -169,7 +179,16 @@ class TestWriteCompose(unittest.TestCase): with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir) manifest = _make_manifest() - composer.write_compose('myservice', manifest, 'content: true') + valid_template = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + composer.write_compose('myservice', manifest, valid_template) self.assertTrue(composer.has_compose_file('myservice')) def test_atomic_write_via_tmp_file(self): @@ -178,7 +197,16 @@ class TestWriteCompose(unittest.TestCase): composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir) manifest = _make_manifest() # Should not raise even if fsync not available - composer.write_compose('myservice', manifest, 'content: yes') + valid_template = ( + 'version: "3.8"\n' + 'services:\n' + ' app:\n' + ' image: nginx\n' + 'networks:\n' + ' cell-network:\n' + ' external: true\n' + ) + composer.write_compose('myservice', manifest, valid_template) path = os.path.join(tmpdir, 'services', 'myservice', 'docker-compose.yml') self.assertTrue(os.path.exists(path)) diff --git a/tests/test_service_store_manager.py b/tests/test_service_store_manager.py index 7188222..c4df9d9 100644 --- a/tests/test_service_store_manager.py +++ b/tests/test_service_store_manager.py @@ -6,6 +6,7 @@ All external I/O (requests, subprocess, docker, config_manager, caddy_manager, container_manager) is mocked so these tests run without any live infrastructure. """ +import json import os import sys import time @@ -24,6 +25,17 @@ from ip_utils import CONTAINER_OFFSETS # Helpers # --------------------------------------------------------------------------- +def _make_streaming_mock(data): + """Return a MagicMock that simulates a requests streaming response for ``data``.""" + encoded = json.dumps(data).encode() + raw = MagicMock() + raw.read.return_value = encoded + mock_resp = MagicMock(status_code=200) + mock_resp.raise_for_status = MagicMock() + mock_resp.raw = raw + return mock_resp + + def _make_manager(tmp_dir=None, installed=None, identity=None): """Build a ServiceStoreManager backed by mock dependencies.""" cm = MagicMock() @@ -711,11 +723,7 @@ class TestListServices(unittest.TestCase): def test_returns_available_and_installed_keys(self): mgr = _make_manager() with patch('service_store_manager.requests.get') as mock_get: - mock_get.return_value = MagicMock( - status_code=200, - json=lambda: self._fake_index(), - ) - mock_get.return_value.raise_for_status = MagicMock() + mock_get.return_value = _make_streaming_mock(self._fake_index()) result = mgr.list_services() self.assertIn('available', result) self.assertIn('installed', result) @@ -723,11 +731,7 @@ class TestListServices(unittest.TestCase): def test_available_list_comes_from_index(self): mgr = _make_manager() with patch('service_store_manager.requests.get') as mock_get: - mock_get.return_value = MagicMock( - status_code=200, - json=lambda: self._fake_index(), - ) - mock_get.return_value.raise_for_status = MagicMock() + mock_get.return_value = _make_streaming_mock(self._fake_index()) result = mgr.list_services() self.assertEqual(len(result['available']), 2) self.assertEqual(result['available'][0]['id'], 'svc1') @@ -736,22 +740,14 @@ class TestListServices(unittest.TestCase): installed = {'svc1': {'id': 'svc1', 'name': 'Service One'}} mgr = _make_manager(installed=installed) with patch('service_store_manager.requests.get') as mock_get: - mock_get.return_value = MagicMock( - status_code=200, - json=lambda: self._fake_index(), - ) - mock_get.return_value.raise_for_status = MagicMock() + mock_get.return_value = _make_streaming_mock(self._fake_index()) result = mgr.list_services() self.assertIn('svc1', result['installed']) def test_cache_prevents_second_http_request_within_ttl(self): mgr = _make_manager() with patch('service_store_manager.requests.get') as mock_get: - mock_get.return_value = MagicMock( - status_code=200, - json=lambda: self._fake_index(), - ) - mock_get.return_value.raise_for_status = MagicMock() + mock_get.return_value = _make_streaming_mock(self._fake_index()) mgr.fetch_index() mgr.fetch_index() # Only one HTTP call despite two fetches @@ -761,11 +757,7 @@ class TestListServices(unittest.TestCase): mgr = _make_manager() mgr._cache_ttl = 1 # 1 second TTL for the test with patch('service_store_manager.requests.get') as mock_get: - mock_get.return_value = MagicMock( - status_code=200, - json=lambda: self._fake_index(), - ) - mock_get.return_value.raise_for_status = MagicMock() + mock_get.return_value = _make_streaming_mock(self._fake_index()) mgr.fetch_index() # Simulate TTL expiry by winding back the cache timestamp mgr._index_cache_time -= 2 @@ -776,11 +768,7 @@ class TestListServices(unittest.TestCase): """Index JSON wrapped in {'services': [...]} is also handled.""" mgr = _make_manager() with patch('service_store_manager.requests.get') as mock_get: - mock_get.return_value = MagicMock( - status_code=200, - json=lambda: {'services': self._fake_index()}, - ) - mock_get.return_value.raise_for_status = MagicMock() + mock_get.return_value = _make_streaming_mock({'services': self._fake_index()}) result = mgr.list_services() self.assertEqual(len(result['available']), 2)