Files
pic/tests/test_manifest_validator.py
T
roof f7bb2cc962
Unit Tests / test (push) Successful in 11m25s
fix: allow first-party store service subdomains and registry images
Two manifest validation bugs blocked all store service installs:

1. service_store_manager.RESERVED_SUBDOMAINS included 'mail', which
   prevented the email service from using its required subdomain.
   Removed mail/calendar/files/webmail — they belong to official PIC
   store services and must be claimable by them.

2. manifest_validator required @sha256 digest pins on ALL images,
   including first-party git.pic.ngo/roof/* images that the PIC team
   builds and controls. service_store_manager._validate_manifest already
   only warned for first-party images; the secondary validator was
   stricter than intended, causing a hard reject on :latest tags.
   Aligned to warn-not-reject for first-party; malformed digests (when
   provided) are still a hard error.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-30 03:09:41 -04:00

1514 lines
57 KiB
Python

"""
Tests for api/manifest_validator.py — Phase 0 security foundations.
Covers validate_manifest(), validate_rendered_compose(), and
validate_provision_hook(), plus integration tests for the
ServiceComposer.write_compose() guard and ServiceStoreManager._validate_manifest()
security delegation.
No Docker, network, or filesystem calls are made.
"""
import json
import os
import sys
import tempfile
import unittest
from unittest.mock import MagicMock, patch
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
from manifest_validator import (
validate_manifest,
validate_rendered_compose,
validate_provision_hook,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _minimal_manifest(**overrides):
    """Build a minimal manifest dict for validate_manifest() tests.

    Only ``container_name``, ``subdomain`` and ``backend`` are populated;
    ``image`` and ``id`` are deliberately left out.  Keyword arguments are
    merged on top of these defaults, so a test can add a field or replace
    one of the defaults.  A fresh dict is built on every call, so callers
    may mutate the result freely.
    """
    manifest = {
        'container_name': 'cell-myapp',
        'subdomain': 'myapp',
        'backend': 'cell-myapp:8080',
    }
    manifest.update(overrides)
    return manifest
def _valid_compose(extra_services=None, networks=None):
    """Return compose YAML text intended to pass all rendered-compose checks.

    The document declares a single ``app`` service (nginx image, list-form
    command) followed by a top-level ``networks`` block.

    Args:
        extra_services: YAML text appended verbatim after the ``app``
            service.  It must already be indented as an entry of the
            ``services`` mapping.  Falsy values append nothing.
        networks: Replacement text for the top-level ``networks`` block.
            Falsy values fall back to an external ``cell-network``.
    """
    services = extra_services or ''
    net_block = networks or (
        'networks:\n'
        '  cell-network:\n'
        '    external: true\n'
    )
    # Nesting matters: service keys must sit one level below the service
    # name, otherwise they parse as siblings of ``app``.
    return (
        'version: "3.8"\n'
        'services:\n'
        '  app:\n'
        '    image: nginx\n'
        '    command: ["nginx", "-g", "daemon off;"]\n'
        + services
        + net_block
    )
# ---------------------------------------------------------------------------
# TestValidateManifest
# ---------------------------------------------------------------------------
class TestValidateManifest(unittest.TestCase):
    """Field-by-field tests for validate_manifest().

    Every test starts from _minimal_manifest() and changes one field, then
    checks the ``(ok, errs)`` pair returned by the validator.  Positive
    assertions pass ``errs`` as the failure message so a regression shows
    the validator's own explanation.
    """

    # ── valid minimal manifest ────────────────────────────────────────────
    def test_valid_minimal_manifest_passes(self):
        ok, errs = validate_manifest(_minimal_manifest())
        self.assertTrue(ok, errs)
        self.assertEqual(errs, [])

    def test_returns_true_empty_list_on_success(self):
        result = validate_manifest(_minimal_manifest())
        self.assertIsInstance(result, tuple)
        self.assertEqual(len(result), 2)
        ok, errs = result
        self.assertIs(ok, True)
        self.assertIsInstance(errs, list)

    def test_returns_false_list_of_strings_on_failure(self):
        ok, errs = validate_manifest({'kind': 'builtin'})
        self.assertIs(ok, False)
        self.assertIsInstance(errs, list)
        self.assertGreater(len(errs), 0)
        for e in errs:
            self.assertIsInstance(e, str)

    # ── schema_version ───────────────────────────────────────────────────
    def test_schema_version_3_passes(self):
        ok, errs = validate_manifest(_minimal_manifest(schema_version=3))
        self.assertTrue(ok, errs)

    def test_schema_version_2_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(schema_version=2))
        self.assertFalse(ok)
        self.assertTrue(any('schema_version' in e for e in errs))

    def test_schema_version_1_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(schema_version=1))
        self.assertFalse(ok)

    def test_schema_version_string_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(schema_version='3'))
        self.assertFalse(ok)
        self.assertTrue(any('schema_version' in e for e in errs))

    def test_schema_version_absent_passes(self):
        m = _minimal_manifest()
        m.pop('schema_version', None)
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)

    # ── id ───────────────────────────────────────────────────────────────
    def test_id_valid_lowercase_passes(self):
        ok, errs = validate_manifest(_minimal_manifest(id='myapp'))
        self.assertTrue(ok, errs)

    def test_id_with_hyphen_and_underscore_passes(self):
        ok, errs = validate_manifest(_minimal_manifest(id='my-app_v2'))
        self.assertTrue(ok, errs)

    def test_id_starts_with_digit_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(id='1app'))
        self.assertFalse(ok)
        self.assertTrue(any('id' in e for e in errs))

    def test_id_uppercase_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(id='MyApp'))
        self.assertFalse(ok)

    def test_id_with_space_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(id='my app'))
        self.assertFalse(ok)

    def test_id_too_long_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(id='a' * 32))
        self.assertFalse(ok)
        self.assertTrue(any('id' in e for e in errs))

    def test_id_31_chars_passes(self):
        # Pattern allows up to 31 chars total (1 + 30)
        ok, errs = validate_manifest(_minimal_manifest(id='a' + 'b' * 30))
        self.assertTrue(ok, errs)

    def test_id_absent_passes(self):
        m = _minimal_manifest()
        m.pop('id', None)
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)

    # ── image ────────────────────────────────────────────────────────────
    def test_image_with_digest_passes(self):
        digest = 'a' * 64
        ok, errs = validate_manifest(
            _minimal_manifest(image=f'git.pic.ngo/roof/myapp@sha256:{digest}')
        )
        self.assertTrue(ok, errs)

    def test_image_tag_only_first_party_allowed(self):
        # First-party images without a digest pin are allowed (warning only).
        ok, errs = validate_manifest(
            _minimal_manifest(image='git.pic.ngo/roof/myapp:latest')
        )
        self.assertTrue(ok, errs)
        self.assertEqual(errs, [])

    def test_image_wrong_registry_rejected(self):
        digest = 'a' * 64
        ok, errs = validate_manifest(
            _minimal_manifest(image=f'docker.io/library/nginx@sha256:{digest}')
        )
        self.assertFalse(ok)

    def test_image_digest_too_short_rejected(self):
        ok, errs = validate_manifest(
            _minimal_manifest(image='git.pic.ngo/roof/myapp@sha256:abc123')
        )
        self.assertFalse(ok)

    def test_image_digest_with_uppercase_hex_rejected(self):
        # sha256 digest must be lowercase hex
        digest = 'A' * 64
        ok, errs = validate_manifest(
            _minimal_manifest(image=f'git.pic.ngo/roof/myapp@sha256:{digest}')
        )
        self.assertFalse(ok)

    def test_image_no_tag_no_digest_first_party_allowed(self):
        # No tag and no digest — Docker defaults to :latest; treated as
        # tag-only, allowed.
        ok, errs = validate_manifest(
            _minimal_manifest(image='git.pic.ngo/roof/myapp')
        )
        self.assertTrue(ok, errs)
        self.assertEqual(errs, [])

    def test_image_absent_passes(self):
        m = _minimal_manifest()
        m.pop('image', None)
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)

    # ── kind ─────────────────────────────────────────────────────────────
    def test_kind_builtin_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(kind='builtin'))
        self.assertFalse(ok)
        self.assertTrue(any('kind' in e for e in errs))

    def test_kind_store_passes(self):
        ok, errs = validate_manifest(_minimal_manifest(kind='store'))
        self.assertTrue(ok, errs)

    def test_kind_absent_passes(self):
        m = _minimal_manifest()
        m.pop('kind', None)
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)

    def test_kind_unknown_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(kind='custom'))
        self.assertFalse(ok)

    # ── container_name ────────────────────────────────────────────────────
    def test_container_name_without_cell_prefix_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(container_name='myapp'))
        self.assertFalse(ok)
        self.assertTrue(any('container_name' in e for e in errs))

    def test_container_name_cell_api_rejected_reserved(self):
        ok, errs = validate_manifest(_minimal_manifest(container_name='cell-api'))
        self.assertFalse(ok)
        self.assertTrue(any('reserved' in e for e in errs))

    def test_container_name_cell_valid_name_passes(self):
        ok, errs = validate_manifest(
            _minimal_manifest(container_name='cell-valid-name')
        )
        self.assertTrue(ok, errs)

    def test_container_name_with_uppercase_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(container_name='cell-MyApp'))
        self.assertFalse(ok)

    def test_container_name_cell_caddy_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(container_name='cell-caddy'))
        self.assertFalse(ok)

    def test_container_name_cell_wireguard_rejected(self):
        ok, errs = validate_manifest(
            _minimal_manifest(container_name='cell-wireguard')
        )
        self.assertFalse(ok)

    def test_container_name_absent_passes(self):
        m = _minimal_manifest()
        m.pop('container_name', None)
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)

    # ── subdomain ────────────────────────────────────────────────────────
    def test_subdomain_api_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(subdomain='api'))
        self.assertFalse(ok)
        self.assertTrue(any('reserved' in e for e in errs))

    def test_subdomain_webui_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(subdomain='webui'))
        self.assertFalse(ok)

    def test_subdomain_myapp_passes(self):
        ok, errs = validate_manifest(_minimal_manifest(subdomain='myapp'))
        self.assertTrue(ok, errs)

    def test_subdomain_absent_passes(self):
        m = _minimal_manifest()
        m.pop('subdomain', None)
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)

    def test_subdomain_calendar_passes(self):
        # calendar/files/webmail/etc are service subdomains, not
        # infrastructure; official PIC store services need to claim them.
        ok, errs = validate_manifest(_minimal_manifest(subdomain='calendar'))
        self.assertTrue(ok, errs)

    def test_subdomain_files_passes(self):
        ok, errs = validate_manifest(_minimal_manifest(subdomain='files'))
        self.assertTrue(ok, errs)

    # ── extra_subdomains ─────────────────────────────────────────────────
    def test_extra_subdomains_admin_rejected(self):
        ok, errs = validate_manifest(
            _minimal_manifest(extra_subdomains=['admin'])
        )
        self.assertFalse(ok)
        self.assertTrue(any('reserved' in e for e in errs))

    def test_extra_subdomains_valid_entry_passes(self):
        ok, errs = validate_manifest(
            _minimal_manifest(extra_subdomains=['sub'])
        )
        self.assertTrue(ok, errs)

    def test_extra_subdomains_multiple_one_bad_rejected(self):
        ok, errs = validate_manifest(
            _minimal_manifest(extra_subdomains=['good', 'admin'])
        )
        self.assertFalse(ok)

    # ── backend ──────────────────────────────────────────────────────────
    def test_backend_cell_api_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(backend='cell-api:3000'))
        self.assertFalse(ok)
        self.assertTrue(any('denylist' in e for e in errs))

    def test_backend_localhost_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(backend='localhost:8080'))
        self.assertFalse(ok)

    def test_backend_cell_mail_valid_port_passes(self):
        ok, errs = validate_manifest(_minimal_manifest(backend='cell-mail:25'))
        self.assertTrue(ok, errs)

    def test_backend_bad_format_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(backend='not-a-backend'))
        self.assertFalse(ok)
        self.assertTrue(any('host:port' in e for e in errs))

    def test_backend_127_0_0_1_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(backend='127.0.0.1:8080'))
        self.assertFalse(ok)

    def test_backend_host_docker_internal_rejected(self):
        ok, errs = validate_manifest(
            _minimal_manifest(backend='host.docker.internal:8080')
        )
        self.assertFalse(ok)

    def test_backend_absent_passes(self):
        m = _minimal_manifest()
        m.pop('backend', None)
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)

    # ── extra_backends ───────────────────────────────────────────────────
    def test_extra_backends_cell_coredns_rejected(self):
        ok, errs = validate_manifest(
            _minimal_manifest(extra_backends={'dns': 'cell-coredns:53'})
        )
        self.assertFalse(ok)

    def test_extra_backends_valid_passes(self):
        ok, errs = validate_manifest(
            _minimal_manifest(extra_backends={'alt': 'cell-myapp:80'})
        )
        self.assertTrue(ok, errs)

    # ── cap_add ──────────────────────────────────────────────────────────
    def test_cap_add_sys_admin_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(cap_add=['SYS_ADMIN']))
        self.assertFalse(ok)
        self.assertTrue(any('SYS_ADMIN' in e for e in errs))

    def test_cap_add_all_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(cap_add=['ALL']))
        self.assertFalse(ok)
        self.assertTrue(any('ALL' in e for e in errs))

    def test_cap_add_net_admin_passes(self):
        ok, errs = validate_manifest(_minimal_manifest(cap_add=['NET_ADMIN']))
        self.assertTrue(ok, errs)

    def test_cap_add_net_admin_and_sys_admin_rejected(self):
        ok, errs = validate_manifest(
            _minimal_manifest(cap_add=['NET_ADMIN', 'SYS_ADMIN'])
        )
        self.assertFalse(ok)
        self.assertTrue(any('SYS_ADMIN' in e for e in errs))

    def test_cap_add_unknown_cap_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(cap_add=['UNKNOWN_CAP']))
        self.assertFalse(ok)
        self.assertTrue(any('allowlist' in e for e in errs))

    def test_cap_add_sys_module_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(cap_add=['SYS_MODULE']))
        self.assertFalse(ok)

    def test_cap_add_not_a_list_rejected(self):
        ok, errs = validate_manifest(_minimal_manifest(cap_add='NET_ADMIN'))
        self.assertFalse(ok)

    # ── provision_hook (via accounts key) ────────────────────────────────
    def test_provision_hook_string_rejected(self):
        m = _minimal_manifest(
            accounts={'provision_hook': 'setup email add {username}'}
        )
        ok, errs = validate_manifest(m)
        self.assertFalse(ok)
        self.assertTrue(any('shell string' in e for e in errs))

    def test_provision_hook_argv_dict_passes(self):
        m = _minimal_manifest(
            accounts={'provision_hook': {'argv': ['setup', 'email', 'add']}}
        )
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)

    def test_provision_hook_absent_passes(self):
        m = _minimal_manifest()
        # no 'accounts' key at all
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)

    def test_provision_hook_none_in_accounts_passes(self):
        m = _minimal_manifest(accounts={'provision_hook': None})
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)

    # ── env values ───────────────────────────────────────────────────────
    def test_env_value_with_dollar_rejected(self):
        m = _minimal_manifest(env=[{'key': 'X', 'value': '$SECRET'}])
        ok, errs = validate_manifest(m)
        self.assertFalse(ok)

    def test_env_value_with_curly_brace_rejected(self):
        m = _minimal_manifest(env=[{'key': 'X', 'value': '{bad}'}])
        ok, errs = validate_manifest(m)
        self.assertFalse(ok)

    def test_env_value_example_com_passes(self):
        m = _minimal_manifest(env=[{'key': 'DOMAIN', 'value': 'example.com'}])
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)

    def test_env_value_complex_safe_string_passes(self):
        m = _minimal_manifest(
            env=[{'key': 'X', 'value': 'user@host:port/path+extra-chars'}]
        )
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)

    def test_env_empty_passes(self):
        m = _minimal_manifest(env=[])
        ok, errs = validate_manifest(m)
        self.assertTrue(ok, errs)
# ---------------------------------------------------------------------------
# TestValidateRenderedCompose
# ---------------------------------------------------------------------------
class TestValidateRenderedCompose(unittest.TestCase):
    """Tests for validate_rendered_compose() on rendered compose YAML text.

    Each test feeds one compose document to the validator and checks the
    ``(ok, errs)`` pair it returns.  The YAML literals use explicit nesting:
    service names at 2 spaces, service keys at 4, list items at 6.  Keys
    written at the same depth as the service name would parse as siblings
    of the service, not as its settings.
    """

    # ── valid compose ─────────────────────────────────────────────────────
    def test_valid_compose_with_external_network_passes(self):
        ok, errs = validate_rendered_compose(_valid_compose())
        self.assertTrue(ok, errs)
        self.assertEqual(errs, [])

    def test_invalid_yaml_returns_error(self):
        ok, errs = validate_rendered_compose('}{invalid yaml{')
        self.assertFalse(ok)
        self.assertTrue(any('YAML' in e for e in errs))

    def test_non_mapping_yaml_rejected(self):
        ok, errs = validate_rendered_compose('- item1\n- item2\n')
        self.assertFalse(ok)

    # ── privileged ───────────────────────────────────────────────────────
    def test_privileged_true_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    privileged: true\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        self.assertTrue(any('privileged' in e for e in errs))

    def test_privileged_false_passes(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    privileged: false\n'
            '    command: ["echo"]\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertTrue(ok, errs)

    # ── network_mode ─────────────────────────────────────────────────────
    def test_network_mode_host_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    network_mode: host\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        self.assertTrue(any('network_mode' in e for e in errs))

    def test_network_mode_none_value_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    network_mode: "none"\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)

    def test_no_network_mode_key_passes(self):
        ok, errs = validate_rendered_compose(_valid_compose())
        self.assertTrue(ok, errs)

    # ── volumes ──────────────────────────────────────────────────────────
    def test_absolute_volume_etc_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    volumes:\n'
            '      - /etc:/etc\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        self.assertTrue(any('bind mount' in e for e in errs))

    def test_absolute_volume_docker_sock_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    volumes:\n'
            '      - /var/run/docker.sock:/var/run/docker.sock\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)

    def test_relative_volume_passes(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
            '    volumes:\n'
            '      - ./data:/data\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertTrue(ok, errs)

    def test_named_volume_passes(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
            '    volumes:\n'
            '      - myvolume:/data\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertTrue(ok, errs)

    def test_absolute_volume_under_allowed_data_dir_passes(self):
        # After ${PIC_DATA_DIR} substitution, compose templates produce
        # absolute paths like /data/services/email/mail:/var/mail. These
        # must be allowed when allowed_data_dir is set to the same prefix.
        yaml_text = (
            'services:\n'
            '  mail:\n'
            '    image: svc-email:latest\n'
            '    command: ["postfix"]\n'
            '    volumes:\n'
            '      - /data/services/email/mail:/var/mail\n'
            '      - /data/services/email/config:/tmp/config\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text, allowed_data_dir='/data')
        self.assertTrue(ok, errs)

    def test_absolute_volume_outside_allowed_data_dir_still_rejected(self):
        yaml_text = (
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    volumes:\n'
            '      - /etc/passwd:/etc/passwd\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text, allowed_data_dir='/data')
        self.assertFalse(ok)
        self.assertTrue(any('bind mount' in e for e in errs))

    def test_absolute_volume_rejected_when_no_allowed_data_dir(self):
        yaml_text = (
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    volumes:\n'
            '      - /data/services/email/mail:/var/mail\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        # Without allowed_data_dir, even /data paths are rejected
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        self.assertTrue(any('bind mount' in e for e in errs))

    # ── cap_add ──────────────────────────────────────────────────────────
    def test_compose_cap_add_sys_admin_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    cap_add:\n'
            '      - SYS_ADMIN\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        self.assertTrue(any('SYS_ADMIN' in e for e in errs))

    def test_compose_cap_add_net_admin_passes(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
            '    cap_add:\n'
            '      - NET_ADMIN\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertTrue(ok, errs)

    # ── external network requirement ─────────────────────────────────────
    def test_no_external_network_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
            'networks:\n'
            '  mynet:\n'
            '    driver: bridge\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        self.assertTrue(any('external' in e for e in errs))

    def test_no_networks_key_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)

    # ── devices ──────────────────────────────────────────────────────────
    def test_devices_key_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    devices:\n'
            '      - /dev/snd:/dev/snd\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        self.assertTrue(any('devices' in e for e in errs))

    # ── security_opt ─────────────────────────────────────────────────────
    def test_security_opt_apparmor_unconfined_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    security_opt:\n'
            '      - apparmor=unconfined\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        self.assertTrue(any('apparmor=unconfined' in e for e in errs))

    def test_security_opt_no_new_privileges_passes(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
            '    security_opt:\n'
            '      - no-new-privileges:true\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertTrue(ok, errs)

    def test_security_opt_seccomp_unconfined_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    security_opt:\n'
            '      - seccomp=unconfined\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)

    # ── command / entrypoint ─────────────────────────────────────────────
    def test_command_as_string_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: "echo hello"\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        self.assertTrue(any('command' in e for e in errs))

    def test_command_as_list_passes(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo", "hello"]\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertTrue(ok, errs)

    def test_entrypoint_as_string_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    entrypoint: "/bin/sh -c evil"\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        self.assertTrue(any('entrypoint' in e for e in errs))

    # ── pid / ipc / userns ───────────────────────────────────────────────
    def test_pid_host_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    pid: host\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        self.assertTrue(any('pid' in e for e in errs))

    def test_ipc_host_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    ipc: host\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)

    # ── container_name reserved in compose ──────────────────────────────
    def test_compose_container_name_cell_api_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
            '    container_name: cell-api\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        self.assertTrue(any('reserved' in e for e in errs))

    def test_compose_container_name_cell_caddy_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
            '    container_name: cell-caddy\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)

    def test_compose_container_name_cell_wireguard_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
            '    container_name: cell-wireguard\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)

    def test_compose_container_name_cell_coredns_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
            '    container_name: cell-coredns\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)

    def test_compose_container_name_non_reserved_passes(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
            '    container_name: cell-myapp\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertTrue(ok, errs)

    def test_compose_no_container_name_passes(self):
        ok, errs = validate_rendered_compose(_valid_compose())
        self.assertTrue(ok, errs)

    # ── multi-service ─────────────────────────────────────────────────────
    def test_multiple_services_one_invalid_rejected(self):
        yaml_text = (
            'version: "3.8"\n'
            'services:\n'
            '  good:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
            '  bad:\n'
            '    image: nginx\n'
            '    privileged: true\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        ok, errs = validate_rendered_compose(yaml_text)
        self.assertFalse(ok)
        # The error must name the offending service.
        self.assertTrue(any('bad' in e for e in errs))
# ---------------------------------------------------------------------------
# TestValidateProvisionHook
# ---------------------------------------------------------------------------
class TestValidateProvisionHook(unittest.TestCase):
    """Tests for validate_provision_hook().

    The validator returns an ``(ok, msg)`` pair.  These tests expect ``msg``
    to be the empty string on success and an explanatory string on failure;
    positive assertions pass ``msg`` along so a regression shows the
    validator's explanation.
    """

    def test_none_passes(self):
        ok, msg = validate_provision_hook(None)
        self.assertTrue(ok, msg)
        self.assertEqual(msg, '')

    def test_string_rejected(self):
        ok, msg = validate_provision_hook('setup email add {username}')
        self.assertFalse(ok)
        self.assertIn('shell string', msg)

    def test_argv_dict_passes(self):
        ok, msg = validate_provision_hook({'argv': ['setup', 'email', 'add']})
        self.assertTrue(ok, msg)
        self.assertEqual(msg, '')

    def test_empty_argv_rejected(self):
        ok, msg = validate_provision_hook({'argv': []})
        self.assertFalse(ok)
        self.assertIn('non-empty', msg)

    def test_single_element_argv_passes(self):
        ok, msg = validate_provision_hook({'argv': ['setup']})
        self.assertTrue(ok, msg)

    def test_uppercase_binary_rejected(self):
        ok, msg = validate_provision_hook({'argv': ['UPPERCASE']})
        self.assertFalse(ok)
        self.assertIn('argv[0]', msg)

    def test_nul_byte_in_element_rejected(self):
        ok, msg = validate_provision_hook({'argv': ['setup\x00evil']})
        self.assertFalse(ok)
        self.assertIn('NUL', msg)

    def test_nul_byte_in_arg_not_binary_rejected(self):
        ok, msg = validate_provision_hook({'argv': ['setup', 'arg\x00evil']})
        self.assertFalse(ok)

    def test_integer_rejected(self):
        ok, msg = validate_provision_hook(42)
        self.assertFalse(ok)
        self.assertIn('dict', msg)

    def test_list_rejected(self):
        ok, msg = validate_provision_hook(['setup', 'add'])
        self.assertFalse(ok)

    def test_dict_without_argv_rejected(self):
        ok, msg = validate_provision_hook({'cmd': ['setup']})
        self.assertFalse(ok)
        self.assertIn('argv', msg)

    def test_binary_with_digits_passes(self):
        ok, msg = validate_provision_hook({'argv': ['setup2']})
        self.assertTrue(ok, msg)

    def test_binary_with_hyphens_and_underscores_passes(self):
        ok, msg = validate_provision_hook({'argv': ['my-setup_tool']})
        self.assertTrue(ok, msg)

    def test_binary_starting_with_digit_rejected(self):
        ok, msg = validate_provision_hook({'argv': ['2setup']})
        self.assertFalse(ok)
# ---------------------------------------------------------------------------
# TestServiceComposerIntegration
# ---------------------------------------------------------------------------
class TestServiceComposerIntegration(unittest.TestCase):
    """Integration tests for ServiceComposer.write_compose() security gate."""

    def _make_cm(self):
        """Return a MagicMock config manager with a fixed identity/domain."""
        cm = MagicMock()
        cm.get_identity.return_value = {
            'cell_name': 'testcell',
            'domain': 'cell.local',
        }
        cm.get_effective_domain.return_value = 'cell.local'
        cm.configs = {}
        return cm

    def test_write_compose_raises_on_privileged_true(self):
        from service_composer import ServiceComposer
        privileged_template = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    privileged: true\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        with tempfile.TemporaryDirectory() as tmpdir:
            composer = ServiceComposer(
                config_manager=self._make_cm(), data_dir=tmpdir
            )
            manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
            with self.assertRaises(ValueError) as ctx:
                composer.write_compose('testsvc', manifest, privileged_template)
            self.assertIn('security validation', str(ctx.exception))

    def test_write_compose_no_temp_file_on_validation_failure(self):
        """Validation failure must not leave a .tmp file on disk."""
        from service_composer import ServiceComposer
        privileged_template = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    privileged: true\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        with tempfile.TemporaryDirectory() as tmpdir:
            composer = ServiceComposer(
                config_manager=self._make_cm(), data_dir=tmpdir
            )
            manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
            svc_dir = os.path.join(tmpdir, 'services', 'testsvc')
            os.makedirs(svc_dir, exist_ok=True)
            # Make the precondition explicit: the call must actually be
            # rejected, otherwise the file checks below prove nothing.
            with self.assertRaises(ValueError):
                composer.write_compose('testsvc', manifest, privileged_template)
            tmp_path = os.path.join(svc_dir, 'docker-compose.yml.tmp')
            final_path = os.path.join(svc_dir, 'docker-compose.yml')
            # These checks must run while the temporary directory still
            # exists; after cleanup they would pass trivially.
            self.assertFalse(
                os.path.exists(tmp_path),
                'Temp file should not exist after validation failure',
            )
            self.assertFalse(
                os.path.exists(final_path),
                'Final compose file should not exist after validation failure',
            )

    def test_render_template_replaces_pic_data_dir(self):
        from service_composer import ServiceComposer
        from pathlib import Path
        with tempfile.TemporaryDirectory() as tmpdir:
            composer = ServiceComposer(
                config_manager=self._make_cm(), data_dir=tmpdir
            )
            manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
            template = 'DATA=${PIC_DATA_DIR}'
            result = composer.render_template('testsvc', manifest, template)
            # The placeholder is expected to become the resolved data dir.
            expected = str(Path(tmpdir).resolve())
            self.assertIn(expected, result)
            self.assertNotIn('${PIC_DATA_DIR}', result)

    def test_write_compose_succeeds_on_valid_template(self):
        from service_composer import ServiceComposer
        valid_template = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo", "ok"]\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        with tempfile.TemporaryDirectory() as tmpdir:
            composer = ServiceComposer(
                config_manager=self._make_cm(), data_dir=tmpdir
            )
            manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
            content = composer.write_compose('testsvc', manifest, valid_template)
            self.assertIn('nginx', content)
            expected_path = os.path.join(
                tmpdir, 'services', 'testsvc', 'docker-compose.yml'
            )
            # Checked inside the context so the directory still exists.
            self.assertTrue(os.path.exists(expected_path))
# ---------------------------------------------------------------------------
# TestServiceStoreManagerSecurityIntegration
# ---------------------------------------------------------------------------
class TestServiceStoreManagerSecurityIntegration(unittest.TestCase):
    """Integration tests for ServiceStoreManager._validate_manifest() security additions.

    Also exercises ``_fetch_manifest()`` against a mocked ``requests.get`` to
    check its size guard and its use of streamed responses.
    """

    def _make_manager(self):
        """Return a ServiceStoreManager whose collaborators are all mocks."""
        from service_store_manager import ServiceStoreManager
        cm = MagicMock()
        cm.get_installed_services.return_value = {}
        cm.get_identity.return_value = {
            'ip_range': '172.20.0.0/16',
            'service_ips': {},
        }
        return ServiceStoreManager(
            config_manager=cm,
            caddy_manager=MagicMock(),
            container_manager=MagicMock(),
        )

    @staticmethod
    def _streamed_response(body):
        """Return a mock HTTP response whose ``raw.read()`` yields *body* (bytes)."""
        raw = MagicMock()
        raw.read.return_value = body
        resp = MagicMock()
        resp.raise_for_status = MagicMock()
        resp.raw = raw
        return resp

    def _valid_manifest(self, **overrides):
        """Return a baseline manifest, with *overrides* merged on top."""
        m = {
            'id': 'myapp',
            'name': 'My App',
            'version': '1.0.0',
            'author': 'Test Author',
            'image': 'git.pic.ngo/roof/myapp@sha256:' + 'a' * 64,
            'container_name': 'cell-myapp',
        }
        m.update(overrides)
        return m

    def _run_validator(self, **overrides):
        """Validate the baseline manifest plus *overrides*; return ``(ok, errs)``."""
        from service_store_manager import ServiceStoreManager
        return ServiceStoreManager._validate_manifest(
            self._valid_manifest(**overrides)
        )

    # -- _validate_manifest() ------------------------------------------------

    def test_validate_manifest_rejects_cap_add_sys_admin(self):
        ok, errs = self._run_validator(cap_add=['SYS_ADMIN'])
        self.assertFalse(ok)
        self.assertTrue(any('SYS_ADMIN' in e for e in errs), errs)

    def test_validate_manifest_accepts_cap_add_net_admin(self):
        ok, errs = self._run_validator(cap_add=['NET_ADMIN'])
        # Pass the validator's errors as the message so a failure says why.
        self.assertTrue(ok, errs)

    def test_validate_manifest_rejects_provision_hook_string(self):
        ok, errs = self._run_validator(
            accounts={'provision_hook': 'setup user add'}
        )
        self.assertFalse(ok)
        self.assertTrue(any('shell string' in e for e in errs), errs)

    def test_validate_manifest_rejects_kind_builtin(self):
        ok, _errs = self._run_validator(kind='builtin')
        self.assertFalse(ok)

    def test_validate_manifest_rejects_reserved_container_name(self):
        ok, _errs = self._run_validator(container_name='cell-api')
        self.assertFalse(ok)

    def test_validate_manifest_rejects_backend_denylist(self):
        ok, _errs = self._run_validator(backend='localhost:8080')
        self.assertFalse(ok)

    def test_validate_manifest_accepts_valid_backend(self):
        ok, errs = self._run_validator(backend='cell-myapp:8080')
        self.assertTrue(ok, errs)

    def test_validate_manifest_allows_image_tag_only_first_party(self):
        """First-party images without a digest pin are allowed (warning only)."""
        ok, errs = self._run_validator(image='git.pic.ngo/roof/myapp:latest')
        self.assertTrue(ok, errs)

    def test_validate_manifest_accepts_image_with_digest(self):
        digest = 'b' * 64
        ok, errs = self._run_validator(
            image=f'git.pic.ngo/roof/myapp@sha256:{digest}'
        )
        self.assertTrue(ok, errs)

    # -- _fetch_manifest() ---------------------------------------------------

    def test_fetch_manifest_raises_on_oversized_response(self):
        """A body one byte over 256 * 1024 must raise ValueError mentioning '256 KB'."""
        mgr = self._make_manager()
        oversized_content = b'x' * (256 * 1024 + 1)
        mock_resp = self._streamed_response(oversized_content)
        with patch('service_store_manager.requests.get', return_value=mock_resp):
            with self.assertRaises(ValueError) as ctx:
                mgr._fetch_manifest('bigservice')
        self.assertIn('256 KB', str(ctx.exception))

    def test_fetch_manifest_succeeds_with_small_response(self):
        mgr = self._make_manager()
        payload = {'id': 'smallsvc', 'name': 'Small'}
        mock_resp = self._streamed_response(json.dumps(payload).encode())
        with patch('service_store_manager.requests.get', return_value=mock_resp):
            result = mgr._fetch_manifest('smallsvc')
        self.assertEqual(result['id'], 'smallsvc')

    def test_fetch_manifest_requests_stream_true(self):
        """_fetch_manifest must use stream=True so raw.read() is available."""
        mgr = self._make_manager()
        mock_resp = self._streamed_response(json.dumps({'id': 'svc'}).encode())
        with patch('service_store_manager.requests.get', return_value=mock_resp) as mock_get:
            mgr._fetch_manifest('svc')
        _, kwargs = mock_get.call_args
        self.assertTrue(kwargs.get('stream'), 'requests.get must be called with stream=True')
# ---------------------------------------------------------------------------
# TestInstallManifestValidation
# ---------------------------------------------------------------------------
class TestInstallManifestValidation(unittest.TestCase):
    """Tests that ServiceStoreManager.install() propagates manifest validation errors."""

    # Compose template handed back by the mocked _fetch_template(). The
    # indentation is significant: ``image``/``command`` belong to the ``app``
    # service and ``external`` to the ``cell-network`` network.
    _COMPOSE_TEMPLATE = (
        'version: "3.8"\n'
        'services:\n'
        '  app:\n'
        '    image: nginx\n'
        '    command: ["echo"]\n'
        'networks:\n'
        '  cell-network:\n'
        '    external: true\n'
    )

    @staticmethod
    def _manifest(image, **extra):
        """Return the shared ``myapp`` manifest for *image*, plus any *extra* keys."""
        manifest = {
            'id': 'myapp',
            'name': 'My App',
            'version': '1.0.0',
            'author': 'Test',
            'image': image,
            'container_name': 'cell-myapp',
        }
        manifest.update(extra)
        return manifest

    def _make_ssm(self, manifest):
        """Build a ServiceStoreManager on mocks whose fetches return *manifest*.

        ``_fetch_manifest`` and ``_fetch_template`` are replaced on the
        instance, so no network access happens; the mocked composer's
        ``install()`` reports ``{'ok': True}``.
        """
        from service_store_manager import ServiceStoreManager
        cm = MagicMock()
        cm.get_installed_services.return_value = {}
        composer = MagicMock()
        composer._resolve_requires.return_value = None
        composer.install.return_value = {'ok': True}
        ssm = ServiceStoreManager(
            config_manager=cm,
            caddy_manager=MagicMock(),
            container_manager=MagicMock(),
            service_composer=composer,
        )
        ssm._fetch_manifest = MagicMock(return_value=manifest)
        ssm._fetch_template = MagicMock(return_value=self._COMPOSE_TEMPLATE)
        return ssm

    def test_install_succeeds_when_image_tag_only_first_party(self):
        # First-party images without a digest pin are allowed (warning, not error).
        manifest = self._manifest('git.pic.ngo/roof/myapp:latest')
        result = self._make_ssm(manifest).install('myapp')
        # The result dict doubles as the failure message for easier diagnosis.
        self.assertTrue(result['ok'], result)

    def test_install_returns_error_when_kind_is_builtin(self):
        digest = 'c' * 64
        manifest = self._manifest(
            f'git.pic.ngo/roof/myapp@sha256:{digest}',
            kind='builtin',
        )
        result = self._make_ssm(manifest).install('myapp')
        self.assertFalse(result['ok'], result)

    def test_install_returns_error_when_cap_add_sys_admin(self):
        digest = 'd' * 64
        manifest = self._manifest(
            f'git.pic.ngo/roof/myapp@sha256:{digest}',
            cap_add=['SYS_ADMIN'],
        )
        result = self._make_ssm(manifest).install('myapp')
        self.assertFalse(result['ok'], result)

    def test_install_succeeds_with_valid_manifest(self):
        digest = 'e' * 64
        manifest = self._manifest(f'git.pic.ngo/roof/myapp@sha256:{digest}')
        result = self._make_ssm(manifest).install('myapp')
        self.assertTrue(result['ok'], result)
# ---------------------------------------------------------------------------
# TestWriteComposeValidation
# ---------------------------------------------------------------------------
class TestWriteComposeValidation(unittest.TestCase):
    """Tests that ServiceComposer.write_compose() rejects each specific violation."""

    def _make_cm(self):
        """Return a mocked config manager with a fixed identity and domain."""
        cm = MagicMock()
        cm.get_identity.return_value = {
            'cell_name': 'testcell',
            'domain': 'cell.local',
        }
        cm.get_effective_domain.return_value = 'cell.local'
        cm.configs = {}
        return cm

    def _make_template(self, **service_extras):
        """Build a compose YAML string with optional extra service fields.

        Each keyword becomes a key of the ``app`` service: booleans render as
        ``true``/``false``, lists as a YAML sequence, anything else via
        ``str()`` formatting.
        """
        # Extra keys are indented to the same depth as ``image`` so they land
        # inside the ``app`` mapping; list items go one level deeper.
        extra_lines = ''
        for key, val in service_extras.items():
            if isinstance(val, bool):
                extra_lines += f'    {key}: {"true" if val else "false"}\n'
            elif isinstance(val, list):
                extra_lines += f'    {key}:\n'
                for item in val:
                    extra_lines += f'      - {item}\n'
            else:
                extra_lines += f'    {key}: {val}\n'
        return (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            + extra_lines
            + 'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )

    def _rejection_error(self, template):
        """Call write_compose() with *template*, require ValueError, and return it."""
        from service_composer import ServiceComposer
        with tempfile.TemporaryDirectory() as tmpdir:
            composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
            manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
            with self.assertRaises(ValueError) as ctx:
                composer.write_compose('testsvc', manifest, template)
        return ctx.exception

    def test_write_compose_raises_on_network_mode_host(self):
        err = self._rejection_error(self._make_template(network_mode='host'))
        self.assertIn('security validation', str(err))

    def test_write_compose_raises_on_pid_host(self):
        err = self._rejection_error(self._make_template(pid='host'))
        self.assertIn('security validation', str(err))

    def test_write_compose_raises_on_ipc_host(self):
        err = self._rejection_error(self._make_template(ipc='host'))
        self.assertIn('security validation', str(err))

    def test_write_compose_raises_on_userns_mode_host(self):
        err = self._rejection_error(self._make_template(userns_mode='host'))
        self.assertIn('security validation', str(err))

    def test_write_compose_raises_on_reserved_container_name(self):
        err = self._rejection_error(self._make_template(container_name='cell-api'))
        self.assertIn('security validation', str(err))

    def test_write_compose_raises_on_cap_add_all(self):
        err = self._rejection_error(self._make_template(cap_add=['ALL']))
        self.assertIn('security validation', str(err))

    def test_write_compose_raises_when_no_external_network(self):
        # Only the exception type is checked here, not its message.
        template = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: ["echo"]\n'
            'networks:\n'
            '  internal:\n'
            '    driver: bridge\n'
        )
        self._rejection_error(template)

    def test_write_compose_raises_on_string_command(self):
        template = (
            'version: "3.8"\n'
            'services:\n'
            '  app:\n'
            '    image: nginx\n'
            '    command: "echo hello && rm -rf /"\n'
            'networks:\n'
            '  cell-network:\n'
            '    external: true\n'
        )
        err = self._rejection_error(template)
        self.assertIn('security validation', str(err))
# Run the whole suite through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()