From 2ab3d2d5acfa142c1a253d3b6110e5a7a0c28509 Mon Sep 17 00:00:00 2001 From: Dmitrii Iurco Date: Thu, 11 Jun 2026 14:12:58 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20secure=20build=20phase=202=20=E2=80=94?= =?UTF-8?q?=20enforce=20image=20verification=20by=20default?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All store images are now digest-pinned and cosign-signed by the publish pipeline, so the warn-by-default training-wheels period is over: an unsigned or undigested image must not install unless the admin explicitly opts out. The service_composer fallback used when the config manager is unavailable or corrupt also flips to enforce — config corruption must fail closed rather than silently weaken verification. Co-Authored-By: Claude Fable 5 --- api/config_manager.py | 10 ++++---- api/service_composer.py | 2 +- tests/test_config_manager.py | 12 ++++----- tests/test_service_composer.py | 40 +++++++++++++++++++++++++++++ tests/test_service_store_manager.py | 15 ++++++++++- 5 files changed, 66 insertions(+), 13 deletions(-) diff --git a/api/config_manager.py b/api/config_manager.py index 3bc1d1a..82fd9af 100644 --- a/api/config_manager.py +++ b/api/config_manager.py @@ -1016,15 +1016,15 @@ class ConfigManager: # enforce — refuse to start a service whose image is undigested, # unsigned, or whose signature does not verify # - # Default is "warn" until the publish pipeline signs all store images; a - # later phase flips the default to "enforce". The section is backed up and - # restored with the rest of cell_config.json automatically. + # All store images are now signed + digest-pinned via the publish pipeline, + # so the default is "enforce". The section is backed up and restored with + # the rest of cell_config.json automatically. def get_image_verification(self) -> Dict[str, Any]: - """Return the image verification config, e.g. {'mode': 'warn'}.""" + """Return the image verification config, e.g. {'mode': 'enforce'}.""" cfg = self.configs.get('image_verification') if not isinstance(cfg, dict) or cfg.get('mode') not in _IMAGE_VERIFY_MODES: - cfg = {'mode': 'warn'} + cfg = {'mode': 'enforce'} self.configs['image_verification'] = cfg return dict(cfg) diff --git a/api/service_composer.py b/api/service_composer.py index 1e0b723..a85160d 100644 --- a/api/service_composer.py +++ b/api/service_composer.py @@ -285,7 +285,7 @@ class ServiceComposer: return getter() except Exception as e: # config corruption must not crash install logger.warning('service_composer: could not read verification mode: %s', e) - return 'warn' + return 'enforce' def _cosign_verify(self, image_ref: str) -> Dict: """Run `cosign verify` against the bundled public key for one image ref. diff --git a/tests/test_config_manager.py b/tests/test_config_manager.py index 580e8f9..44ec52a 100644 --- a/tests/test_config_manager.py +++ b/tests/test_config_manager.py @@ -518,7 +518,7 @@ class TestEmailManagerApply(unittest.TestCase): class TestImageVerificationConfig(unittest.TestCase): - """image_verification config round-trip and warn-by-default behaviour.""" + """image_verification config round-trip; default is now 'enforce' (P3).""" def setUp(self): self.temp_dir = tempfile.mkdtemp() @@ -530,9 +530,9 @@ class TestImageVerificationConfig(unittest.TestCase): def tearDown(self): shutil.rmtree(self.temp_dir) - def test_default_mode_is_warn(self): - self.assertEqual(self.cm.get_image_verification_mode(), 'warn') - self.assertEqual(self.cm.get_image_verification(), {'mode': 'warn'}) + def test_default_mode_is_enforce(self): + self.assertEqual(self.cm.get_image_verification_mode(), 'enforce') + self.assertEqual(self.cm.get_image_verification(), {'mode': 'enforce'}) def test_set_and_get_round_trip(self): for mode in ('off', 'warn', 'enforce'): @@ -548,9 +548,9 @@ class TestImageVerificationConfig(unittest.TestCase): with self.assertRaises(ValueError): self.cm.set_image_verification_mode('paranoid') - def test_corrupt_section_falls_back_to_warn(self): + def test_corrupt_section_falls_back_to_enforce(self): self.cm.configs['image_verification'] = {'mode': 'bogus'} - self.assertEqual(self.cm.get_image_verification_mode(), 'warn') + self.assertEqual(self.cm.get_image_verification_mode(), 'enforce') if __name__ == '__main__': diff --git a/tests/test_service_composer.py b/tests/test_service_composer.py index c501233..77b92dd 100644 --- a/tests/test_service_composer.py +++ b/tests/test_service_composer.py @@ -874,6 +874,46 @@ class TestImageVerification(unittest.TestCase): self.assertTrue(result['ok']) c.up.assert_called_once() + def test_default_mode_is_enforce(self): + """get_image_verification_mode default is 'enforce' (P3 flip).""" + from config_manager import ConfigManager as RealCM + import tempfile, shutil + tmpdir = tempfile.mkdtemp() + try: + cm = RealCM(os.path.join(tmpdir, 'cell_config.json'), + os.path.join(tmpdir, 'data')) + c = ServiceComposer(config_manager=cm, data_dir=tmpdir) + self.assertEqual(c._verification_mode(), 'enforce') + finally: + shutil.rmtree(tmpdir) + + def test_enforce_default_rejects_unsigned_image(self): + """Under the default enforce mode, an undigested image aborts install.""" + c = _verify_composer('enforce') + c._cosign_verify = MagicMock() + manifest = {'image': 'git.pic.ngo/roof/unsigned:latest'} + result = c.install('svc', manifest, 'tpl') + self.assertFalse(result['ok']) + self.assertIn('digest', result['error']) + c._cosign_verify.assert_not_called() + c.up.assert_not_called() + + def test_mode_read_failure_falls_back_to_enforce(self): + """Config corruption must fail closed, not weaken verification.""" + c = _verify_composer('enforce') + c.cm.get_image_verification_mode.side_effect = RuntimeError('corrupt config') + self.assertEqual(c._verification_mode(), 'enforce') + + def test_enforce_default_proceeds_with_signed_digested_image(self): + """Under the default enforce mode, a digest-pinned signed image proceeds.""" + c = _verify_composer('enforce') + c._cosign_verify = MagicMock(return_value={'ok': True, 'stdout': '[]', 'stderr': ''}) + manifest = {'image': _SIGNED_IMAGE} + result = c.install('svc', manifest, 'tpl') + self.assertTrue(result['ok']) + c._cosign_verify.assert_called_once() + c.up.assert_called_once() + if __name__ == '__main__': unittest.main() diff --git a/tests/test_service_store_manager.py b/tests/test_service_store_manager.py index b6f410c..c3c1df4 100644 --- a/tests/test_service_store_manager.py +++ b/tests/test_service_store_manager.py @@ -693,7 +693,7 @@ class TestInstall(unittest.TestCase): composer.install.assert_not_called() def test_install_warn_allows_undigested_image(self): - """Under warn mode (default) an undigested image still installs.""" + """Under warn mode an undigested image still installs (warn is non-default).""" manifest = _valid_manifest( id='myapp', container_name='cell-myapp', image='git.pic.ngo/roof/myapp:latest', @@ -713,6 +713,19 @@ class TestInstall(unittest.TestCase): self.assertTrue(result['ok']) composer.install.assert_called_once() + def test_install_enforce_default_rejects_undigested_image(self): + """The new default mode (enforce) rejects undigested images without explicit mode set.""" + manifest = _valid_manifest( + id='myapp', container_name='cell-myapp', + image='git.pic.ngo/roof/myapp:latest', + ) + ssm, cm, _, composer = _make_ssm(manifest=manifest) + cm.get_image_verification_mode.return_value = 'enforce' + result = ssm.install('myapp') + self.assertFalse(result['ok']) + self.assertIn('digest', result['error'].lower()) + composer.install.assert_not_called() + def test_install_without_composer_stores_record(self): """When service_composer=None, skip compose but still store the install record.""" manifest = _valid_manifest(id='myapp', container_name='cell-myapp')