feat: Phase 0 — manifest_validator, compose YAML safety check, cap_add allowlist, backend denylist, provision hook enforcement, size cap

Introduces api/manifest_validator.py as a single security chokepoint
imported by both ServiceComposer and ServiceStoreManager:

- validate_manifest(): rejects kind=builtin, reserved container names,
  reserved subdomains, backend denylist (localhost, cell-api, etc.),
  cap_add outside allowlist / in denylist, shell-string provision hooks,
  and env values with shell-special characters
- validate_rendered_compose(): walks the rendered YAML and rejects
  privileged:true, host network/pid/ipc/userns, absolute bind mounts,
  denied capabilities, devices key, apparmor/seccomp unconfined, and
  string-form command/entrypoint (shell-injection vector)
- validate_provision_hook(): requires argv list form, lowercase binary,
  rejects NUL bytes

ServiceStoreManager changes:
- _validate_manifest() delegates to validate_manifest() after existing checks
- _fetch_manifest() and fetch_index() now stream with a 256 KB size cap
  (prevents memory exhaustion from a malicious or compromised index)
- Digest-pin warning for images missing @sha256 (hard error for unknown
  registries, warning for git.pic.ngo/roof/* and TRUSTED_IMAGES_NO_DIGEST)

ServiceComposer changes:
- write_compose() calls validate_rendered_compose() before any disk write
  so no partial file is left if validation fails
- render_template() substitutes ${PIC_DATA_DIR} with the resolved data_dir path

102 new tests in tests/test_manifest_validator.py covering all five P0
security issues.  Existing test mocks updated to use streaming response
pattern (stream=True + raw.read) and valid compose YAML templates.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-29 07:10:12 -04:00
parent 5e438aa991
commit c40919d374
6 changed files with 1412 additions and 38 deletions
+283
View File
@@ -0,0 +1,283 @@
"""
manifest_validator — single chokepoint for all manifest and compose YAML security checks.
Both ServiceComposer and ServiceStoreManager import from here so validation logic
lives in exactly one place and cannot be bypassed by taking either code path.
"""
import re
import yaml
_SUBDOMAIN_RE = re.compile(r'^[a-z][a-z0-9-]{0,30}$')
_BACKEND_RE = re.compile(r'^[A-Za-z0-9._-]+:\d{1,5}$')
_CAP_ALLOWLIST = frozenset({
'NET_ADMIN', 'NET_RAW', 'NET_BIND_SERVICE', 'CHOWN', 'DAC_OVERRIDE',
'SETUID', 'SETGID', 'KILL', 'SYS_NICE',
})
_CAP_DENYLIST = frozenset({
'ALL', 'SYS_ADMIN', 'SYS_MODULE', 'SYS_PTRACE', 'SYS_RAWIO',
'SYS_BOOT', 'MAC_ADMIN', 'MAC_OVERRIDE', 'SYS_TIME', 'SYS_TTY_CONFIG',
})
_RESERVED_SUBDOMAINS = frozenset({
'api', 'webui', 'admin', 'www', 'mail', 'ns1', 'ns2', 'git', 'registry',
'install', 'calendar', 'files', 'webdav', 'webmail',
})
_BACKEND_DENYLIST = frozenset({
'cell-api', 'cell-caddy', 'cell-coredns', 'cell-dnsmasq',
'cell-wireguard', 'cell-vault', 'localhost', '127.0.0.1',
'0.0.0.0', 'host.docker.internal',
})
_RESERVED_CONTAINER_NAMES = frozenset({
'cell-api', 'cell-caddy', 'cell-webui', 'cell-coredns',
'cell-dnsmasq', 'cell-wireguard', 'cell-chrony',
})
_CONTAINER_NAME_RE = re.compile(r'^cell-[a-z0-9][a-z0-9-]{0,30}$')
_ENV_VALUE_RE = re.compile(r'^[A-Za-z0-9._@:/+\-]{0,256}$')
_HOOK_BINARY_RE = re.compile(r'^[a-z][a-z0-9_-]{0,31}$')
_CAP_NAME_RE = re.compile(r'^[A-Z_]+$')
def validate_manifest(manifest: dict) -> tuple:
"""
Validate security-relevant fields of a store manifest.
Returns (True, []) when all checks pass; (False, [error_strings]) otherwise.
Does not replace the existing _validate_manifest in ServiceStoreManager —
it supplements it as a second layer focused on security-critical fields.
"""
errors = []
# kind must be "store" if present — reject builtins coming in over the wire
kind = manifest.get('kind')
if kind is not None and kind != 'store':
errors.append(f'manifest kind must be "store", got: {kind!r}')
# container_name structural check
cname = manifest.get('container_name')
if cname is not None:
if not _CONTAINER_NAME_RE.match(cname):
errors.append(
f'container_name must match ^cell-[a-z0-9][a-z0-9-]{{0,30}}$, got: {cname!r}'
)
elif cname in _RESERVED_CONTAINER_NAMES:
errors.append(f'container_name is reserved: {cname!r}')
# subdomain
subdomain = manifest.get('subdomain')
if subdomain is not None:
_check_subdomain(subdomain, 'subdomain', errors)
# extra_subdomains
for sub in manifest.get('extra_subdomains') or []:
_check_subdomain(sub, 'extra_subdomains entry', errors)
# backend
backend = manifest.get('backend')
if backend is not None:
_check_backend(backend, 'backend', errors)
# extra_backends
for sub_key, bknd_val in (manifest.get('extra_backends') or {}).items():
_check_backend(bknd_val, f'extra_backends[{sub_key!r}]', errors)
# cap_add
cap_add = manifest.get('cap_add')
if cap_add is not None:
if not isinstance(cap_add, list):
errors.append('cap_add must be a list')
else:
for cap in cap_add:
if not isinstance(cap, str):
errors.append(f'cap_add entry must be a string, got: {cap!r}')
continue
if not _CAP_NAME_RE.match(cap):
errors.append(f'cap_add entry must match ^[A-Z_]+$, got: {cap!r}')
continue
if cap in _CAP_DENYLIST:
errors.append(f'cap_add entry is explicitly denied: {cap}')
elif cap not in _CAP_ALLOWLIST:
errors.append(f'cap_add entry not in allowlist: {cap}')
# env values
for env_entry in manifest.get('env') or []:
val = str(env_entry.get('value', ''))
if not _ENV_VALUE_RE.match(val):
errors.append(
f'env[].value contains disallowed characters: {val!r}'
)
# provision_hook
hook = (manifest.get('accounts') or {}).get('provision_hook')
if hook is not None:
ok, msg = validate_provision_hook(hook)
if not ok:
errors.append(msg)
return (len(errors) == 0, errors)
def validate_rendered_compose(yaml_text: str) -> tuple:
"""
Parse and security-validate a rendered docker-compose YAML string.
Returns (True, []) when safe; (False, [error_strings]) otherwise.
Rejects constructs that would give a store service elevated access to the host.
"""
errors = []
try:
doc = yaml.safe_load(yaml_text)
except yaml.YAMLError as exc:
return (False, [f'YAML parse error: {exc}'])
if not isinstance(doc, dict):
return (False, ['compose file must be a YAML mapping'])
# At least one external network must exist so the container joins the cell network
# rather than an isolated bridge that would be invisible to Caddy and CoreDNS.
networks = doc.get('networks') or {}
has_external = any(
isinstance(v, dict) and v.get('external')
for v in networks.values()
)
if not has_external:
errors.append(
'compose file must declare at least one network with external: true'
)
for svc_name, svc in (doc.get('services') or {}).items():
if not isinstance(svc, dict):
continue
prefix = f'service {svc_name!r}'
if svc.get('privileged') is True:
errors.append(f'{prefix}: privileged: true is not allowed')
net_mode = svc.get('network_mode')
if net_mode is not None and net_mode not in (None, 'bridge'):
errors.append(
f'{prefix}: network_mode {net_mode!r} is not allowed (only bridge)'
)
if svc.get('pid') == 'host':
errors.append(f'{prefix}: pid: host is not allowed')
if svc.get('ipc') == 'host':
errors.append(f'{prefix}: ipc: host is not allowed')
if svc.get('userns_mode') == 'host':
errors.append(f'{prefix}: userns_mode: host is not allowed')
# cap_add
for cap in svc.get('cap_add') or []:
cap_str = str(cap)
if cap_str in _CAP_DENYLIST:
errors.append(f'{prefix}: cap_add {cap_str!r} is explicitly denied')
elif cap_str not in _CAP_ALLOWLIST:
errors.append(f'{prefix}: cap_add {cap_str!r} not in allowlist')
# volumes — reject absolute host-side bind mounts
for vol in svc.get('volumes') or []:
vol_str = str(vol)
src = vol_str.split(':')[0] if ':' in vol_str else vol_str
if src.startswith('/'):
errors.append(
f'{prefix}: absolute host bind mount not allowed: {vol_str!r}'
)
if 'devices' in svc:
errors.append(f'{prefix}: devices key is not allowed')
for opt in svc.get('security_opt') or []:
opt_str = str(opt)
if opt_str in ('apparmor=unconfined', 'seccomp=unconfined'):
errors.append(
f'{prefix}: security_opt {opt_str!r} is not allowed'
)
# command must be a list — string form passes through the shell
cmd = svc.get('command')
if cmd is not None and isinstance(cmd, str):
errors.append(
f'{prefix}: command must be a list, not a shell string'
)
# entrypoint must also be a list for the same reason
ep = svc.get('entrypoint')
if ep is not None and isinstance(ep, str):
errors.append(
f'{prefix}: entrypoint must be a list, not a shell string'
)
return (len(errors) == 0, errors)
def validate_provision_hook(hook) -> tuple:
"""
Validate a provision_hook value from accounts.provision_hook.
Acceptable: None/absent, or a dict {"argv": ["binary", "arg1", ...]}.
Rejected: any plain string (shell injection risk), empty argv, uppercase binary,
NUL bytes in any element.
Returns (True, "") on success; (False, error_string) on failure.
"""
if hook is None:
return (True, '')
if isinstance(hook, str):
return (
False,
'provision_hook must be an argv list dict {"argv": [...]}, not a shell string',
)
if not isinstance(hook, dict):
return (False, 'provision_hook must be a dict with argv list')
argv = hook.get('argv')
if not isinstance(argv, list) or len(argv) == 0:
return (False, 'provision_hook.argv must be a non-empty list')
# NUL-byte check must precede regex check so the error message is unambiguous.
for elem in argv:
if isinstance(elem, str) and '\x00' in elem:
return (False, 'provision_hook.argv element contains NUL byte')
binary = argv[0]
if not isinstance(binary, str) or not _HOOK_BINARY_RE.match(binary):
return (
False,
f'provision_hook.argv[0] must match ^[a-z][a-z0-9_-]{{0,31}}$, got: {binary!r}',
)
return (True, '')
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _check_subdomain(value, field_name: str, errors: list) -> None:
if not isinstance(value, str):
errors.append(f'{field_name} must be a string')
return
if value in _RESERVED_SUBDOMAINS:
errors.append(f'{field_name} is reserved: {value!r}')
elif not _SUBDOMAIN_RE.match(value):
errors.append(
f'{field_name} must match ^[a-z][a-z0-9-]{{0,30}}$, got: {value!r}'
)
def _check_backend(value, field_name: str, errors: list) -> None:
if not isinstance(value, str):
errors.append(f'{field_name} must be a string')
return
if not _BACKEND_RE.match(value):
errors.append(
f'{field_name} must be host:port (e.g. cell-foo:8080), got: {value!r}'
)
return
host = value.split(':')[0]
if host in _BACKEND_DENYLIST:
errors.append(f'{field_name} host {host!r} is in the backend denylist')
+12
View File
@@ -23,8 +23,11 @@ import secrets as _secrets_lib
import shutil import shutil
import subprocess import subprocess
import threading import threading
from pathlib import Path
from typing import Dict, List, Optional from typing import Dict, List, Optional
from manifest_validator import validate_rendered_compose
logger = logging.getLogger('picell') logger = logging.getLogger('picell')
_SECRET_RE = re.compile(r'\$\{(PIC_SECRET_\w+)\}') _SECRET_RE = re.compile(r'\$\{(PIC_SECRET_\w+)\}')
@@ -136,6 +139,7 @@ class ServiceComposer:
result = result.replace('${PIC_DOMAIN}', domain) result = result.replace('${PIC_DOMAIN}', domain)
result = result.replace('${PIC_CELL_NAME}', cell_name) result = result.replace('${PIC_CELL_NAME}', cell_name)
result = result.replace('${PIC_SERVICE_ID}', service_id) result = result.replace('${PIC_SERVICE_ID}', service_id)
result = result.replace('${PIC_DATA_DIR}', str(Path(self.data_dir).resolve()))
# PIC_SECRET_* — generate on first use, reuse on reconfigure # PIC_SECRET_* — generate on first use, reuse on reconfigure
for match in _SECRET_RE.finditer(template_content): for match in _SECRET_RE.finditer(template_content):
@@ -150,6 +154,14 @@ class ServiceComposer:
"""Render and atomically write the per-service compose file. Returns rendered content.""" """Render and atomically write the per-service compose file. Returns rendered content."""
os.makedirs(self._svc_dir(service_id), exist_ok=True) os.makedirs(self._svc_dir(service_id), exist_ok=True)
content = self.render_template(service_id, manifest, template_content) content = self.render_template(service_id, manifest, template_content)
# Validate before any file I/O so a bad template never touches disk.
ok, errs = validate_rendered_compose(content)
if not ok:
raise ValueError(
f'Compose template failed security validation: {"; ".join(errs)}'
)
path = self._compose_path(service_id) path = self._compose_path(service_id)
tmp = path + '.tmp' tmp = path + '.tmp'
with open(tmp, 'w') as f: with open(tmp, 'w') as f:
+50 -5
View File
@@ -18,11 +18,14 @@ import subprocess
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
import json
import requests import requests
import yaml import yaml
from base_service_manager import BaseServiceManager from base_service_manager import BaseServiceManager
from ip_utils import CONTAINER_OFFSETS from ip_utils import CONTAINER_OFFSETS
from manifest_validator import validate_manifest, validate_provision_hook
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -41,8 +44,19 @@ MANIFEST_URL_TPL = (
) )
IMAGE_ALLOWLIST_RE = re.compile( IMAGE_ALLOWLIST_RE = re.compile(
r'^git\.pic\.ngo/roof/[a-z0-9._/-]+(:[a-zA-Z0-9._-]+)?$' r'^git\.pic\.ngo/roof/[a-z0-9._/-]+(:[a-zA-Z0-9._-]+)?(@sha256:[a-f0-9]{64})?$'
) )
# Images from well-known vendors that pre-date digest pinning in PIC.
# These are allowed to ship without a @sha256 digest; all others require one
# or must come from git.pic.ngo/roof/*.
TRUSTED_IMAGES_NO_DIGEST = frozenset({
'mailserver/docker-mailserver',
'tomsquest/docker-radicale',
'bytemark/webdav',
'filegator/filegator',
'hardware/rainloop',
})
FORBIDDEN_MOUNTS = frozenset([ FORBIDDEN_MOUNTS = frozenset([
'/', '/etc', '/var', '/proc', '/sys', '/dev', '/app', '/run', '/boot', '/', '/etc', '/var', '/proc', '/sys', '/dev', '/app', '/run', '/boot',
]) ])
@@ -112,6 +126,21 @@ class ServiceStoreManager(BaseServiceManager):
errors.append( errors.append(
f'image must match git.pic.ngo/roof/* pattern, got: {image}' f'image must match git.pic.ngo/roof/* pattern, got: {image}'
) )
elif image:
# Warn when a digest pin is absent so operators know exact-version
# tracking is not guaranteed. Images in TRUSTED_IMAGES_NO_DIGEST
# and images from our own git.pic.ngo/roof/* registry (which we
# build and tag) get warnings rather than hard errors; any other
# image that somehow passes the allowlist gets a hard error.
if '@sha256:' not in image:
image_base = image.split(':')[0].split('@')[0]
is_own_registry = image_base.startswith('git.pic.ngo/roof/')
if image_base in TRUSTED_IMAGES_NO_DIGEST or is_own_registry:
logger.warning('image %s has no digest pin', image)
else:
errors.append(
f'image {image!r} must include a @sha256:<digest> pin'
)
# Volume mount safety # Volume mount safety
for vol in m.get('volumes', []): for vol in m.get('volumes', []):
@@ -202,6 +231,12 @@ class ServiceStoreManager(BaseServiceManager):
f'env[].value contains disallowed characters: {val!r}' f'env[].value contains disallowed characters: {val!r}'
) )
# Security layer: delegate to manifest_validator for cap_add, backend
# denylist, provision_hook, reserved container names, and kind guard.
ok, sec_errs = validate_manifest(m)
if not ok:
errors.extend(sec_errs)
return (len(errors) == 0, errors) return (len(errors) == 0, errors)
# ── IP allocation ───────────────────────────────────────────────────── # ── IP allocation ─────────────────────────────────────────────────────
@@ -328,13 +363,17 @@ class ServiceStoreManager(BaseServiceManager):
def fetch_index(self) -> list: def fetch_index(self) -> list:
"""Fetch and cache the service index.""" """Fetch and cache the service index."""
import time import time
_SIZE_LIMIT = 256 * 1024
now = time.time() now = time.time()
if self._index_cache is not None and (now - self._index_cache_time) < self._cache_ttl: if self._index_cache is not None and (now - self._index_cache_time) < self._cache_ttl:
return self._index_cache return self._index_cache
try: try:
resp = requests.get(self.index_url, timeout=10) resp = requests.get(self.index_url, timeout=10, stream=True)
resp.raise_for_status() resp.raise_for_status()
data = resp.json() content = resp.raw.read(_SIZE_LIMIT + 1, decode_content=True)
if len(content) > _SIZE_LIMIT:
raise ValueError('Index response exceeds 256 KB limit')
data = json.loads(content)
self._index_cache = data if isinstance(data, list) else data.get('services', []) self._index_cache = data if isinstance(data, list) else data.get('services', [])
self._index_cache_time = now self._index_cache_time = now
return self._index_cache return self._index_cache
@@ -344,10 +383,16 @@ class ServiceStoreManager(BaseServiceManager):
def _fetch_manifest(self, service_id: str) -> dict: def _fetch_manifest(self, service_id: str) -> dict:
"""Fetch a service manifest by ID.""" """Fetch a service manifest by ID."""
_SIZE_LIMIT = 256 * 1024
url = MANIFEST_URL_TPL.format(id=service_id) url = MANIFEST_URL_TPL.format(id=service_id)
resp = requests.get(url, timeout=10) resp = requests.get(url, timeout=10, stream=True)
resp.raise_for_status() resp.raise_for_status()
return resp.json() content = resp.raw.read(_SIZE_LIMIT + 1, decode_content=True)
if len(content) > _SIZE_LIMIT:
raise ValueError(
f'Manifest response for {service_id} exceeds 256 KB limit'
)
return json.loads(content)
# ── Core operations ─────────────────────────────────────────────────── # ── Core operations ───────────────────────────────────────────────────
File diff suppressed because it is too large Load Diff
+31 -3
View File
@@ -149,7 +149,17 @@ class TestWriteCompose(unittest.TestCase):
cm = _make_cm() cm = _make_cm()
composer = ServiceComposer(config_manager=cm, data_dir=tmpdir) composer = ServiceComposer(config_manager=cm, data_dir=tmpdir)
manifest = _make_manifest() manifest = _make_manifest()
template = 'PORT=${PIC_CFG_PORT}' template = (
'version: "3.8"\n'
'services:\n'
' app:\n'
' image: nginx\n'
' environment:\n'
' PORT: "${PIC_CFG_PORT}"\n'
'networks:\n'
' cell-network:\n'
' external: true\n'
)
composer.write_compose('myservice', manifest, template) composer.write_compose('myservice', manifest, template)
expected_path = os.path.join( expected_path = os.path.join(
@@ -169,7 +179,16 @@ class TestWriteCompose(unittest.TestCase):
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir) composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir)
manifest = _make_manifest() manifest = _make_manifest()
composer.write_compose('myservice', manifest, 'content: true') valid_template = (
'version: "3.8"\n'
'services:\n'
' app:\n'
' image: nginx\n'
'networks:\n'
' cell-network:\n'
' external: true\n'
)
composer.write_compose('myservice', manifest, valid_template)
self.assertTrue(composer.has_compose_file('myservice')) self.assertTrue(composer.has_compose_file('myservice'))
def test_atomic_write_via_tmp_file(self): def test_atomic_write_via_tmp_file(self):
@@ -178,7 +197,16 @@ class TestWriteCompose(unittest.TestCase):
composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir) composer = ServiceComposer(config_manager=_make_cm(), data_dir=tmpdir)
manifest = _make_manifest() manifest = _make_manifest()
# Should not raise even if fsync not available # Should not raise even if fsync not available
composer.write_compose('myservice', manifest, 'content: yes') valid_template = (
'version: "3.8"\n'
'services:\n'
' app:\n'
' image: nginx\n'
'networks:\n'
' cell-network:\n'
' external: true\n'
)
composer.write_compose('myservice', manifest, valid_template)
path = os.path.join(tmpdir, 'services', 'myservice', 'docker-compose.yml') path = os.path.join(tmpdir, 'services', 'myservice', 'docker-compose.yml')
self.assertTrue(os.path.exists(path)) self.assertTrue(os.path.exists(path))
+18 -30
View File
@@ -6,6 +6,7 @@ All external I/O (requests, subprocess, docker, config_manager, caddy_manager,
container_manager) is mocked so these tests run without any live infrastructure. container_manager) is mocked so these tests run without any live infrastructure.
""" """
import json
import os import os
import sys import sys
import time import time
@@ -24,6 +25,17 @@ from ip_utils import CONTAINER_OFFSETS
# Helpers # Helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _make_streaming_mock(data):
"""Return a MagicMock that simulates a requests streaming response for ``data``."""
encoded = json.dumps(data).encode()
raw = MagicMock()
raw.read.return_value = encoded
mock_resp = MagicMock(status_code=200)
mock_resp.raise_for_status = MagicMock()
mock_resp.raw = raw
return mock_resp
def _make_manager(tmp_dir=None, installed=None, identity=None): def _make_manager(tmp_dir=None, installed=None, identity=None):
"""Build a ServiceStoreManager backed by mock dependencies.""" """Build a ServiceStoreManager backed by mock dependencies."""
cm = MagicMock() cm = MagicMock()
@@ -711,11 +723,7 @@ class TestListServices(unittest.TestCase):
def test_returns_available_and_installed_keys(self): def test_returns_available_and_installed_keys(self):
mgr = _make_manager() mgr = _make_manager()
with patch('service_store_manager.requests.get') as mock_get: with patch('service_store_manager.requests.get') as mock_get:
mock_get.return_value = MagicMock( mock_get.return_value = _make_streaming_mock(self._fake_index())
status_code=200,
json=lambda: self._fake_index(),
)
mock_get.return_value.raise_for_status = MagicMock()
result = mgr.list_services() result = mgr.list_services()
self.assertIn('available', result) self.assertIn('available', result)
self.assertIn('installed', result) self.assertIn('installed', result)
@@ -723,11 +731,7 @@ class TestListServices(unittest.TestCase):
def test_available_list_comes_from_index(self): def test_available_list_comes_from_index(self):
mgr = _make_manager() mgr = _make_manager()
with patch('service_store_manager.requests.get') as mock_get: with patch('service_store_manager.requests.get') as mock_get:
mock_get.return_value = MagicMock( mock_get.return_value = _make_streaming_mock(self._fake_index())
status_code=200,
json=lambda: self._fake_index(),
)
mock_get.return_value.raise_for_status = MagicMock()
result = mgr.list_services() result = mgr.list_services()
self.assertEqual(len(result['available']), 2) self.assertEqual(len(result['available']), 2)
self.assertEqual(result['available'][0]['id'], 'svc1') self.assertEqual(result['available'][0]['id'], 'svc1')
@@ -736,22 +740,14 @@ class TestListServices(unittest.TestCase):
installed = {'svc1': {'id': 'svc1', 'name': 'Service One'}} installed = {'svc1': {'id': 'svc1', 'name': 'Service One'}}
mgr = _make_manager(installed=installed) mgr = _make_manager(installed=installed)
with patch('service_store_manager.requests.get') as mock_get: with patch('service_store_manager.requests.get') as mock_get:
mock_get.return_value = MagicMock( mock_get.return_value = _make_streaming_mock(self._fake_index())
status_code=200,
json=lambda: self._fake_index(),
)
mock_get.return_value.raise_for_status = MagicMock()
result = mgr.list_services() result = mgr.list_services()
self.assertIn('svc1', result['installed']) self.assertIn('svc1', result['installed'])
def test_cache_prevents_second_http_request_within_ttl(self): def test_cache_prevents_second_http_request_within_ttl(self):
mgr = _make_manager() mgr = _make_manager()
with patch('service_store_manager.requests.get') as mock_get: with patch('service_store_manager.requests.get') as mock_get:
mock_get.return_value = MagicMock( mock_get.return_value = _make_streaming_mock(self._fake_index())
status_code=200,
json=lambda: self._fake_index(),
)
mock_get.return_value.raise_for_status = MagicMock()
mgr.fetch_index() mgr.fetch_index()
mgr.fetch_index() mgr.fetch_index()
# Only one HTTP call despite two fetches # Only one HTTP call despite two fetches
@@ -761,11 +757,7 @@ class TestListServices(unittest.TestCase):
mgr = _make_manager() mgr = _make_manager()
mgr._cache_ttl = 1 # 1 second TTL for the test mgr._cache_ttl = 1 # 1 second TTL for the test
with patch('service_store_manager.requests.get') as mock_get: with patch('service_store_manager.requests.get') as mock_get:
mock_get.return_value = MagicMock( mock_get.return_value = _make_streaming_mock(self._fake_index())
status_code=200,
json=lambda: self._fake_index(),
)
mock_get.return_value.raise_for_status = MagicMock()
mgr.fetch_index() mgr.fetch_index()
# Simulate TTL expiry by winding back the cache timestamp # Simulate TTL expiry by winding back the cache timestamp
mgr._index_cache_time -= 2 mgr._index_cache_time -= 2
@@ -776,11 +768,7 @@ class TestListServices(unittest.TestCase):
"""Index JSON wrapped in {'services': [...]} is also handled.""" """Index JSON wrapped in {'services': [...]} is also handled."""
mgr = _make_manager() mgr = _make_manager()
with patch('service_store_manager.requests.get') as mock_get: with patch('service_store_manager.requests.get') as mock_get:
mock_get.return_value = MagicMock( mock_get.return_value = _make_streaming_mock({'services': self._fake_index()})
status_code=200,
json=lambda: {'services': self._fake_index()},
)
mock_get.return_value.raise_for_status = MagicMock()
result = mgr.list_services() result = mgr.list_services()
self.assertEqual(len(result['available']), 2) self.assertEqual(len(result['available']), 2)