feat: add manifest_validator.py — security chokepoint for compose and manifest validation
Unit Tests / test (push) Successful in 11m18s

Rejects privileged compose configs (network_mode:host, pid:host, ipc:host,
userns_mode:host, cap_add:ALL, string commands, missing cell-network,
reserved container names). Validates manifest schema_version=3, image
digest pinning (sha256 required, :tag-only rejected), and provision hook
format. Wired into ServiceComposer.write_compose() and
ServiceStoreManager.install() as a single enforcement point.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-29 18:45:45 -04:00
parent 62b31b072b
commit 1f2f9d9f6e
4 changed files with 495 additions and 7 deletions
+441 -1
View File
@@ -88,6 +88,120 @@ class TestValidateManifest(unittest.TestCase):
for e in errs:
self.assertIsInstance(e, str)
# ── schema_version ───────────────────────────────────────────────────
def test_schema_version_3_passes(self):
ok, errs = validate_manifest(_minimal_manifest(schema_version=3))
self.assertTrue(ok)
def test_schema_version_2_rejected(self):
ok, errs = validate_manifest(_minimal_manifest(schema_version=2))
self.assertFalse(ok)
self.assertTrue(any('schema_version' in e for e in errs))
def test_schema_version_1_rejected(self):
ok, errs = validate_manifest(_minimal_manifest(schema_version=1))
self.assertFalse(ok)
def test_schema_version_string_rejected(self):
ok, errs = validate_manifest(_minimal_manifest(schema_version='3'))
self.assertFalse(ok)
self.assertTrue(any('schema_version' in e for e in errs))
def test_schema_version_absent_passes(self):
m = _minimal_manifest()
m.pop('schema_version', None)
ok, errs = validate_manifest(m)
self.assertTrue(ok)
# ── id ───────────────────────────────────────────────────────────────
def test_id_valid_lowercase_passes(self):
ok, errs = validate_manifest(_minimal_manifest(id='myapp'))
self.assertTrue(ok)
def test_id_with_hyphen_and_underscore_passes(self):
ok, errs = validate_manifest(_minimal_manifest(id='my-app_v2'))
self.assertTrue(ok)
def test_id_starts_with_digit_rejected(self):
ok, errs = validate_manifest(_minimal_manifest(id='1app'))
self.assertFalse(ok)
self.assertTrue(any('id' in e for e in errs))
def test_id_uppercase_rejected(self):
ok, errs = validate_manifest(_minimal_manifest(id='MyApp'))
self.assertFalse(ok)
def test_id_with_space_rejected(self):
ok, errs = validate_manifest(_minimal_manifest(id='my app'))
self.assertFalse(ok)
def test_id_too_long_rejected(self):
ok, errs = validate_manifest(_minimal_manifest(id='a' * 32))
self.assertFalse(ok)
self.assertTrue(any('id' in e for e in errs))
def test_id_31_chars_passes(self):
# Pattern allows up to 31 chars total (1 + 30)
ok, errs = validate_manifest(_minimal_manifest(id='a' + 'b' * 30))
self.assertTrue(ok)
def test_id_absent_passes(self):
m = _minimal_manifest()
m.pop('id', None)
ok, errs = validate_manifest(m)
self.assertTrue(ok)
# ── image ────────────────────────────────────────────────────────────
def test_image_with_digest_passes(self):
digest = 'a' * 64
ok, errs = validate_manifest(
_minimal_manifest(image=f'git.pic.ngo/roof/myapp@sha256:{digest}')
)
self.assertTrue(ok)
def test_image_tag_only_rejected(self):
ok, errs = validate_manifest(
_minimal_manifest(image='git.pic.ngo/roof/myapp:latest')
)
self.assertFalse(ok)
self.assertTrue(any('image' in e for e in errs))
def test_image_wrong_registry_rejected(self):
digest = 'a' * 64
ok, errs = validate_manifest(
_minimal_manifest(image=f'docker.io/library/nginx@sha256:{digest}')
)
self.assertFalse(ok)
def test_image_digest_too_short_rejected(self):
ok, errs = validate_manifest(
_minimal_manifest(image='git.pic.ngo/roof/myapp@sha256:abc123')
)
self.assertFalse(ok)
def test_image_digest_with_uppercase_hex_rejected(self):
# sha256 digest must be lowercase hex
digest = 'A' * 64
ok, errs = validate_manifest(
_minimal_manifest(image=f'git.pic.ngo/roof/myapp@sha256:{digest}')
)
self.assertFalse(ok)
def test_image_no_tag_no_digest_rejected(self):
ok, errs = validate_manifest(
_minimal_manifest(image='git.pic.ngo/roof/myapp')
)
self.assertFalse(ok)
def test_image_absent_passes(self):
m = _minimal_manifest()
m.pop('image', None)
ok, errs = validate_manifest(m)
self.assertTrue(ok)
# ── kind ─────────────────────────────────────────────────────────────
def test_kind_builtin_rejected(self):
@@ -747,6 +861,88 @@ class TestValidateRenderedCompose(unittest.TestCase):
ok, errs = validate_rendered_compose(yaml_text)
self.assertFalse(ok)
# ── container_name reserved in compose ──────────────────────────────
def test_compose_container_name_cell_api_rejected(self):
yaml_text = (
'version: "3.8"\n'
'services:\n'
' app:\n'
' image: nginx\n'
' command: ["echo"]\n'
' container_name: cell-api\n'
'networks:\n'
' cell-network:\n'
' external: true\n'
)
ok, errs = validate_rendered_compose(yaml_text)
self.assertFalse(ok)
self.assertTrue(any('reserved' in e for e in errs))
def test_compose_container_name_cell_caddy_rejected(self):
yaml_text = (
'version: "3.8"\n'
'services:\n'
' app:\n'
' image: nginx\n'
' command: ["echo"]\n'
' container_name: cell-caddy\n'
'networks:\n'
' cell-network:\n'
' external: true\n'
)
ok, errs = validate_rendered_compose(yaml_text)
self.assertFalse(ok)
def test_compose_container_name_cell_wireguard_rejected(self):
yaml_text = (
'version: "3.8"\n'
'services:\n'
' app:\n'
' image: nginx\n'
' command: ["echo"]\n'
' container_name: cell-wireguard\n'
'networks:\n'
' cell-network:\n'
' external: true\n'
)
ok, errs = validate_rendered_compose(yaml_text)
self.assertFalse(ok)
def test_compose_container_name_cell_coredns_rejected(self):
yaml_text = (
'version: "3.8"\n'
'services:\n'
' app:\n'
' image: nginx\n'
' command: ["echo"]\n'
' container_name: cell-coredns\n'
'networks:\n'
' cell-network:\n'
' external: true\n'
)
ok, errs = validate_rendered_compose(yaml_text)
self.assertFalse(ok)
def test_compose_container_name_non_reserved_passes(self):
yaml_text = (
'version: "3.8"\n'
'services:\n'
' app:\n'
' image: nginx\n'
' command: ["echo"]\n'
' container_name: cell-myapp\n'
'networks:\n'
' cell-network:\n'
' external: true\n'
)
ok, errs = validate_rendered_compose(yaml_text)
self.assertTrue(ok)
def test_compose_no_container_name_passes(self):
ok, errs = validate_rendered_compose(_valid_compose())
self.assertTrue(ok)
# ── multi-service ─────────────────────────────────────────────────────
def test_multiple_services_one_invalid_rejected(self):
@@ -970,7 +1166,7 @@ class TestServiceStoreManagerSecurityIntegration(unittest.TestCase):
'name': 'My App',
'version': '1.0.0',
'author': 'Test Author',
'image': 'git.pic.ngo/roof/myapp:latest',
'image': 'git.pic.ngo/roof/myapp@sha256:' + 'a' * 64,
'container_name': 'cell-myapp',
}
m.update(overrides)
@@ -1065,6 +1261,250 @@ class TestServiceStoreManagerSecurityIntegration(unittest.TestCase):
ok, errs = ServiceStoreManager._validate_manifest(m)
self.assertTrue(ok)
def test_validate_manifest_rejects_image_tag_only(self):
"""Image without digest pin must be rejected even for git.pic.ngo/roof/* images."""
from service_store_manager import ServiceStoreManager
m = self._valid_manifest(image='git.pic.ngo/roof/myapp:latest')
ok, errs = ServiceStoreManager._validate_manifest(m)
self.assertFalse(ok)
def test_validate_manifest_accepts_image_with_digest(self):
from service_store_manager import ServiceStoreManager
digest = 'b' * 64
m = self._valid_manifest(image=f'git.pic.ngo/roof/myapp@sha256:{digest}')
ok, errs = ServiceStoreManager._validate_manifest(m)
self.assertTrue(ok)
# ---------------------------------------------------------------------------
# TestInstallManifestValidation
# ---------------------------------------------------------------------------
class TestInstallManifestValidation(unittest.TestCase):
"""Tests that ServiceStoreManager.install() propagates manifest validation errors."""
def _make_ssm(self, manifest):
from service_store_manager import ServiceStoreManager
cm = MagicMock()
cm.get_installed_services.return_value = {}
composer = MagicMock()
composer._resolve_requires.return_value = None
composer.install.return_value = {'ok': True}
ssm = ServiceStoreManager(
config_manager=cm,
caddy_manager=MagicMock(),
container_manager=MagicMock(),
service_composer=composer,
)
ssm._fetch_manifest = MagicMock(return_value=manifest)
ssm._fetch_template = MagicMock(return_value=(
'version: "3.8"\n'
'services:\n'
' app:\n'
' image: nginx\n'
' command: ["echo"]\n'
'networks:\n'
' cell-network:\n'
' external: true\n'
))
return ssm
def test_install_returns_error_when_image_tag_only(self):
manifest = {
'id': 'myapp',
'name': 'My App',
'version': '1.0.0',
'author': 'Test',
'image': 'git.pic.ngo/roof/myapp:latest',
'container_name': 'cell-myapp',
}
ssm = self._make_ssm(manifest)
result = ssm.install('myapp')
self.assertFalse(result['ok'])
self.assertIn('errors', result)
def test_install_returns_error_when_kind_is_builtin(self):
digest = 'c' * 64
manifest = {
'id': 'myapp',
'name': 'My App',
'version': '1.0.0',
'author': 'Test',
'image': f'git.pic.ngo/roof/myapp@sha256:{digest}',
'container_name': 'cell-myapp',
'kind': 'builtin',
}
ssm = self._make_ssm(manifest)
result = ssm.install('myapp')
self.assertFalse(result['ok'])
def test_install_returns_error_when_cap_add_sys_admin(self):
digest = 'd' * 64
manifest = {
'id': 'myapp',
'name': 'My App',
'version': '1.0.0',
'author': 'Test',
'image': f'git.pic.ngo/roof/myapp@sha256:{digest}',
'container_name': 'cell-myapp',
'cap_add': ['SYS_ADMIN'],
}
ssm = self._make_ssm(manifest)
result = ssm.install('myapp')
self.assertFalse(result['ok'])
def test_install_succeeds_with_valid_manifest(self):
digest = 'e' * 64
manifest = {
'id': 'myapp',
'name': 'My App',
'version': '1.0.0',
'author': 'Test',
'image': f'git.pic.ngo/roof/myapp@sha256:{digest}',
'container_name': 'cell-myapp',
}
ssm = self._make_ssm(manifest)
result = ssm.install('myapp')
self.assertTrue(result['ok'])
# ---------------------------------------------------------------------------
# TestWriteComposeValidation
# ---------------------------------------------------------------------------
class TestWriteComposeValidation(unittest.TestCase):
"""Tests that ServiceComposer.write_compose() rejects each specific violation."""
def _make_cm(self):
cm = MagicMock()
cm.get_identity.return_value = {
'cell_name': 'testcell',
'domain': 'cell.local',
}
cm.get_effective_domain.return_value = 'cell.local'
cm.configs = {}
return cm
def _make_template(self, **service_extras):
"""Build a compose YAML string with optional extra service fields."""
extra_lines = ''
for key, val in service_extras.items():
if isinstance(val, bool):
extra_lines += f' {key}: {"true" if val else "false"}\n'
elif isinstance(val, list):
extra_lines += f' {key}:\n'
for item in val:
extra_lines += f' - {item}\n'
else:
extra_lines += f' {key}: {val}\n'
return (
'version: "3.8"\n'
'services:\n'
' app:\n'
' image: nginx\n'
+ extra_lines
+ 'networks:\n'
' cell-network:\n'
' external: true\n'
)
def test_write_compose_raises_on_network_mode_host(self):
from service_composer import ServiceComposer
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
template = self._make_template(network_mode='host')
with self.assertRaises(ValueError) as ctx:
composer.write_compose('testsvc', manifest, template)
self.assertIn('security validation', str(ctx.exception))
def test_write_compose_raises_on_pid_host(self):
from service_composer import ServiceComposer
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
template = self._make_template(pid='host')
with self.assertRaises(ValueError) as ctx:
composer.write_compose('testsvc', manifest, template)
self.assertIn('security validation', str(ctx.exception))
def test_write_compose_raises_on_ipc_host(self):
from service_composer import ServiceComposer
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
template = self._make_template(ipc='host')
with self.assertRaises(ValueError) as ctx:
composer.write_compose('testsvc', manifest, template)
self.assertIn('security validation', str(ctx.exception))
def test_write_compose_raises_on_userns_mode_host(self):
from service_composer import ServiceComposer
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
template = self._make_template(userns_mode='host')
with self.assertRaises(ValueError) as ctx:
composer.write_compose('testsvc', manifest, template)
self.assertIn('security validation', str(ctx.exception))
def test_write_compose_raises_on_reserved_container_name(self):
from service_composer import ServiceComposer
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
template = self._make_template(container_name='cell-api')
with self.assertRaises(ValueError) as ctx:
composer.write_compose('testsvc', manifest, template)
self.assertIn('security validation', str(ctx.exception))
def test_write_compose_raises_on_cap_add_all(self):
from service_composer import ServiceComposer
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
template = self._make_template(cap_add=['ALL'])
with self.assertRaises(ValueError) as ctx:
composer.write_compose('testsvc', manifest, template)
self.assertIn('security validation', str(ctx.exception))
def test_write_compose_raises_when_no_external_network(self):
from service_composer import ServiceComposer
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
template = (
'version: "3.8"\n'
'services:\n'
' app:\n'
' image: nginx\n'
' command: ["echo"]\n'
'networks:\n'
' internal:\n'
' driver: bridge\n'
)
with self.assertRaises(ValueError):
composer.write_compose('testsvc', manifest, template)
def test_write_compose_raises_on_string_command(self):
from service_composer import ServiceComposer
with tempfile.TemporaryDirectory() as tmpdir:
composer = ServiceComposer(config_manager=self._make_cm(), data_dir=tmpdir)
manifest = {'id': 'test', 'kind': 'store', 'config_schema': {}}
template = (
'version: "3.8"\n'
'services:\n'
' app:\n'
' image: nginx\n'
' command: "echo hello && rm -rf /"\n'
'networks:\n'
' cell-network:\n'
' external: true\n'
)
with self.assertRaises(ValueError) as ctx:
composer.write_compose('testsvc', manifest, template)
self.assertIn('security validation', str(ctx.exception))
if __name__ == '__main__':
unittest.main()