""" Tests for api/manifest_validator.py — Phase 0 security foundations. Covers validate_manifest(), validate_rendered_compose(), and validate_provision_hook(), plus integration tests for the ServiceComposer.write_compose() guard and ServiceStoreManager._validate_manifest() security delegation. No Docker, network, or filesystem calls are made. """ import json import os import sys import tempfile import unittest from unittest.mock import MagicMock, patch sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api')) from manifest_validator import ( validate_manifest, validate_rendered_compose, validate_provision_hook, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _minimal_manifest(**overrides): """Minimal valid manifest for validate_manifest() (no image/id needed here).""" m = { 'container_name': 'cell-myapp', 'subdomain': 'myapp', 'backend': 'cell-myapp:8080', } m.update(overrides) return m def _valid_compose(extra_services=None, networks=None): """Return a compose YAML string that passes all checks.""" services = extra_services or '' net_block = networks or ( 'networks:\n' ' cell-network:\n' ' external: true\n' ) return ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["nginx", "-g", "daemon off;"]\n' + services + net_block ) # --------------------------------------------------------------------------- # TestValidateManifest # --------------------------------------------------------------------------- class TestValidateManifest(unittest.TestCase): # ── valid minimal manifest ──────────────────────────────────────────── def test_valid_minimal_manifest_passes(self): ok, errs = validate_manifest(_minimal_manifest()) self.assertTrue(ok) self.assertEqual(errs, []) def test_returns_true_empty_list_on_success(self): result = validate_manifest(_minimal_manifest()) self.assertIsInstance(result, tuple) self.assertEqual(len(result), 2) ok, errs = result self.assertIs(ok, True) self.assertIsInstance(errs, list) def test_returns_false_list_of_strings_on_failure(self): ok, errs = validate_manifest({'kind': 'builtin'}) self.assertIs(ok, False) self.assertIsInstance(errs, list) self.assertTrue(len(errs) > 0) for e in errs: self.assertIsInstance(e, str) # ── schema_version ─────────────────────────────────────────────────── def test_schema_version_3_passes(self): ok, errs = validate_manifest(_minimal_manifest(schema_version=3)) self.assertTrue(ok) def test_schema_version_2_rejected(self): ok, errs = validate_manifest(_minimal_manifest(schema_version=2)) self.assertFalse(ok) self.assertTrue(any('schema_version' in e for e in errs)) def test_schema_version_1_rejected(self): ok, errs = validate_manifest(_minimal_manifest(schema_version=1)) self.assertFalse(ok) def test_schema_version_string_rejected(self): ok, errs = validate_manifest(_minimal_manifest(schema_version='3')) self.assertFalse(ok) self.assertTrue(any('schema_version' in e for e in errs)) def test_schema_version_absent_passes(self): m = _minimal_manifest() m.pop('schema_version', None) ok, errs = validate_manifest(m) self.assertTrue(ok) # ── id ─────────────────────────────────────────────────────────────── def test_id_valid_lowercase_passes(self): ok, errs = validate_manifest(_minimal_manifest(id='myapp')) self.assertTrue(ok) def test_id_with_hyphen_and_underscore_passes(self): ok, errs = validate_manifest(_minimal_manifest(id='my-app_v2')) self.assertTrue(ok) def test_id_starts_with_digit_rejected(self): ok, errs = validate_manifest(_minimal_manifest(id='1app')) self.assertFalse(ok) self.assertTrue(any('id' in e for e in errs)) def test_id_uppercase_rejected(self): ok, errs = validate_manifest(_minimal_manifest(id='MyApp')) self.assertFalse(ok) def test_id_with_space_rejected(self): ok, errs = validate_manifest(_minimal_manifest(id='my app')) self.assertFalse(ok) def test_id_too_long_rejected(self): ok, errs = validate_manifest(_minimal_manifest(id='a' * 32)) self.assertFalse(ok) self.assertTrue(any('id' in e for e in errs)) def test_id_31_chars_passes(self): # Pattern allows up to 31 chars total (1 + 30) ok, errs = validate_manifest(_minimal_manifest(id='a' + 'b' * 30)) self.assertTrue(ok) def test_id_absent_passes(self): m = _minimal_manifest() m.pop('id', None) ok, errs = validate_manifest(m) self.assertTrue(ok) # ── image ──────────────────────────────────────────────────────────── def test_image_with_digest_passes(self): digest = 'a' * 64 ok, errs = validate_manifest( _minimal_manifest(image=f'git.pic.ngo/roof/myapp@sha256:{digest}') ) self.assertTrue(ok) def test_image_tag_only_first_party_allowed(self): # First-party images without a digest pin are allowed (warning only). ok, errs = validate_manifest( _minimal_manifest(image='git.pic.ngo/roof/myapp:latest') ) self.assertTrue(ok) self.assertEqual(errs, []) def test_image_wrong_registry_rejected(self): digest = 'a' * 64 ok, errs = validate_manifest( _minimal_manifest(image=f'docker.io/library/nginx@sha256:{digest}') ) self.assertFalse(ok) def test_image_digest_too_short_rejected(self): ok, errs = validate_manifest( _minimal_manifest(image='git.pic.ngo/roof/myapp@sha256:abc123') ) self.assertFalse(ok) def test_image_digest_with_uppercase_hex_rejected(self): # sha256 digest must be lowercase hex digest = 'A' * 64 ok, errs = validate_manifest( _minimal_manifest(image=f'git.pic.ngo/roof/myapp@sha256:{digest}') ) self.assertFalse(ok) def test_image_no_tag_no_digest_first_party_allowed(self): # No tag and no digest — Docker defaults to :latest; treated as tag-only, allowed. ok, errs = validate_manifest( _minimal_manifest(image='git.pic.ngo/roof/myapp') ) self.assertTrue(ok) self.assertEqual(errs, []) def test_image_absent_passes(self): m = _minimal_manifest() m.pop('image', None) ok, errs = validate_manifest(m) self.assertTrue(ok) # ── kind ───────────────────────────────────────────────────────────── def test_kind_builtin_rejected(self): ok, errs = validate_manifest(_minimal_manifest(kind='builtin')) self.assertFalse(ok) self.assertTrue(any('kind' in e for e in errs)) def test_kind_store_passes(self): ok, errs = validate_manifest(_minimal_manifest(kind='store')) self.assertTrue(ok) def test_kind_absent_passes(self): m = _minimal_manifest() m.pop('kind', None) ok, errs = validate_manifest(m) self.assertTrue(ok) def test_kind_unknown_rejected(self): ok, errs = validate_manifest(_minimal_manifest(kind='custom')) self.assertFalse(ok) # ── container_name ──────────────────────────────────────────────────── def test_container_name_without_cell_prefix_rejected(self): ok, errs = validate_manifest(_minimal_manifest(container_name='myapp')) self.assertFalse(ok) self.assertTrue(any('container_name' in e for e in errs)) def test_container_name_cell_api_rejected_reserved(self): ok, errs = validate_manifest(_minimal_manifest(container_name='cell-api')) self.assertFalse(ok) self.assertTrue(any('reserved' in e for e in errs)) def test_container_name_cell_valid_name_passes(self): ok, errs = validate_manifest(_minimal_manifest(container_name='cell-valid-name')) self.assertTrue(ok) def test_container_name_with_uppercase_rejected(self): ok, errs = validate_manifest(_minimal_manifest(container_name='cell-MyApp')) self.assertFalse(ok) def test_container_name_cell_caddy_rejected(self): ok, errs = validate_manifest(_minimal_manifest(container_name='cell-caddy')) self.assertFalse(ok) def test_container_name_cell_wireguard_rejected(self): ok, errs = validate_manifest(_minimal_manifest(container_name='cell-wireguard')) self.assertFalse(ok) def test_container_name_absent_passes(self): m = _minimal_manifest() m.pop('container_name', None) ok, errs = validate_manifest(m) self.assertTrue(ok) # ── subdomain ──────────────────────────────────────────────────────── def test_subdomain_api_rejected(self): ok, errs = validate_manifest(_minimal_manifest(subdomain='api')) self.assertFalse(ok) self.assertTrue(any('reserved' in e for e in errs)) def test_subdomain_webui_rejected(self): ok, errs = validate_manifest(_minimal_manifest(subdomain='webui')) self.assertFalse(ok) def test_subdomain_myapp_passes(self): ok, errs = validate_manifest(_minimal_manifest(subdomain='myapp')) self.assertTrue(ok) def test_subdomain_absent_passes(self): m = _minimal_manifest() m.pop('subdomain', None) ok, errs = validate_manifest(m) self.assertTrue(ok) def test_subdomain_calendar_passes(self): # calendar/files/webmail/etc are service subdomains, not infrastructure; # official PIC store services need to claim them. ok, errs = validate_manifest(_minimal_manifest(subdomain='calendar')) self.assertTrue(ok, errs) def test_subdomain_files_passes(self): ok, errs = validate_manifest(_minimal_manifest(subdomain='files')) self.assertTrue(ok, errs) # ── extra_subdomains ───────────────────────────────────────────────── def test_extra_subdomains_admin_rejected(self): ok, errs = validate_manifest( _minimal_manifest(extra_subdomains=['admin']) ) self.assertFalse(ok) self.assertTrue(any('reserved' in e for e in errs)) def test_extra_subdomains_valid_entry_passes(self): ok, errs = validate_manifest( _minimal_manifest(extra_subdomains=['sub']) ) self.assertTrue(ok) def test_extra_subdomains_multiple_one_bad_rejected(self): ok, errs = validate_manifest( _minimal_manifest(extra_subdomains=['good', 'admin']) ) self.assertFalse(ok) # ── backend ────────────────────────────────────────────────────────── def test_backend_cell_api_rejected(self): ok, errs = validate_manifest(_minimal_manifest(backend='cell-api:3000')) self.assertFalse(ok) self.assertTrue(any('denylist' in e for e in errs)) def test_backend_localhost_rejected(self): ok, errs = validate_manifest(_minimal_manifest(backend='localhost:8080')) self.assertFalse(ok) def test_backend_cell_mail_valid_port_passes(self): ok, errs = validate_manifest(_minimal_manifest(backend='cell-mail:25')) self.assertTrue(ok) def test_backend_bad_format_rejected(self): ok, errs = validate_manifest(_minimal_manifest(backend='not-a-backend')) self.assertFalse(ok) self.assertTrue(any('host:port' in e for e in errs)) def test_backend_127_0_0_1_rejected(self): ok, errs = validate_manifest(_minimal_manifest(backend='127.0.0.1:8080')) self.assertFalse(ok) def test_backend_host_docker_internal_rejected(self): ok, errs = validate_manifest( _minimal_manifest(backend='host.docker.internal:8080') ) self.assertFalse(ok) def test_backend_absent_passes(self): m = _minimal_manifest() m.pop('backend', None) ok, errs = validate_manifest(m) self.assertTrue(ok) # ── extra_backends ─────────────────────────────────────────────────── def test_extra_backends_cell_coredns_rejected(self): ok, errs = validate_manifest( _minimal_manifest(extra_backends={'dns': 'cell-coredns:53'}) ) self.assertFalse(ok) def test_extra_backends_valid_passes(self): ok, errs = validate_manifest( _minimal_manifest(extra_backends={'alt': 'cell-myapp:80'}) ) self.assertTrue(ok) # ── cap_add ────────────────────────────────────────────────────────── def test_cap_add_sys_admin_rejected(self): ok, errs = validate_manifest(_minimal_manifest(cap_add=['SYS_ADMIN'])) self.assertFalse(ok) self.assertTrue(any('SYS_ADMIN' in e for e in errs)) def test_cap_add_all_rejected(self): ok, errs = validate_manifest(_minimal_manifest(cap_add=['ALL'])) self.assertFalse(ok) self.assertTrue(any('ALL' in e for e in errs)) def test_cap_add_net_admin_passes(self): ok, errs = validate_manifest(_minimal_manifest(cap_add=['NET_ADMIN'])) self.assertTrue(ok) def test_cap_add_net_admin_and_sys_admin_rejected(self): ok, errs = validate_manifest( _minimal_manifest(cap_add=['NET_ADMIN', 'SYS_ADMIN']) ) self.assertFalse(ok) self.assertTrue(any('SYS_ADMIN' in e for e in errs)) def test_cap_add_unknown_cap_rejected(self): ok, errs = validate_manifest(_minimal_manifest(cap_add=['UNKNOWN_CAP'])) self.assertFalse(ok) self.assertTrue(any('allowlist' in e for e in errs)) def test_cap_add_sys_module_rejected(self): ok, errs = validate_manifest(_minimal_manifest(cap_add=['SYS_MODULE'])) self.assertFalse(ok) def test_cap_add_not_a_list_rejected(self): ok, errs = validate_manifest(_minimal_manifest(cap_add='NET_ADMIN')) self.assertFalse(ok) # ── provision_hook (via accounts key) ──────────────────────────────── def test_provision_hook_string_rejected(self): m = _minimal_manifest(accounts={'provision_hook': 'setup email add {username}'}) ok, errs = validate_manifest(m) self.assertFalse(ok) self.assertTrue(any('shell string' in e for e in errs)) def test_provision_hook_argv_dict_passes(self): m = _minimal_manifest( accounts={'provision_hook': {'argv': ['setup', 'email', 'add']}} ) ok, errs = validate_manifest(m) self.assertTrue(ok) def test_provision_hook_absent_passes(self): m = _minimal_manifest() # no 'accounts' key at all ok, errs = validate_manifest(m) self.assertTrue(ok) def test_provision_hook_none_in_accounts_passes(self): m = _minimal_manifest(accounts={'provision_hook': None}) ok, errs = validate_manifest(m) self.assertTrue(ok) # ── env values ─────────────────────────────────────────────────────── def test_env_value_with_dollar_rejected(self): m = _minimal_manifest(env=[{'key': 'X', 'value': '$SECRET'}]) ok, errs = validate_manifest(m) self.assertFalse(ok) def test_env_value_with_curly_brace_rejected(self): m = _minimal_manifest(env=[{'key': 'X', 'value': '{bad}'}]) ok, errs = validate_manifest(m) self.assertFalse(ok) def test_env_value_example_com_passes(self): m = _minimal_manifest(env=[{'key': 'DOMAIN', 'value': 'example.com'}]) ok, errs = validate_manifest(m) self.assertTrue(ok) def test_env_value_complex_safe_string_passes(self): m = _minimal_manifest( env=[{'key': 'X', 'value': 'user@host:port/path+extra-chars'}] ) ok, errs = validate_manifest(m) self.assertTrue(ok) def test_env_empty_passes(self): m = _minimal_manifest(env=[]) ok, errs = validate_manifest(m) self.assertTrue(ok) # --------------------------------------------------------------------------- # TestValidateRenderedCompose # --------------------------------------------------------------------------- class TestValidateRenderedCompose(unittest.TestCase): # ── valid compose ───────────────────────────────────────────────────── def test_valid_compose_with_external_network_passes(self): ok, errs = validate_rendered_compose(_valid_compose()) self.assertTrue(ok) self.assertEqual(errs, []) def test_invalid_yaml_returns_error(self): ok, errs = validate_rendered_compose('}{invalid yaml{') self.assertFalse(ok) self.assertTrue(any('YAML' in e for e in errs)) def test_non_mapping_yaml_rejected(self): ok, errs = validate_rendered_compose('- item1\n- item2\n') self.assertFalse(ok) # ── privileged ─────────────────────────────────────────────────────── def test_privileged_true_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' privileged: true\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('privileged' in e for e in errs)) def test_privileged_false_passes(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' privileged: false\n' ' command: ["echo"]\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertTrue(ok) # ── network_mode ───────────────────────────────────────────────────── def test_network_mode_host_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' network_mode: host\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('network_mode' in e for e in errs)) def test_network_mode_none_value_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' network_mode: "none"\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) def test_no_network_mode_key_passes(self): ok, errs = validate_rendered_compose(_valid_compose()) self.assertTrue(ok) # ── volumes ────────────────────────────────────────────────────────── def test_absolute_volume_etc_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' volumes:\n' ' - /etc:/etc\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('bind mount' in e for e in errs)) def test_absolute_volume_docker_sock_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' volumes:\n' ' - /var/run/docker.sock:/var/run/docker.sock\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) def test_relative_volume_passes(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' ' volumes:\n' ' - ./data:/data\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertTrue(ok) def test_named_volume_passes(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' ' volumes:\n' ' - myvolume:/data\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertTrue(ok) def test_absolute_volume_under_allowed_data_dir_passes(self): # After ${PIC_DATA_DIR} substitution, compose templates produce absolute # paths like /data/services/email/mail:/var/mail. These must be allowed # when allowed_data_dir is set to the same prefix. yaml_text = ( 'services:\n' ' mail:\n' ' image: svc-email:latest\n' ' command: ["postfix"]\n' ' volumes:\n' ' - /data/services/email/mail:/var/mail\n' ' - /data/services/email/config:/tmp/config\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text, allowed_data_dir='/data') self.assertTrue(ok, errs) def test_absolute_volume_outside_allowed_data_dir_still_rejected(self): yaml_text = ( 'services:\n' ' app:\n' ' image: nginx\n' ' volumes:\n' ' - /etc/passwd:/etc/passwd\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text, allowed_data_dir='/data') self.assertFalse(ok) self.assertTrue(any('bind mount' in e for e in errs)) def test_absolute_volume_rejected_when_no_allowed_data_dir(self): yaml_text = ( 'services:\n' ' app:\n' ' image: nginx\n' ' volumes:\n' ' - /data/services/email/mail:/var/mail\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) # Without allowed_data_dir, even /data paths are rejected ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('bind mount' in e for e in errs)) # ── cap_add ────────────────────────────────────────────────────────── def test_compose_cap_add_sys_admin_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' cap_add:\n' ' - SYS_ADMIN\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('SYS_ADMIN' in e for e in errs)) def test_compose_cap_add_net_admin_passes(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' ' cap_add:\n' ' - NET_ADMIN\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertTrue(ok) # ── external network requirement ───────────────────────────────────── def test_no_external_network_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' 'networks:\n' ' mynet:\n' ' driver: bridge\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('external' in e for e in errs)) def test_no_networks_key_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) # ── devices ────────────────────────────────────────────────────────── def test_devices_key_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' devices:\n' ' - /dev/snd:/dev/snd\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('devices' in e for e in errs)) # ── security_opt ───────────────────────────────────────────────────── def test_security_opt_apparmor_unconfined_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' security_opt:\n' ' - apparmor=unconfined\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('apparmor=unconfined' in e for e in errs)) def test_security_opt_no_new_privileges_passes(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' ' security_opt:\n' ' - no-new-privileges:true\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertTrue(ok) def test_security_opt_seccomp_unconfined_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' security_opt:\n' ' - seccomp=unconfined\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) # ── command / entrypoint ───────────────────────────────────────────── def test_command_as_string_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: "echo hello"\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('command' in e for e in errs)) def test_command_as_list_passes(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo", "hello"]\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertTrue(ok) def test_entrypoint_as_string_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' entrypoint: "/bin/sh -c evil"\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('entrypoint' in e for e in errs)) # ── pid / ipc / userns ─────────────────────────────────────────────── def test_pid_host_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' pid: host\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('pid' in e for e in errs)) def test_ipc_host_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' ipc: host\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) # ── container_name reserved in compose ────────────────────────────── def test_compose_container_name_cell_api_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' ' container_name: cell-api\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('reserved' in e for e in errs)) def test_compose_container_name_cell_caddy_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' ' container_name: cell-caddy\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) def test_compose_container_name_cell_wireguard_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' ' container_name: cell-wireguard\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) def test_compose_container_name_cell_coredns_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' ' container_name: cell-coredns\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) def test_compose_container_name_non_reserved_passes(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' ' container_name: cell-myapp\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertTrue(ok) def test_compose_no_container_name_passes(self): ok, errs = validate_rendered_compose(_valid_compose()) self.assertTrue(ok) # ── multi-service ───────────────────────────────────────────────────── def test_multiple_services_one_invalid_rejected(self): yaml_text = ( 'version: "3.8"\n' 'services:\n' ' good:\n' ' image: nginx\n' ' command: ["echo"]\n' ' bad:\n' ' image: nginx\n' ' privileged: true\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('bad' in e for e in errs)) # --------------------------------------------------------------------------- # TestValidateRenderedComposeHostNetwork # --------------------------------------------------------------------------- class TestValidateRenderedComposeHostNetwork(unittest.TestCase): """Tests for allow_host_network=True — connectivity services.""" _HOST_NET_COMPOSE = ( 'services:\n' ' wireguard-ext:\n' ' image: git.pic.ngo/roof/svc-wireguard-ext:latest\n' ' container_name: cell-wg-ext\n' ' restart: unless-stopped\n' ' network_mode: host\n' ' cap_add:\n' ' - NET_ADMIN\n' ' volumes:\n' ' - /app/data/services/wireguard-ext/config:/etc/wireguard\n' ) def test_host_network_compose_passes_with_flag(self): ok, errs = validate_rendered_compose( self._HOST_NET_COMPOSE, allowed_data_dir='/app/data', allow_host_network=True, ) self.assertTrue(ok, errs) def test_host_network_compose_fails_without_flag(self): ok, errs = validate_rendered_compose( self._HOST_NET_COMPOSE, allowed_data_dir='/app/data', allow_host_network=False, ) self.assertFalse(ok) def test_network_mode_host_rejected_without_flag(self): yaml_text = ( 'services:\n' ' svc:\n' ' image: git.pic.ngo/roof/foo:latest\n' ' network_mode: host\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('network_mode' in e for e in errs)) def test_devices_allowed_with_flag(self): yaml_text = ( 'services:\n' ' openvpn-client:\n' ' image: git.pic.ngo/roof/svc-openvpn-client:latest\n' ' container_name: cell-openvpn\n' ' network_mode: host\n' ' cap_add:\n' ' - NET_ADMIN\n' ' devices:\n' ' - /dev/net/tun\n' ' volumes:\n' ' - /app/data/services/openvpn-client/config:/etc/openvpn\n' ) ok, errs = validate_rendered_compose( yaml_text, allowed_data_dir='/app/data', allow_host_network=True, ) self.assertTrue(ok, errs) def test_devices_rejected_without_flag(self): yaml_text = ( 'services:\n' ' svc:\n' ' image: git.pic.ngo/roof/foo:latest\n' ' devices:\n' ' - /dev/net/tun\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) ok, errs = validate_rendered_compose(yaml_text) self.assertFalse(ok) self.assertTrue(any('devices' in e for e in errs)) def test_no_external_network_ok_with_flag(self): yaml_text = ( 'services:\n' ' tor:\n' ' image: git.pic.ngo/roof/svc-tor:latest\n' ' network_mode: host\n' ) ok, errs = validate_rendered_compose(yaml_text, allow_host_network=True) self.assertTrue(ok, errs) def test_privileged_still_rejected_with_flag(self): yaml_text = ( 'services:\n' ' svc:\n' ' image: git.pic.ngo/roof/foo:latest\n' ' network_mode: host\n' ' privileged: true\n' ) ok, errs = validate_rendered_compose(yaml_text, allow_host_network=True) self.assertFalse(ok) self.assertTrue(any('privileged' in e for e in errs)) def test_non_host_network_mode_rejected_with_flag(self): """When allow_host_network=True, only 'host' is accepted as network_mode.""" yaml_text = ( 'services:\n' ' svc:\n' ' image: git.pic.ngo/roof/foo:latest\n' ' network_mode: none\n' ) ok, errs = validate_rendered_compose(yaml_text, allow_host_network=True) self.assertFalse(ok) self.assertTrue(any('network_mode' in e for e in errs)) # --------------------------------------------------------------------------- # TestValidateProvisionHook # --------------------------------------------------------------------------- class TestValidateProvisionHook(unittest.TestCase): def test_none_passes(self): ok, msg = validate_provision_hook(None) self.assertTrue(ok) self.assertEqual(msg, '') def test_string_rejected(self): ok, msg = validate_provision_hook('setup email add {username}') self.assertFalse(ok) self.assertIn('shell string', msg) def test_argv_dict_passes(self): ok, msg = validate_provision_hook({'argv': ['setup', 'email', 'add']}) self.assertTrue(ok) self.assertEqual(msg, '') def test_empty_argv_rejected(self): ok, msg = validate_provision_hook({'argv': []}) self.assertFalse(ok) self.assertIn('non-empty', msg) def test_single_element_argv_passes(self): ok, msg = validate_provision_hook({'argv': ['setup']}) self.assertTrue(ok) def test_uppercase_binary_rejected(self): ok, msg = validate_provision_hook({'argv': ['UPPERCASE']}) self.assertFalse(ok) self.assertIn('argv[0]', msg) def test_nul_byte_in_element_rejected(self): ok, msg = validate_provision_hook({'argv': ['setup\x00evil']}) self.assertFalse(ok) self.assertIn('NUL', msg) def test_nul_byte_in_arg_not_binary_rejected(self): ok, msg = validate_provision_hook({'argv': ['setup', 'arg\x00evil']}) self.assertFalse(ok) def test_integer_rejected(self): ok, msg = validate_provision_hook(42) self.assertFalse(ok) self.assertIn('dict', msg) def test_list_rejected(self): ok, msg = validate_provision_hook(['setup', 'add']) self.assertFalse(ok) def test_dict_without_argv_rejected(self): ok, msg = validate_provision_hook({'cmd': ['setup']}) self.assertFalse(ok) self.assertIn('argv', msg) def test_binary_with_digits_passes(self): ok, msg = validate_provision_hook({'argv': ['setup2']}) self.assertTrue(ok) def test_binary_with_hyphens_and_underscores_passes(self): ok, msg = validate_provision_hook({'argv': ['my-setup_tool']}) self.assertTrue(ok) def test_binary_starting_with_digit_rejected(self): ok, msg = validate_provision_hook({'argv': ['2setup']}) self.assertFalse(ok) # --------------------------------------------------------------------------- # TestServiceComposerIntegration # --------------------------------------------------------------------------- class TestServiceComposerIntegration(unittest.TestCase): """Integration tests for ServiceComposer.write_compose() security gate.""" def _make_cm(self): cm = MagicMock() cm.get_identity.return_value = { 'cell_name': 'testcell', 'domain': 'cell.local', } cm.get_effective_domain.return_value = 'cell.local' cm.configs = {} return cm def test_write_compose_raises_on_privileged_true(self): from service_composer import ServiceComposer privileged_template = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' privileged: true\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} with self.assertRaises(ValueError) as ctx: composer.write_compose('testsvc', manifest, privileged_template) self.assertIn('security validation', str(ctx.exception)) def test_write_compose_no_temp_file_on_validation_failure(self): """Validation failure must not leave a .tmp file on disk.""" from service_composer import ServiceComposer privileged_template = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' privileged: true\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} svc_dir = os.path.join(tmpdir, 'services', 'testsvc') os.makedirs(svc_dir, exist_ok=True) try: composer.write_compose('testsvc', manifest, privileged_template) except ValueError: pass tmp_path = os.path.join(svc_dir, 'docker-compose.yml.tmp') final_path = os.path.join(svc_dir, 'docker-compose.yml') self.assertFalse( os.path.exists(tmp_path), 'Temp file should not exist after validation failure', ) self.assertFalse( os.path.exists(final_path), 'Final compose file should not exist after validation failure', ) def test_render_template_replaces_pic_data_dir(self): from service_composer import ServiceComposer from pathlib import Path with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} template = 'DATA=${PIC_DATA_DIR}' result = composer.render_template('testsvc', manifest, template) expected = str(Path(tmpdir).resolve()) self.assertIn(expected, result) self.assertNotIn('${PIC_DATA_DIR}', result) def test_write_compose_succeeds_on_valid_template(self): from service_composer import ServiceComposer valid_template = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo", "ok"]\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} content = composer.write_compose('testsvc', manifest, valid_template) self.assertIn('nginx', content) expected_path = os.path.join( tmpdir, 'services', 'testsvc', 'docker-compose.yml' ) self.assertTrue(os.path.exists(expected_path)) # --------------------------------------------------------------------------- # TestServiceStoreManagerSecurityIntegration # --------------------------------------------------------------------------- class TestServiceStoreManagerSecurityIntegration(unittest.TestCase): """Integration tests for ServiceStoreManager._validate_manifest() security additions.""" def _make_manager(self): from service_store_manager import ServiceStoreManager cm = MagicMock() cm.get_installed_services.return_value = {} cm.get_identity.return_value = { 'ip_range': '172.20.0.0/16', 'service_ips': {}, } return ServiceStoreManager( config_manager=cm, caddy_manager=MagicMock(), container_manager=MagicMock(), ) def _valid_manifest(self, **overrides): m = { 'id': 'myapp', 'name': 'My App', 'version': '1.0.0', 'author': 'Test Author', 'image': 'git.pic.ngo/roof/myapp@sha256:' + 'a' * 64, 'container_name': 'cell-myapp', } m.update(overrides) return m def test_validate_manifest_rejects_cap_add_sys_admin(self): from service_store_manager import ServiceStoreManager m = self._valid_manifest(cap_add=['SYS_ADMIN']) ok, errs = ServiceStoreManager._validate_manifest(m) self.assertFalse(ok) self.assertTrue(any('SYS_ADMIN' in e for e in errs)) def test_validate_manifest_accepts_cap_add_net_admin(self): from service_store_manager import ServiceStoreManager m = self._valid_manifest(cap_add=['NET_ADMIN']) ok, errs = ServiceStoreManager._validate_manifest(m) self.assertTrue(ok) def test_validate_manifest_rejects_provision_hook_string(self): from service_store_manager import ServiceStoreManager m = self._valid_manifest( accounts={'provision_hook': 'setup user add'} ) ok, errs = ServiceStoreManager._validate_manifest(m) self.assertFalse(ok) self.assertTrue(any('shell string' in e for e in errs)) def test_validate_manifest_rejects_kind_builtin(self): from service_store_manager import ServiceStoreManager m = self._valid_manifest(kind='builtin') ok, errs = ServiceStoreManager._validate_manifest(m) self.assertFalse(ok) def test_validate_manifest_rejects_reserved_container_name(self): from service_store_manager import ServiceStoreManager m = self._valid_manifest(container_name='cell-api') ok, errs = ServiceStoreManager._validate_manifest(m) self.assertFalse(ok) def test_fetch_manifest_raises_on_oversized_response(self): from service_store_manager import ServiceStoreManager mgr = self._make_manager() oversized_content = b'x' * (256 * 1024 + 1) raw_mock = MagicMock() raw_mock.read.return_value = oversized_content mock_resp = MagicMock() mock_resp.raise_for_status = MagicMock() mock_resp.raw = raw_mock with patch('service_store_manager.requests.get', return_value=mock_resp): with self.assertRaises(ValueError) as ctx: mgr._fetch_manifest('bigservice') self.assertIn('256 KB', str(ctx.exception)) def test_fetch_manifest_succeeds_with_small_response(self): from service_store_manager import ServiceStoreManager mgr = self._make_manager() payload = {'id': 'smallsvc', 'name': 'Small'} small_content = json.dumps(payload).encode() raw_mock = MagicMock() raw_mock.read.return_value = small_content mock_resp = MagicMock() mock_resp.raise_for_status = MagicMock() mock_resp.raw = raw_mock with patch('service_store_manager.requests.get', return_value=mock_resp): result = mgr._fetch_manifest('smallsvc') self.assertEqual(result['id'], 'smallsvc') def test_fetch_manifest_requests_stream_true(self): """_fetch_manifest must use stream=True so raw.read() is available.""" from service_store_manager import ServiceStoreManager mgr = self._make_manager() payload = json.dumps({'id': 'svc'}).encode() raw_mock = MagicMock() raw_mock.read.return_value = payload mock_resp = MagicMock() mock_resp.raise_for_status = MagicMock() mock_resp.raw = raw_mock with patch('service_store_manager.requests.get', return_value=mock_resp) as mock_get: mgr._fetch_manifest('svc') _, kwargs = mock_get.call_args self.assertTrue(kwargs.get('stream'), 'requests.get must be called with stream=True') def test_validate_manifest_rejects_backend_denylist(self): from service_store_manager import ServiceStoreManager m = self._valid_manifest(backend='localhost:8080') ok, errs = ServiceStoreManager._validate_manifest(m) self.assertFalse(ok) def test_validate_manifest_accepts_valid_backend(self): from service_store_manager import ServiceStoreManager m = self._valid_manifest(backend='cell-myapp:8080') ok, errs = ServiceStoreManager._validate_manifest(m) self.assertTrue(ok) def test_validate_manifest_allows_image_tag_only_first_party(self): """First-party images without a digest pin are allowed (warning only).""" from service_store_manager import ServiceStoreManager m = self._valid_manifest(image='git.pic.ngo/roof/myapp:latest') ok, errs = ServiceStoreManager._validate_manifest(m) self.assertTrue(ok) def test_validate_manifest_accepts_image_with_digest(self): from service_store_manager import ServiceStoreManager digest = 'b' * 64 m = self._valid_manifest(image=f'git.pic.ngo/roof/myapp@sha256:{digest}') ok, errs = ServiceStoreManager._validate_manifest(m) self.assertTrue(ok) # --------------------------------------------------------------------------- # TestInstallManifestValidation # --------------------------------------------------------------------------- class TestInstallManifestValidation(unittest.TestCase): """Tests that ServiceStoreManager.install() propagates manifest validation errors.""" def _make_ssm(self, manifest): from service_store_manager import ServiceStoreManager cm = MagicMock() cm.get_installed_services.return_value = {} composer = MagicMock() composer._resolve_requires.return_value = None composer.install.return_value = {'ok': True} ssm = ServiceStoreManager( config_manager=cm, caddy_manager=MagicMock(), container_manager=MagicMock(), service_composer=composer, ) ssm._fetch_manifest = MagicMock(return_value=manifest) ssm._fetch_template = MagicMock(return_value=( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' 'networks:\n' ' cell-network:\n' ' external: true\n' )) return ssm def test_install_succeeds_when_image_tag_only_first_party(self): # First-party images without a digest pin are allowed (warning, not error). manifest = { 'id': 'myapp', 'name': 'My App', 'version': '1.0.0', 'author': 'Test', 'image': 'git.pic.ngo/roof/myapp:latest', 'container_name': 'cell-myapp', } ssm = self._make_ssm(manifest) result = ssm.install('myapp') self.assertTrue(result['ok']) def test_install_returns_error_when_kind_is_builtin(self): digest = 'c' * 64 manifest = { 'id': 'myapp', 'name': 'My App', 'version': '1.0.0', 'author': 'Test', 'image': f'git.pic.ngo/roof/myapp@sha256:{digest}', 'container_name': 'cell-myapp', 'kind': 'builtin', } ssm = self._make_ssm(manifest) result = ssm.install('myapp') self.assertFalse(result['ok']) def test_install_returns_error_when_cap_add_sys_admin(self): digest = 'd' * 64 manifest = { 'id': 'myapp', 'name': 'My App', 'version': '1.0.0', 'author': 'Test', 'image': f'git.pic.ngo/roof/myapp@sha256:{digest}', 'container_name': 'cell-myapp', 'cap_add': ['SYS_ADMIN'], } ssm = self._make_ssm(manifest) result = ssm.install('myapp') self.assertFalse(result['ok']) def test_install_succeeds_with_valid_manifest(self): digest = 'e' * 64 manifest = { 'id': 'myapp', 'name': 'My App', 'version': '1.0.0', 'author': 'Test', 'image': f'git.pic.ngo/roof/myapp@sha256:{digest}', 'container_name': 'cell-myapp', } ssm = self._make_ssm(manifest) result = ssm.install('myapp') self.assertTrue(result['ok']) # --------------------------------------------------------------------------- # TestWriteComposeValidation # --------------------------------------------------------------------------- class TestWriteComposeValidation(unittest.TestCase): """Tests that ServiceComposer.write_compose() rejects each specific violation.""" def _make_cm(self): cm = MagicMock() cm.get_identity.return_value = { 'cell_name': 'testcell', 'domain': 'cell.local', } cm.get_effective_domain.return_value = 'cell.local' cm.configs = {} return cm def _make_template(self, **service_extras): """Build a compose YAML string with optional extra service fields.""" extra_lines = '' for key, val in service_extras.items(): if isinstance(val, bool): extra_lines += f' {key}: {"true" if val else "false"}\n' elif isinstance(val, list): extra_lines += f' {key}:\n' for item in val: extra_lines += f' - {item}\n' else: extra_lines += f' {key}: {val}\n' return ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' + extra_lines + 'networks:\n' ' cell-network:\n' ' external: true\n' ) def test_write_compose_raises_on_network_mode_host(self): from service_composer import ServiceComposer with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} template = self._make_template(network_mode='host') with self.assertRaises(ValueError) as ctx: composer.write_compose('testsvc', manifest, template) self.assertIn('security validation', str(ctx.exception)) def test_write_compose_raises_on_pid_host(self): from service_composer import ServiceComposer with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} template = self._make_template(pid='host') with self.assertRaises(ValueError) as ctx: composer.write_compose('testsvc', manifest, template) self.assertIn('security validation', str(ctx.exception)) def test_write_compose_raises_on_ipc_host(self): from service_composer import ServiceComposer with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} template = self._make_template(ipc='host') with self.assertRaises(ValueError) as ctx: composer.write_compose('testsvc', manifest, template) self.assertIn('security validation', str(ctx.exception)) def test_write_compose_raises_on_userns_mode_host(self): from service_composer import ServiceComposer with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} template = self._make_template(userns_mode='host') with self.assertRaises(ValueError) as ctx: composer.write_compose('testsvc', manifest, template) self.assertIn('security validation', str(ctx.exception)) def test_write_compose_raises_on_reserved_container_name(self): from service_composer import ServiceComposer with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} template = self._make_template(container_name='cell-api') with self.assertRaises(ValueError) as ctx: composer.write_compose('testsvc', manifest, template) self.assertIn('security validation', str(ctx.exception)) def test_write_compose_raises_on_cap_add_all(self): from service_composer import ServiceComposer with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} template = self._make_template(cap_add=['ALL']) with self.assertRaises(ValueError) as ctx: composer.write_compose('testsvc', manifest, template) self.assertIn('security validation', str(ctx.exception)) def test_write_compose_raises_when_no_external_network(self): from service_composer import ServiceComposer with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} template = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: ["echo"]\n' 'networks:\n' ' internal:\n' ' driver: bridge\n' ) with self.assertRaises(ValueError): composer.write_compose('testsvc', manifest, template) def test_write_compose_raises_on_string_command(self): from service_composer import ServiceComposer with tempfile.TemporaryDirectory() as tmpdir: composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir) manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}} template = ( 'version: "3.8"\n' 'services:\n' ' app:\n' ' image: nginx\n' ' command: "echo hello && rm -rf /"\n' 'networks:\n' ' cell-network:\n' ' external: true\n' ) with self.assertRaises(ValueError) as ctx: composer.write_compose('testsvc', manifest, template) self.assertIn('security validation', str(ctx.exception)) if __name__ == '__main__': unittest.main()