7d5c5421f1
Unit Tests / test (push) Successful in 11m31s
- ConnectivityManager: move config dirs to data_dir/services/<id>/config so Docker can bind-mount them into store-service containers (Docker resolves bind-mount paths on the host, not inside the API container). Add _migrate_legacy_configs to copy existing files from the old config_dir location on first boot. - manifest_validator: add allow_host_network parameter to validate_rendered_compose. When True, waives the external-network requirement, permits network_mode: host, and allows devices: — all needed by VPN/Tor containers that must share the host network namespace to create tun/wg interfaces. Non-host services are unaffected. - service_composer: read requires_host_network from the manifest and pass allow_host_network=True to validate_rendered_compose for connectivity services. - Tests: update file-path assertions to new data_dir layout; add TestMigrateLegacyConfigs, TestValidateRenderedComposeHostNetwork, and two TestWriteCompose cases for the host-network path. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1635 lines
61 KiB
Python
1635 lines
61 KiB
Python
"""
|
|
Tests for api/manifest_validator.py — Phase 0 security foundations.
|
|
|
|
Covers validate_manifest(), validate_rendered_compose(), and
|
|
validate_provision_hook(), plus integration tests for the
|
|
ServiceComposer.write_compose() guard and ServiceStoreManager._validate_manifest()
|
|
security delegation.
|
|
|
|
No Docker, network, or filesystem calls are made.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
|
|
|
|
from manifest_validator import (
|
|
validate_manifest,
|
|
validate_rendered_compose,
|
|
validate_provision_hook,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _minimal_manifest(**overrides):
|
|
"""Minimal valid manifest for validate_manifest() (no image/id needed here)."""
|
|
m = {
|
|
'container_name': 'cell-myapp',
|
|
'subdomain': 'myapp',
|
|
'backend': 'cell-myapp:8080',
|
|
}
|
|
m.update(overrides)
|
|
return m
|
|
|
|
|
|
def _valid_compose(extra_services=None, networks=None):
|
|
"""Return a compose YAML string that passes all checks."""
|
|
services = extra_services or ''
|
|
net_block = networks or (
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
return (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["nginx", "-g", "daemon off;"]\n'
|
|
+ services
|
|
+ net_block
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestValidateManifest
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestValidateManifest(unittest.TestCase):
|
|
|
|
# ── valid minimal manifest ────────────────────────────────────────────
|
|
|
|
def test_valid_minimal_manifest_passes(self):
|
|
ok, errs = validate_manifest(_minimal_manifest())
|
|
self.assertTrue(ok)
|
|
self.assertEqual(errs, [])
|
|
|
|
def test_returns_true_empty_list_on_success(self):
|
|
result = validate_manifest(_minimal_manifest())
|
|
self.assertIsInstance(result, tuple)
|
|
self.assertEqual(len(result), 2)
|
|
ok, errs = result
|
|
self.assertIs(ok, True)
|
|
self.assertIsInstance(errs, list)
|
|
|
|
def test_returns_false_list_of_strings_on_failure(self):
|
|
ok, errs = validate_manifest({'kind': 'builtin'})
|
|
self.assertIs(ok, False)
|
|
self.assertIsInstance(errs, list)
|
|
self.assertTrue(len(errs) > 0)
|
|
for e in errs:
|
|
self.assertIsInstance(e, str)
|
|
|
|
# ── schema_version ───────────────────────────────────────────────────
|
|
|
|
def test_schema_version_3_passes(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(schema_version=3))
|
|
self.assertTrue(ok)
|
|
|
|
def test_schema_version_2_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(schema_version=2))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('schema_version' in e for e in errs))
|
|
|
|
def test_schema_version_1_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(schema_version=1))
|
|
self.assertFalse(ok)
|
|
|
|
def test_schema_version_string_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(schema_version='3'))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('schema_version' in e for e in errs))
|
|
|
|
def test_schema_version_absent_passes(self):
|
|
m = _minimal_manifest()
|
|
m.pop('schema_version', None)
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
# ── id ───────────────────────────────────────────────────────────────
|
|
|
|
def test_id_valid_lowercase_passes(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(id='myapp'))
|
|
self.assertTrue(ok)
|
|
|
|
def test_id_with_hyphen_and_underscore_passes(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(id='my-app_v2'))
|
|
self.assertTrue(ok)
|
|
|
|
def test_id_starts_with_digit_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(id='1app'))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('id' in e for e in errs))
|
|
|
|
def test_id_uppercase_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(id='MyApp'))
|
|
self.assertFalse(ok)
|
|
|
|
def test_id_with_space_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(id='my app'))
|
|
self.assertFalse(ok)
|
|
|
|
def test_id_too_long_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(id='a' * 32))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('id' in e for e in errs))
|
|
|
|
def test_id_31_chars_passes(self):
|
|
# Pattern allows up to 31 chars total (1 + 30)
|
|
ok, errs = validate_manifest(_minimal_manifest(id='a' + 'b' * 30))
|
|
self.assertTrue(ok)
|
|
|
|
def test_id_absent_passes(self):
|
|
m = _minimal_manifest()
|
|
m.pop('id', None)
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
# ── image ────────────────────────────────────────────────────────────
|
|
|
|
def test_image_with_digest_passes(self):
|
|
digest = 'a' * 64
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(image=f'git.pic.ngo/roof/myapp@sha256:{digest}')
|
|
)
|
|
self.assertTrue(ok)
|
|
|
|
def test_image_tag_only_first_party_allowed(self):
|
|
# First-party images without a digest pin are allowed (warning only).
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(image='git.pic.ngo/roof/myapp:latest')
|
|
)
|
|
self.assertTrue(ok)
|
|
self.assertEqual(errs, [])
|
|
|
|
def test_image_wrong_registry_rejected(self):
|
|
digest = 'a' * 64
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(image=f'docker.io/library/nginx@sha256:{digest}')
|
|
)
|
|
self.assertFalse(ok)
|
|
|
|
def test_image_digest_too_short_rejected(self):
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(image='git.pic.ngo/roof/myapp@sha256:abc123')
|
|
)
|
|
self.assertFalse(ok)
|
|
|
|
def test_image_digest_with_uppercase_hex_rejected(self):
|
|
# sha256 digest must be lowercase hex
|
|
digest = 'A' * 64
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(image=f'git.pic.ngo/roof/myapp@sha256:{digest}')
|
|
)
|
|
self.assertFalse(ok)
|
|
|
|
def test_image_no_tag_no_digest_first_party_allowed(self):
|
|
# No tag and no digest — Docker defaults to :latest; treated as tag-only, allowed.
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(image='git.pic.ngo/roof/myapp')
|
|
)
|
|
self.assertTrue(ok)
|
|
self.assertEqual(errs, [])
|
|
|
|
def test_image_absent_passes(self):
|
|
m = _minimal_manifest()
|
|
m.pop('image', None)
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
# ── kind ─────────────────────────────────────────────────────────────
|
|
|
|
def test_kind_builtin_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(kind='builtin'))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('kind' in e for e in errs))
|
|
|
|
def test_kind_store_passes(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(kind='store'))
|
|
self.assertTrue(ok)
|
|
|
|
def test_kind_absent_passes(self):
|
|
m = _minimal_manifest()
|
|
m.pop('kind', None)
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
def test_kind_unknown_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(kind='custom'))
|
|
self.assertFalse(ok)
|
|
|
|
# ── container_name ────────────────────────────────────────────────────
|
|
|
|
def test_container_name_without_cell_prefix_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(container_name='myapp'))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('container_name' in e for e in errs))
|
|
|
|
def test_container_name_cell_api_rejected_reserved(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(container_name='cell-api'))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('reserved' in e for e in errs))
|
|
|
|
def test_container_name_cell_valid_name_passes(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(container_name='cell-valid-name'))
|
|
self.assertTrue(ok)
|
|
|
|
def test_container_name_with_uppercase_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(container_name='cell-MyApp'))
|
|
self.assertFalse(ok)
|
|
|
|
def test_container_name_cell_caddy_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(container_name='cell-caddy'))
|
|
self.assertFalse(ok)
|
|
|
|
def test_container_name_cell_wireguard_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(container_name='cell-wireguard'))
|
|
self.assertFalse(ok)
|
|
|
|
def test_container_name_absent_passes(self):
|
|
m = _minimal_manifest()
|
|
m.pop('container_name', None)
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
# ── subdomain ────────────────────────────────────────────────────────
|
|
|
|
def test_subdomain_api_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(subdomain='api'))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('reserved' in e for e in errs))
|
|
|
|
def test_subdomain_webui_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(subdomain='webui'))
|
|
self.assertFalse(ok)
|
|
|
|
def test_subdomain_myapp_passes(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(subdomain='myapp'))
|
|
self.assertTrue(ok)
|
|
|
|
def test_subdomain_absent_passes(self):
|
|
m = _minimal_manifest()
|
|
m.pop('subdomain', None)
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
def test_subdomain_calendar_passes(self):
|
|
# calendar/files/webmail/etc are service subdomains, not infrastructure;
|
|
# official PIC store services need to claim them.
|
|
ok, errs = validate_manifest(_minimal_manifest(subdomain='calendar'))
|
|
self.assertTrue(ok, errs)
|
|
|
|
def test_subdomain_files_passes(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(subdomain='files'))
|
|
self.assertTrue(ok, errs)
|
|
|
|
# ── extra_subdomains ─────────────────────────────────────────────────
|
|
|
|
def test_extra_subdomains_admin_rejected(self):
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(extra_subdomains=['admin'])
|
|
)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('reserved' in e for e in errs))
|
|
|
|
def test_extra_subdomains_valid_entry_passes(self):
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(extra_subdomains=['sub'])
|
|
)
|
|
self.assertTrue(ok)
|
|
|
|
def test_extra_subdomains_multiple_one_bad_rejected(self):
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(extra_subdomains=['good', 'admin'])
|
|
)
|
|
self.assertFalse(ok)
|
|
|
|
# ── backend ──────────────────────────────────────────────────────────
|
|
|
|
def test_backend_cell_api_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(backend='cell-api:3000'))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('denylist' in e for e in errs))
|
|
|
|
def test_backend_localhost_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(backend='localhost:8080'))
|
|
self.assertFalse(ok)
|
|
|
|
def test_backend_cell_mail_valid_port_passes(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(backend='cell-mail:25'))
|
|
self.assertTrue(ok)
|
|
|
|
def test_backend_bad_format_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(backend='not-a-backend'))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('host:port' in e for e in errs))
|
|
|
|
def test_backend_127_0_0_1_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(backend='127.0.0.1:8080'))
|
|
self.assertFalse(ok)
|
|
|
|
def test_backend_host_docker_internal_rejected(self):
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(backend='host.docker.internal:8080')
|
|
)
|
|
self.assertFalse(ok)
|
|
|
|
def test_backend_absent_passes(self):
|
|
m = _minimal_manifest()
|
|
m.pop('backend', None)
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
# ── extra_backends ───────────────────────────────────────────────────
|
|
|
|
def test_extra_backends_cell_coredns_rejected(self):
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(extra_backends={'dns': 'cell-coredns:53'})
|
|
)
|
|
self.assertFalse(ok)
|
|
|
|
def test_extra_backends_valid_passes(self):
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(extra_backends={'alt': 'cell-myapp:80'})
|
|
)
|
|
self.assertTrue(ok)
|
|
|
|
# ── cap_add ──────────────────────────────────────────────────────────
|
|
|
|
def test_cap_add_sys_admin_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(cap_add=['SYS_ADMIN']))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('SYS_ADMIN' in e for e in errs))
|
|
|
|
def test_cap_add_all_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(cap_add=['ALL']))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('ALL' in e for e in errs))
|
|
|
|
def test_cap_add_net_admin_passes(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(cap_add=['NET_ADMIN']))
|
|
self.assertTrue(ok)
|
|
|
|
def test_cap_add_net_admin_and_sys_admin_rejected(self):
|
|
ok, errs = validate_manifest(
|
|
_minimal_manifest(cap_add=['NET_ADMIN', 'SYS_ADMIN'])
|
|
)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('SYS_ADMIN' in e for e in errs))
|
|
|
|
def test_cap_add_unknown_cap_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(cap_add=['UNKNOWN_CAP']))
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('allowlist' in e for e in errs))
|
|
|
|
def test_cap_add_sys_module_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(cap_add=['SYS_MODULE']))
|
|
self.assertFalse(ok)
|
|
|
|
def test_cap_add_not_a_list_rejected(self):
|
|
ok, errs = validate_manifest(_minimal_manifest(cap_add='NET_ADMIN'))
|
|
self.assertFalse(ok)
|
|
|
|
# ── provision_hook (via accounts key) ────────────────────────────────
|
|
|
|
def test_provision_hook_string_rejected(self):
|
|
m = _minimal_manifest(accounts={'provision_hook': 'setup email add {username}'})
|
|
ok, errs = validate_manifest(m)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('shell string' in e for e in errs))
|
|
|
|
def test_provision_hook_argv_dict_passes(self):
|
|
m = _minimal_manifest(
|
|
accounts={'provision_hook': {'argv': ['setup', 'email', 'add']}}
|
|
)
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
def test_provision_hook_absent_passes(self):
|
|
m = _minimal_manifest()
|
|
# no 'accounts' key at all
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
def test_provision_hook_none_in_accounts_passes(self):
|
|
m = _minimal_manifest(accounts={'provision_hook': None})
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
# ── env values ───────────────────────────────────────────────────────
|
|
|
|
def test_env_value_with_dollar_rejected(self):
|
|
m = _minimal_manifest(env=[{'key': 'X', 'value': '$SECRET'}])
|
|
ok, errs = validate_manifest(m)
|
|
self.assertFalse(ok)
|
|
|
|
def test_env_value_with_curly_brace_rejected(self):
|
|
m = _minimal_manifest(env=[{'key': 'X', 'value': '{bad}'}])
|
|
ok, errs = validate_manifest(m)
|
|
self.assertFalse(ok)
|
|
|
|
def test_env_value_example_com_passes(self):
|
|
m = _minimal_manifest(env=[{'key': 'DOMAIN', 'value': 'example.com'}])
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
def test_env_value_complex_safe_string_passes(self):
|
|
m = _minimal_manifest(
|
|
env=[{'key': 'X', 'value': 'user@host:port/path+extra-chars'}]
|
|
)
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
def test_env_empty_passes(self):
|
|
m = _minimal_manifest(env=[])
|
|
ok, errs = validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestValidateRenderedCompose
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestValidateRenderedCompose(unittest.TestCase):
|
|
|
|
# ── valid compose ─────────────────────────────────────────────────────
|
|
|
|
def test_valid_compose_with_external_network_passes(self):
|
|
ok, errs = validate_rendered_compose(_valid_compose())
|
|
self.assertTrue(ok)
|
|
self.assertEqual(errs, [])
|
|
|
|
def test_invalid_yaml_returns_error(self):
|
|
ok, errs = validate_rendered_compose('}{invalid yaml{')
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('YAML' in e for e in errs))
|
|
|
|
def test_non_mapping_yaml_rejected(self):
|
|
ok, errs = validate_rendered_compose('- item1\n- item2\n')
|
|
self.assertFalse(ok)
|
|
|
|
# ── privileged ───────────────────────────────────────────────────────
|
|
|
|
def test_privileged_true_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' privileged: true\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('privileged' in e for e in errs))
|
|
|
|
def test_privileged_false_passes(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' privileged: false\n'
|
|
' command: ["echo"]\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertTrue(ok)
|
|
|
|
# ── network_mode ─────────────────────────────────────────────────────
|
|
|
|
def test_network_mode_host_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' network_mode: host\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('network_mode' in e for e in errs))
|
|
|
|
def test_network_mode_none_value_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' network_mode: "none"\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
|
|
def test_no_network_mode_key_passes(self):
|
|
ok, errs = validate_rendered_compose(_valid_compose())
|
|
self.assertTrue(ok)
|
|
|
|
# ── volumes ──────────────────────────────────────────────────────────
|
|
|
|
def test_absolute_volume_etc_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' volumes:\n'
|
|
' - /etc:/etc\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('bind mount' in e for e in errs))
|
|
|
|
def test_absolute_volume_docker_sock_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' volumes:\n'
|
|
' - /var/run/docker.sock:/var/run/docker.sock\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
|
|
def test_relative_volume_passes(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
' volumes:\n'
|
|
' - ./data:/data\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertTrue(ok)
|
|
|
|
def test_named_volume_passes(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
' volumes:\n'
|
|
' - myvolume:/data\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertTrue(ok)
|
|
|
|
def test_absolute_volume_under_allowed_data_dir_passes(self):
|
|
# After ${PIC_DATA_DIR} substitution, compose templates produce absolute
|
|
# paths like /data/services/email/mail:/var/mail. These must be allowed
|
|
# when allowed_data_dir is set to the same prefix.
|
|
yaml_text = (
|
|
'services:\n'
|
|
' mail:\n'
|
|
' image: svc-email:latest\n'
|
|
' command: ["postfix"]\n'
|
|
' volumes:\n'
|
|
' - /data/services/email/mail:/var/mail\n'
|
|
' - /data/services/email/config:/tmp/config\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text, allowed_data_dir='/data')
|
|
self.assertTrue(ok, errs)
|
|
|
|
def test_absolute_volume_outside_allowed_data_dir_still_rejected(self):
|
|
yaml_text = (
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' volumes:\n'
|
|
' - /etc/passwd:/etc/passwd\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text, allowed_data_dir='/data')
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('bind mount' in e for e in errs))
|
|
|
|
def test_absolute_volume_rejected_when_no_allowed_data_dir(self):
|
|
yaml_text = (
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' volumes:\n'
|
|
' - /data/services/email/mail:/var/mail\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
# Without allowed_data_dir, even /data paths are rejected
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('bind mount' in e for e in errs))
|
|
|
|
# ── cap_add ──────────────────────────────────────────────────────────
|
|
|
|
def test_compose_cap_add_sys_admin_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' cap_add:\n'
|
|
' - SYS_ADMIN\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('SYS_ADMIN' in e for e in errs))
|
|
|
|
def test_compose_cap_add_net_admin_passes(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
' cap_add:\n'
|
|
' - NET_ADMIN\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertTrue(ok)
|
|
|
|
# ── external network requirement ─────────────────────────────────────
|
|
|
|
def test_no_external_network_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
'networks:\n'
|
|
' mynet:\n'
|
|
' driver: bridge\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('external' in e for e in errs))
|
|
|
|
def test_no_networks_key_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
|
|
# ── devices ──────────────────────────────────────────────────────────
|
|
|
|
def test_devices_key_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' devices:\n'
|
|
' - /dev/snd:/dev/snd\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('devices' in e for e in errs))
|
|
|
|
# ── security_opt ─────────────────────────────────────────────────────
|
|
|
|
def test_security_opt_apparmor_unconfined_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' security_opt:\n'
|
|
' - apparmor=unconfined\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('apparmor=unconfined' in e for e in errs))
|
|
|
|
def test_security_opt_no_new_privileges_passes(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
' security_opt:\n'
|
|
' - no-new-privileges:true\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertTrue(ok)
|
|
|
|
def test_security_opt_seccomp_unconfined_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' security_opt:\n'
|
|
' - seccomp=unconfined\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
|
|
# ── command / entrypoint ─────────────────────────────────────────────
|
|
|
|
def test_command_as_string_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: "echo hello"\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('command' in e for e in errs))
|
|
|
|
def test_command_as_list_passes(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo", "hello"]\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertTrue(ok)
|
|
|
|
def test_entrypoint_as_string_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' entrypoint: "/bin/sh -c evil"\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('entrypoint' in e for e in errs))
|
|
|
|
# ── pid / ipc / userns ───────────────────────────────────────────────
|
|
|
|
def test_pid_host_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' pid: host\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('pid' in e for e in errs))
|
|
|
|
def test_ipc_host_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' ipc: host\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
|
|
# ── container_name reserved in compose ──────────────────────────────
|
|
|
|
def test_compose_container_name_cell_api_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
' container_name: cell-api\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('reserved' in e for e in errs))
|
|
|
|
def test_compose_container_name_cell_caddy_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
' container_name: cell-caddy\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
|
|
def test_compose_container_name_cell_wireguard_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
' container_name: cell-wireguard\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
|
|
def test_compose_container_name_cell_coredns_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
' container_name: cell-coredns\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
|
|
def test_compose_container_name_non_reserved_passes(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
' container_name: cell-myapp\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertTrue(ok)
|
|
|
|
def test_compose_no_container_name_passes(self):
|
|
ok, errs = validate_rendered_compose(_valid_compose())
|
|
self.assertTrue(ok)
|
|
|
|
# ── multi-service ─────────────────────────────────────────────────────
|
|
|
|
def test_multiple_services_one_invalid_rejected(self):
|
|
yaml_text = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' good:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
' bad:\n'
|
|
' image: nginx\n'
|
|
' privileged: true\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('bad' in e for e in errs))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestValidateRenderedComposeHostNetwork
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestValidateRenderedComposeHostNetwork(unittest.TestCase):
|
|
"""Tests for allow_host_network=True — connectivity services."""
|
|
|
|
_HOST_NET_COMPOSE = (
|
|
'services:\n'
|
|
' wireguard-ext:\n'
|
|
' image: git.pic.ngo/roof/svc-wireguard-ext:latest\n'
|
|
' container_name: cell-wg-ext\n'
|
|
' restart: unless-stopped\n'
|
|
' network_mode: host\n'
|
|
' cap_add:\n'
|
|
' - NET_ADMIN\n'
|
|
' volumes:\n'
|
|
' - /app/data/services/wireguard-ext/config:/etc/wireguard\n'
|
|
)
|
|
|
|
def test_host_network_compose_passes_with_flag(self):
|
|
ok, errs = validate_rendered_compose(
|
|
self._HOST_NET_COMPOSE,
|
|
allowed_data_dir='/app/data',
|
|
allow_host_network=True,
|
|
)
|
|
self.assertTrue(ok, errs)
|
|
|
|
def test_host_network_compose_fails_without_flag(self):
|
|
ok, errs = validate_rendered_compose(
|
|
self._HOST_NET_COMPOSE,
|
|
allowed_data_dir='/app/data',
|
|
allow_host_network=False,
|
|
)
|
|
self.assertFalse(ok)
|
|
|
|
def test_network_mode_host_rejected_without_flag(self):
|
|
yaml_text = (
|
|
'services:\n'
|
|
' svc:\n'
|
|
' image: git.pic.ngo/roof/foo:latest\n'
|
|
' network_mode: host\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('network_mode' in e for e in errs))
|
|
|
|
def test_devices_allowed_with_flag(self):
|
|
yaml_text = (
|
|
'services:\n'
|
|
' openvpn-client:\n'
|
|
' image: git.pic.ngo/roof/svc-openvpn-client:latest\n'
|
|
' container_name: cell-openvpn\n'
|
|
' network_mode: host\n'
|
|
' cap_add:\n'
|
|
' - NET_ADMIN\n'
|
|
' devices:\n'
|
|
' - /dev/net/tun\n'
|
|
' volumes:\n'
|
|
' - /app/data/services/openvpn-client/config:/etc/openvpn\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(
|
|
yaml_text,
|
|
allowed_data_dir='/app/data',
|
|
allow_host_network=True,
|
|
)
|
|
self.assertTrue(ok, errs)
|
|
|
|
def test_devices_rejected_without_flag(self):
|
|
yaml_text = (
|
|
'services:\n'
|
|
' svc:\n'
|
|
' image: git.pic.ngo/roof/foo:latest\n'
|
|
' devices:\n'
|
|
' - /dev/net/tun\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('devices' in e for e in errs))
|
|
|
|
def test_no_external_network_ok_with_flag(self):
|
|
yaml_text = (
|
|
'services:\n'
|
|
' tor:\n'
|
|
' image: git.pic.ngo/roof/svc-tor:latest\n'
|
|
' network_mode: host\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text, allow_host_network=True)
|
|
self.assertTrue(ok, errs)
|
|
|
|
def test_privileged_still_rejected_with_flag(self):
|
|
yaml_text = (
|
|
'services:\n'
|
|
' svc:\n'
|
|
' image: git.pic.ngo/roof/foo:latest\n'
|
|
' network_mode: host\n'
|
|
' privileged: true\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text, allow_host_network=True)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('privileged' in e for e in errs))
|
|
|
|
def test_non_host_network_mode_rejected_with_flag(self):
|
|
"""When allow_host_network=True, only 'host' is accepted as network_mode."""
|
|
yaml_text = (
|
|
'services:\n'
|
|
' svc:\n'
|
|
' image: git.pic.ngo/roof/foo:latest\n'
|
|
' network_mode: none\n'
|
|
)
|
|
ok, errs = validate_rendered_compose(yaml_text, allow_host_network=True)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('network_mode' in e for e in errs))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestValidateProvisionHook
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestValidateProvisionHook(unittest.TestCase):
|
|
|
|
def test_none_passes(self):
|
|
ok, msg = validate_provision_hook(None)
|
|
self.assertTrue(ok)
|
|
self.assertEqual(msg, '')
|
|
|
|
def test_string_rejected(self):
|
|
ok, msg = validate_provision_hook('setup email add {username}')
|
|
self.assertFalse(ok)
|
|
self.assertIn('shell string', msg)
|
|
|
|
def test_argv_dict_passes(self):
|
|
ok, msg = validate_provision_hook({'argv': ['setup', 'email', 'add']})
|
|
self.assertTrue(ok)
|
|
self.assertEqual(msg, '')
|
|
|
|
def test_empty_argv_rejected(self):
|
|
ok, msg = validate_provision_hook({'argv': []})
|
|
self.assertFalse(ok)
|
|
self.assertIn('non-empty', msg)
|
|
|
|
def test_single_element_argv_passes(self):
|
|
ok, msg = validate_provision_hook({'argv': ['setup']})
|
|
self.assertTrue(ok)
|
|
|
|
def test_uppercase_binary_rejected(self):
|
|
ok, msg = validate_provision_hook({'argv': ['UPPERCASE']})
|
|
self.assertFalse(ok)
|
|
self.assertIn('argv[0]', msg)
|
|
|
|
def test_nul_byte_in_element_rejected(self):
|
|
ok, msg = validate_provision_hook({'argv': ['setup\x00evil']})
|
|
self.assertFalse(ok)
|
|
self.assertIn('NUL', msg)
|
|
|
|
def test_nul_byte_in_arg_not_binary_rejected(self):
|
|
ok, msg = validate_provision_hook({'argv': ['setup', 'arg\x00evil']})
|
|
self.assertFalse(ok)
|
|
|
|
def test_integer_rejected(self):
|
|
ok, msg = validate_provision_hook(42)
|
|
self.assertFalse(ok)
|
|
self.assertIn('dict', msg)
|
|
|
|
def test_list_rejected(self):
|
|
ok, msg = validate_provision_hook(['setup', 'add'])
|
|
self.assertFalse(ok)
|
|
|
|
def test_dict_without_argv_rejected(self):
|
|
ok, msg = validate_provision_hook({'cmd': ['setup']})
|
|
self.assertFalse(ok)
|
|
self.assertIn('argv', msg)
|
|
|
|
def test_binary_with_digits_passes(self):
|
|
ok, msg = validate_provision_hook({'argv': ['setup2']})
|
|
self.assertTrue(ok)
|
|
|
|
def test_binary_with_hyphens_and_underscores_passes(self):
|
|
ok, msg = validate_provision_hook({'argv': ['my-setup_tool']})
|
|
self.assertTrue(ok)
|
|
|
|
def test_binary_starting_with_digit_rejected(self):
|
|
ok, msg = validate_provision_hook({'argv': ['2setup']})
|
|
self.assertFalse(ok)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestServiceComposerIntegration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestServiceComposerIntegration(unittest.TestCase):
|
|
"""Integration tests for ServiceComposer.write_compose() security gate."""
|
|
|
|
def _make_cm(self):
|
|
cm = MagicMock()
|
|
cm.get_identity.return_value = {
|
|
'cell_name': 'testcell',
|
|
'domain': 'cell.local',
|
|
}
|
|
cm.get_effective_domain.return_value = 'cell.local'
|
|
cm.configs = {}
|
|
return cm
|
|
|
|
def test_write_compose_raises_on_privileged_true(self):
|
|
from service_composer import ServiceComposer
|
|
privileged_template = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' privileged: true\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
|
|
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
|
|
with self.assertRaises(ValueError) as ctx:
|
|
composer.write_compose('testsvc', manifest, privileged_template)
|
|
self.assertIn('security validation', str(ctx.exception))
|
|
|
|
def test_write_compose_no_temp_file_on_validation_failure(self):
|
|
"""Validation failure must not leave a .tmp file on disk."""
|
|
from service_composer import ServiceComposer
|
|
privileged_template = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' privileged: true\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
|
|
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
|
|
svc_dir = os.path.join(tmpdir, 'services', 'testsvc')
|
|
os.makedirs(svc_dir, exist_ok=True)
|
|
try:
|
|
composer.write_compose('testsvc', manifest, privileged_template)
|
|
except ValueError:
|
|
pass
|
|
tmp_path = os.path.join(svc_dir, 'docker-compose.yml.tmp')
|
|
final_path = os.path.join(svc_dir, 'docker-compose.yml')
|
|
self.assertFalse(
|
|
os.path.exists(tmp_path),
|
|
'Temp file should not exist after validation failure',
|
|
)
|
|
self.assertFalse(
|
|
os.path.exists(final_path),
|
|
'Final compose file should not exist after validation failure',
|
|
)
|
|
|
|
def test_render_template_replaces_pic_data_dir(self):
|
|
from service_composer import ServiceComposer
|
|
from pathlib import Path
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
|
|
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
|
|
template = 'DATA=${PIC_DATA_DIR}'
|
|
result = composer.render_template('testsvc', manifest, template)
|
|
expected = str(Path(tmpdir).resolve())
|
|
self.assertIn(expected, result)
|
|
self.assertNotIn('${PIC_DATA_DIR}', result)
|
|
|
|
def test_write_compose_succeeds_on_valid_template(self):
|
|
from service_composer import ServiceComposer
|
|
valid_template = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo", "ok"]\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
|
|
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
|
|
content = composer.write_compose('testsvc', manifest, valid_template)
|
|
self.assertIn('nginx', content)
|
|
expected_path = os.path.join(
|
|
tmpdir, 'services', 'testsvc', 'docker-compose.yml'
|
|
)
|
|
self.assertTrue(os.path.exists(expected_path))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestServiceStoreManagerSecurityIntegration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestServiceStoreManagerSecurityIntegration(unittest.TestCase):
|
|
"""Integration tests for ServiceStoreManager._validate_manifest() security additions."""
|
|
|
|
def _make_manager(self):
|
|
from service_store_manager import ServiceStoreManager
|
|
cm = MagicMock()
|
|
cm.get_installed_services.return_value = {}
|
|
cm.get_identity.return_value = {
|
|
'ip_range': '172.20.0.0/16',
|
|
'service_ips': {},
|
|
}
|
|
return ServiceStoreManager(
|
|
config_manager=cm,
|
|
caddy_manager=MagicMock(),
|
|
container_manager=MagicMock(),
|
|
)
|
|
|
|
def _valid_manifest(self, **overrides):
|
|
m = {
|
|
'id': 'myapp',
|
|
'name': 'My App',
|
|
'version': '1.0.0',
|
|
'author': 'Test Author',
|
|
'image': 'git.pic.ngo/roof/myapp@sha256:' + 'a' * 64,
|
|
'container_name': 'cell-myapp',
|
|
}
|
|
m.update(overrides)
|
|
return m
|
|
|
|
def test_validate_manifest_rejects_cap_add_sys_admin(self):
|
|
from service_store_manager import ServiceStoreManager
|
|
m = self._valid_manifest(cap_add=['SYS_ADMIN'])
|
|
ok, errs = ServiceStoreManager._validate_manifest(m)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('SYS_ADMIN' in e for e in errs))
|
|
|
|
def test_validate_manifest_accepts_cap_add_net_admin(self):
|
|
from service_store_manager import ServiceStoreManager
|
|
m = self._valid_manifest(cap_add=['NET_ADMIN'])
|
|
ok, errs = ServiceStoreManager._validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
def test_validate_manifest_rejects_provision_hook_string(self):
|
|
from service_store_manager import ServiceStoreManager
|
|
m = self._valid_manifest(
|
|
accounts={'provision_hook': 'setup user add'}
|
|
)
|
|
ok, errs = ServiceStoreManager._validate_manifest(m)
|
|
self.assertFalse(ok)
|
|
self.assertTrue(any('shell string' in e for e in errs))
|
|
|
|
def test_validate_manifest_rejects_kind_builtin(self):
|
|
from service_store_manager import ServiceStoreManager
|
|
m = self._valid_manifest(kind='builtin')
|
|
ok, errs = ServiceStoreManager._validate_manifest(m)
|
|
self.assertFalse(ok)
|
|
|
|
def test_validate_manifest_rejects_reserved_container_name(self):
|
|
from service_store_manager import ServiceStoreManager
|
|
m = self._valid_manifest(container_name='cell-api')
|
|
ok, errs = ServiceStoreManager._validate_manifest(m)
|
|
self.assertFalse(ok)
|
|
|
|
def test_fetch_manifest_raises_on_oversized_response(self):
|
|
from service_store_manager import ServiceStoreManager
|
|
mgr = self._make_manager()
|
|
oversized_content = b'x' * (256 * 1024 + 1)
|
|
raw_mock = MagicMock()
|
|
raw_mock.read.return_value = oversized_content
|
|
mock_resp = MagicMock()
|
|
mock_resp.raise_for_status = MagicMock()
|
|
mock_resp.raw = raw_mock
|
|
with patch('service_store_manager.requests.get', return_value=mock_resp):
|
|
with self.assertRaises(ValueError) as ctx:
|
|
mgr._fetch_manifest('bigservice')
|
|
self.assertIn('256 KB', str(ctx.exception))
|
|
|
|
def test_fetch_manifest_succeeds_with_small_response(self):
|
|
from service_store_manager import ServiceStoreManager
|
|
mgr = self._make_manager()
|
|
payload = {'id': 'smallsvc', 'name': 'Small'}
|
|
small_content = json.dumps(payload).encode()
|
|
raw_mock = MagicMock()
|
|
raw_mock.read.return_value = small_content
|
|
mock_resp = MagicMock()
|
|
mock_resp.raise_for_status = MagicMock()
|
|
mock_resp.raw = raw_mock
|
|
with patch('service_store_manager.requests.get', return_value=mock_resp):
|
|
result = mgr._fetch_manifest('smallsvc')
|
|
self.assertEqual(result['id'], 'smallsvc')
|
|
|
|
def test_fetch_manifest_requests_stream_true(self):
|
|
"""_fetch_manifest must use stream=True so raw.read() is available."""
|
|
from service_store_manager import ServiceStoreManager
|
|
mgr = self._make_manager()
|
|
payload = json.dumps({'id': 'svc'}).encode()
|
|
raw_mock = MagicMock()
|
|
raw_mock.read.return_value = payload
|
|
mock_resp = MagicMock()
|
|
mock_resp.raise_for_status = MagicMock()
|
|
mock_resp.raw = raw_mock
|
|
with patch('service_store_manager.requests.get', return_value=mock_resp) as mock_get:
|
|
mgr._fetch_manifest('svc')
|
|
_, kwargs = mock_get.call_args
|
|
self.assertTrue(kwargs.get('stream'), 'requests.get must be called with stream=True')
|
|
|
|
def test_validate_manifest_rejects_backend_denylist(self):
|
|
from service_store_manager import ServiceStoreManager
|
|
m = self._valid_manifest(backend='localhost:8080')
|
|
ok, errs = ServiceStoreManager._validate_manifest(m)
|
|
self.assertFalse(ok)
|
|
|
|
def test_validate_manifest_accepts_valid_backend(self):
|
|
from service_store_manager import ServiceStoreManager
|
|
m = self._valid_manifest(backend='cell-myapp:8080')
|
|
ok, errs = ServiceStoreManager._validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
def test_validate_manifest_allows_image_tag_only_first_party(self):
|
|
"""First-party images without a digest pin are allowed (warning only)."""
|
|
from service_store_manager import ServiceStoreManager
|
|
m = self._valid_manifest(image='git.pic.ngo/roof/myapp:latest')
|
|
ok, errs = ServiceStoreManager._validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
def test_validate_manifest_accepts_image_with_digest(self):
|
|
from service_store_manager import ServiceStoreManager
|
|
digest = 'b' * 64
|
|
m = self._valid_manifest(image=f'git.pic.ngo/roof/myapp@sha256:{digest}')
|
|
ok, errs = ServiceStoreManager._validate_manifest(m)
|
|
self.assertTrue(ok)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestInstallManifestValidation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestInstallManifestValidation(unittest.TestCase):
|
|
"""Tests that ServiceStoreManager.install() propagates manifest validation errors."""
|
|
|
|
def _make_ssm(self, manifest):
|
|
from service_store_manager import ServiceStoreManager
|
|
cm = MagicMock()
|
|
cm.get_installed_services.return_value = {}
|
|
composer = MagicMock()
|
|
composer._resolve_requires.return_value = None
|
|
composer.install.return_value = {'ok': True}
|
|
ssm = ServiceStoreManager(
|
|
config_manager=cm,
|
|
caddy_manager=MagicMock(),
|
|
container_manager=MagicMock(),
|
|
service_composer=composer,
|
|
)
|
|
ssm._fetch_manifest = MagicMock(return_value=manifest)
|
|
ssm._fetch_template = MagicMock(return_value=(
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
))
|
|
return ssm
|
|
|
|
def test_install_succeeds_when_image_tag_only_first_party(self):
|
|
# First-party images without a digest pin are allowed (warning, not error).
|
|
manifest = {
|
|
'id': 'myapp',
|
|
'name': 'My App',
|
|
'version': '1.0.0',
|
|
'author': 'Test',
|
|
'image': 'git.pic.ngo/roof/myapp:latest',
|
|
'container_name': 'cell-myapp',
|
|
}
|
|
ssm = self._make_ssm(manifest)
|
|
result = ssm.install('myapp')
|
|
self.assertTrue(result['ok'])
|
|
|
|
def test_install_returns_error_when_kind_is_builtin(self):
|
|
digest = 'c' * 64
|
|
manifest = {
|
|
'id': 'myapp',
|
|
'name': 'My App',
|
|
'version': '1.0.0',
|
|
'author': 'Test',
|
|
'image': f'git.pic.ngo/roof/myapp@sha256:{digest}',
|
|
'container_name': 'cell-myapp',
|
|
'kind': 'builtin',
|
|
}
|
|
ssm = self._make_ssm(manifest)
|
|
result = ssm.install('myapp')
|
|
self.assertFalse(result['ok'])
|
|
|
|
def test_install_returns_error_when_cap_add_sys_admin(self):
|
|
digest = 'd' * 64
|
|
manifest = {
|
|
'id': 'myapp',
|
|
'name': 'My App',
|
|
'version': '1.0.0',
|
|
'author': 'Test',
|
|
'image': f'git.pic.ngo/roof/myapp@sha256:{digest}',
|
|
'container_name': 'cell-myapp',
|
|
'cap_add': ['SYS_ADMIN'],
|
|
}
|
|
ssm = self._make_ssm(manifest)
|
|
result = ssm.install('myapp')
|
|
self.assertFalse(result['ok'])
|
|
|
|
def test_install_succeeds_with_valid_manifest(self):
|
|
digest = 'e' * 64
|
|
manifest = {
|
|
'id': 'myapp',
|
|
'name': 'My App',
|
|
'version': '1.0.0',
|
|
'author': 'Test',
|
|
'image': f'git.pic.ngo/roof/myapp@sha256:{digest}',
|
|
'container_name': 'cell-myapp',
|
|
}
|
|
ssm = self._make_ssm(manifest)
|
|
result = ssm.install('myapp')
|
|
self.assertTrue(result['ok'])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestWriteComposeValidation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestWriteComposeValidation(unittest.TestCase):
|
|
"""Tests that ServiceComposer.write_compose() rejects each specific violation."""
|
|
|
|
def _make_cm(self):
|
|
cm = MagicMock()
|
|
cm.get_identity.return_value = {
|
|
'cell_name': 'testcell',
|
|
'domain': 'cell.local',
|
|
}
|
|
cm.get_effective_domain.return_value = 'cell.local'
|
|
cm.configs = {}
|
|
return cm
|
|
|
|
def _make_template(self, **service_extras):
|
|
"""Build a compose YAML string with optional extra service fields."""
|
|
extra_lines = ''
|
|
for key, val in service_extras.items():
|
|
if isinstance(val, bool):
|
|
extra_lines += f' {key}: {"true" if val else "false"}\n'
|
|
elif isinstance(val, list):
|
|
extra_lines += f' {key}:\n'
|
|
for item in val:
|
|
extra_lines += f' - {item}\n'
|
|
else:
|
|
extra_lines += f' {key}: {val}\n'
|
|
return (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
+ extra_lines
|
|
+ 'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
|
|
def test_write_compose_raises_on_network_mode_host(self):
|
|
from service_composer import ServiceComposer
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
|
|
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
|
|
template = self._make_template(network_mode='host')
|
|
with self.assertRaises(ValueError) as ctx:
|
|
composer.write_compose('testsvc', manifest, template)
|
|
self.assertIn('security validation', str(ctx.exception))
|
|
|
|
def test_write_compose_raises_on_pid_host(self):
|
|
from service_composer import ServiceComposer
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
|
|
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
|
|
template = self._make_template(pid='host')
|
|
with self.assertRaises(ValueError) as ctx:
|
|
composer.write_compose('testsvc', manifest, template)
|
|
self.assertIn('security validation', str(ctx.exception))
|
|
|
|
def test_write_compose_raises_on_ipc_host(self):
|
|
from service_composer import ServiceComposer
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
|
|
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
|
|
template = self._make_template(ipc='host')
|
|
with self.assertRaises(ValueError) as ctx:
|
|
composer.write_compose('testsvc', manifest, template)
|
|
self.assertIn('security validation', str(ctx.exception))
|
|
|
|
def test_write_compose_raises_on_userns_mode_host(self):
|
|
from service_composer import ServiceComposer
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
|
|
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
|
|
template = self._make_template(userns_mode='host')
|
|
with self.assertRaises(ValueError) as ctx:
|
|
composer.write_compose('testsvc', manifest, template)
|
|
self.assertIn('security validation', str(ctx.exception))
|
|
|
|
def test_write_compose_raises_on_reserved_container_name(self):
|
|
from service_composer import ServiceComposer
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
|
|
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
|
|
template = self._make_template(container_name='cell-api')
|
|
with self.assertRaises(ValueError) as ctx:
|
|
composer.write_compose('testsvc', manifest, template)
|
|
self.assertIn('security validation', str(ctx.exception))
|
|
|
|
def test_write_compose_raises_on_cap_add_all(self):
|
|
from service_composer import ServiceComposer
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
|
|
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
|
|
template = self._make_template(cap_add=['ALL'])
|
|
with self.assertRaises(ValueError) as ctx:
|
|
composer.write_compose('testsvc', manifest, template)
|
|
self.assertIn('security validation', str(ctx.exception))
|
|
|
|
def test_write_compose_raises_when_no_external_network(self):
|
|
from service_composer import ServiceComposer
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
|
|
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
|
|
template = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: ["echo"]\n'
|
|
'networks:\n'
|
|
' internal:\n'
|
|
' driver: bridge\n'
|
|
)
|
|
with self.assertRaises(ValueError):
|
|
composer.write_compose('testsvc', manifest, template)
|
|
|
|
def test_write_compose_raises_on_string_command(self):
|
|
from service_composer import ServiceComposer
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
|
|
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
|
|
template = (
|
|
'version: "3.8"\n'
|
|
'services:\n'
|
|
' app:\n'
|
|
' image: nginx\n'
|
|
' command: "echo hello && rm -rf /"\n'
|
|
'networks:\n'
|
|
' cell-network:\n'
|
|
' external: true\n'
|
|
)
|
|
with self.assertRaises(ValueError) as ctx:
|
|
composer.write_compose('testsvc', manifest, template)
|
|
self.assertIn('security validation', str(ctx.exception))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|