1 Commits

Author SHA1 Message Date
roof 238db60702 feat: secure build phase 1 — cosign cell-side image verification (warn default) + Dockerfile validation
Unit Tests / test (push) Successful in 13m28s
- config/cosign/cosign.pub: public verification key committed to repo (safe);
  cosign private key lives in /home/roof/.pic-secrets/ and is NEVER committed
- api/config_manager.py: image_verification config block (modes: off|warn|enforce,
  default: warn) so existing deployments are unaffected until images are signed
- api/service_composer.py: cosign verify before pull/up; enforce aborts the
  operation, warn logs and proceeds, off skips entirely; also fixes the prior
  unsafe proceed-on-pull-failure path
- api/service_store_manager.py: store-image digest requirement (warn default,
  reject under enforce)
- api/Dockerfile: cosign binary copied from the official cosign image
- docker-compose.yml: config/cosign/ bind-mounted into cell-api container
- install.sh: ensure/verify bundled cosign pubkey on new cell installs
- api/manifest_validator.py: validate_build_context() — Dockerfile lint
- tests: full coverage for config modes, composer verify paths, store digest
  guard, and validate_build_context

Verification defaults to warn so nothing breaks in production until images are
signed (phase 2). Private key stored outside git at /home/roof/.pic-secrets/.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 03:53:47 -04:00
12 changed files with 622 additions and 2 deletions
+6
View File
@@ -1,5 +1,7 @@
FROM docker:27-cli@sha256:851f91d241214e7c6db86513b270d58776379aacc5eb9c4a87e5b47115e3065c AS dockercli FROM docker:27-cli@sha256:851f91d241214e7c6db86513b270d58776379aacc5eb9c4a87e5b47115e3065c AS dockercli
FROM gcr.io/projectsigstore/cosign:v2.4.1@sha256:b03690aa52bfe94054187142fba24dc54137650682810633901767d8a3e15b31 AS cosign
FROM python:3.11-slim@sha256:a3ab0b966bc4e91546a033e22093cb840908979487a9fc0e6e38295747e49ac0 FROM python:3.11-slim@sha256:a3ab0b966bc4e91546a033e22093cb840908979487a9fc0e6e38295747e49ac0
WORKDIR /app/api WORKDIR /app/api
@@ -8,6 +10,10 @@ WORKDIR /app/api
# docker-execs into sibling containers. Non-root is not feasible here. # docker-execs into sibling containers. Non-root is not feasible here.
COPY --from=dockercli /usr/local/bin/docker /usr/local/bin/docker COPY --from=dockercli /usr/local/bin/docker /usr/local/bin/docker
# cosign verifies store-service image signatures against the bundled public key
# (config/cosign/cosign.pub) before ServiceComposer starts a container.
COPY --from=cosign /ko-app/cosign /usr/local/bin/cosign
RUN apt-get update \ RUN apt-get update \
&& apt-get install -y --no-install-recommends \ && apt-get install -y --no-install-recommends \
wireguard-tools \ wireguard-tools \
+38
View File
@@ -47,6 +47,9 @@ logger = logging.getLogger(__name__)
# Valid Python logging levels for the `logging` config section. # Valid Python logging levels for the `logging` config section.
_VALID_LOG_LEVELS = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL') _VALID_LOG_LEVELS = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
# Image signature verification modes (see get/set_image_verification).
_IMAGE_VERIFY_MODES = ('off', 'warn', 'enforce')
# Per-service Python loggers exposed in the verbosity panel. # Per-service Python loggers exposed in the verbosity panel.
_LOGGING_PYTHON_SERVICES = ( _LOGGING_PYTHON_SERVICES = (
'network', 'wireguard', 'email', 'calendar', 'network', 'wireguard', 'email', 'calendar',
@@ -1005,6 +1008,41 @@ class ConfigManager:
ident.setdefault('service_ips', {}).pop(service_id, None) ident.setdefault('service_ips', {}).pop(service_id, None)
self._save_all_configs() self._save_all_configs()
# ── Image signature verification configuration ────────────────────────
#
# Controls how a cell treats store-service container images at install:
# off — skip cosign verification and the digest-pin requirement
# warn — log a warning on a missing digest / failed signature, proceed
# enforce — refuse to start a service whose image is undigested,
# unsigned, or whose signature does not verify
#
# Default is "warn" until the publish pipeline signs all store images; a
# later phase flips the default to "enforce". The section is backed up and
# restored with the rest of cell_config.json automatically.
def get_image_verification(self) -> Dict[str, Any]:
"""Return the image verification config, e.g. {'mode': 'warn'}."""
cfg = self.configs.get('image_verification')
if not isinstance(cfg, dict) or cfg.get('mode') not in _IMAGE_VERIFY_MODES:
cfg = {'mode': 'warn'}
self.configs['image_verification'] = cfg
return dict(cfg)
def get_image_verification_mode(self) -> str:
"""Return just the verification mode string (off|warn|enforce)."""
return self.get_image_verification()['mode']
def set_image_verification_mode(self, mode: str) -> None:
"""Persist the verification mode. Raises ValueError on an invalid mode."""
mode = (mode or '').lower()
if mode not in _IMAGE_VERIFY_MODES:
raise ValueError(
f"Invalid image verification mode: {mode!r} "
f"(expected one of {sorted(_IMAGE_VERIFY_MODES)})"
)
self.configs['image_verification'] = {'mode': mode}
self._save_all_configs()
# ── Logging verbosity configuration ─────────────────────────────────── # ── Logging verbosity configuration ───────────────────────────────────
def _ensure_logging_config(self) -> None: def _ensure_logging_config(self) -> None:
"""Ensure a well-formed `logging` section exists, migrating the legacy """Ensure a well-formed `logging` section exists, migrating the legacy
+185
View File
@@ -48,6 +48,48 @@ _IMAGE_DIGEST_RE = re.compile(
r'^git\.pic\.ngo/roof/[a-zA-Z0-9._/-]+@sha256:[0-9a-f]{64}$' r'^git\.pic\.ngo/roof/[a-zA-Z0-9._/-]+@sha256:[0-9a-f]{64}$'
) )
# ── Build-context (Dockerfile) lint ───────────────────────────────────────
#
# These checks are *defense-in-depth*, not a guarantee. A Dockerfile is
# Turing-ish: a determined author can still fetch code at build time via a
# permitted base image's package manager, multi-stage tricks, or build args.
# The real trust boundary is the isolated builder + cosign signature applied
# by the trusted publish stage (P2). This static lint exists to catch the
# obvious-and-cheap mistakes (un-pinned bases, remote ADD, secret-named args)
# before an image is ever built, and to keep the published corpus uniform.
# Base images a community Dockerfile may build FROM. Each MUST be digest
# pinned so the build is reproducible and the base cannot be swapped under us.
# Keep this curated and small; extend deliberately as P2/P3 add languages.
BUILD_BASE_IMAGE_ALLOWLIST = frozenset({
'docker.io/library/alpine',
'docker.io/library/debian',
'docker.io/library/python',
'docker.io/library/golang',
'docker.io/library/node',
'alpine',
'debian',
'python',
'golang',
'node',
'gcr.io/distroless/static',
'gcr.io/distroless/base',
})
# FROM scratch is only allowed for these (otherwise rejected). Empty by
# default — community images should start from a pinned, scannable base.
BUILD_SCRATCH_ALLOWLIST = frozenset()
_DOCKERFILE_SECRET_NAME_RE = re.compile(r'(TOKEN|KEY|PASSWORD|SECRET)', re.IGNORECASE)
_FROM_RE = re.compile(r'^FROM\s+(.+?)(?:\s+AS\s+\S+)?$', re.IGNORECASE)
_ADD_RE = re.compile(r'^ADD\s+(.+)$', re.IGNORECASE)
_ARG_RE = re.compile(r'^ARG\s+([A-Za-z_][A-Za-z0-9_]*)', re.IGNORECASE)
_ENV_RE = re.compile(r'^ENV\s+(.+)$', re.IGNORECASE)
# Context size / file-count caps — a community build context should be small
# (a Dockerfile + a handful of config/entrypoint files), never a whole tree.
BUILD_CONTEXT_MAX_BYTES = 5 * 1024 * 1024 # 5 MiB
BUILD_CONTEXT_MAX_FILES = 200
def validate_manifest(manifest: dict) -> tuple: def validate_manifest(manifest: dict) -> tuple:
""" """
@@ -294,6 +336,149 @@ def validate_rendered_compose(yaml_text: str, allowed_data_dir: str = None,
return (len(errors) == 0, errors) return (len(errors) == 0, errors)
def _stage_aliases(dockerfile_text: str) -> set:
"""Collect multi-stage build aliases (FROM x AS alias) so later FROM <alias>
references resolve to a same-file stage rather than an external base."""
aliases = set()
for raw in dockerfile_text.splitlines():
line = raw.strip()
m = re.match(r'^FROM\s+\S+\s+AS\s+(\S+)\s*$', line, re.IGNORECASE)
if m:
aliases.add(m.group(1).lower())
return aliases
def _base_is_allowed(base_ref: str) -> tuple:
"""Return (ok, error_or_None) for a single FROM base image reference.
Requires an @sha256: digest pin and that the repository part (sans tag/
digest) is in BUILD_BASE_IMAGE_ALLOWLIST. 'scratch' is handled separately.
"""
if '@sha256:' not in base_ref:
return (False, f'FROM base image must be digest-pinned (@sha256:): {base_ref!r}')
repo = base_ref.split('@', 1)[0].split(':', 1)[0]
if repo not in BUILD_BASE_IMAGE_ALLOWLIST:
return (False, f'FROM base image not in allowlist: {repo!r}')
return (True, None)
def validate_build_context(dockerfile_text: str, context_files=None) -> tuple:
"""
Static lint of a community Dockerfile and its build context.
Returns (True, []) when the Dockerfile passes; (False, [errors]) otherwise.
Enforced (defense-in-depth see module note above, this is NOT a sandbox):
- every external FROM base must be in BUILD_BASE_IMAGE_ALLOWLIST and
digest-pinned (@sha256:)
- FROM scratch only when allowlisted in BUILD_SCRATCH_ALLOWLIST
- no `ADD http(s)://...` (fetches arbitrary remote content at build time)
- no ARG/ENV whose name matches /(TOKEN|KEY|PASSWORD|SECRET)/i (baking a
secret into a layer / build cache)
- context size and file-count caps when context_files metadata is given
context_files: optional iterable of (path, size_bytes) tuples describing the
build context. Pass None to skip the size/count checks (e.g. when only the
Dockerfile text is available, as in CI lint of the manifest repo).
"""
errors = []
if not isinstance(dockerfile_text, str) or not dockerfile_text.strip():
return (False, ['Dockerfile is empty'])
aliases = _stage_aliases(dockerfile_text)
# Join backslash-continued lines so a multi-line instruction is one logical line.
logical_lines = []
buf = ''
for raw in dockerfile_text.splitlines():
stripped = raw.rstrip()
if stripped.endswith('\\'):
buf += stripped[:-1] + ' '
continue
buf += stripped
logical_lines.append(buf)
buf = ''
if buf:
logical_lines.append(buf)
saw_from = False
for line in logical_lines:
line = line.strip()
if not line or line.startswith('#'):
continue
m_from = _FROM_RE.match(line)
if m_from:
saw_from = True
base = m_from.group(1).strip().split()[0]
base_l = base.lower()
if base_l in aliases:
continue # references an earlier build stage, not an external base
if base_l == 'scratch':
if 'scratch' not in BUILD_SCRATCH_ALLOWLIST:
errors.append('FROM scratch is not allowed')
continue
ok, err = _base_is_allowed(base)
if not ok:
errors.append(err)
continue
m_add = _ADD_RE.match(line)
if m_add:
if re.search(r'https?://', m_add.group(1), re.IGNORECASE):
errors.append(f'ADD from a remote URL is not allowed: {line!r}')
continue
m_arg = _ARG_RE.match(line)
if m_arg and _DOCKERFILE_SECRET_NAME_RE.search(m_arg.group(1)):
errors.append(
f'ARG name looks secret-bearing (matches TOKEN|KEY|PASSWORD|SECRET): {m_arg.group(1)!r}'
)
continue
m_env = _ENV_RE.match(line)
if m_env:
# ENV NAME value | ENV NAME=value [NAME2=value2 ...]
body = m_env.group(1).strip()
names = []
if '=' in body:
for tok in body.split():
if '=' in tok:
names.append(tok.split('=', 1)[0])
else:
names.append(body.split()[0] if body.split() else '')
for name in names:
if name and _DOCKERFILE_SECRET_NAME_RE.search(name):
errors.append(
f'ENV name looks secret-bearing (matches TOKEN|KEY|PASSWORD|SECRET): {name!r}'
)
if not saw_from:
errors.append('Dockerfile has no FROM instruction')
if context_files is not None:
total_bytes = 0
count = 0
for entry in context_files:
try:
_path, size = entry
except (TypeError, ValueError):
_path, size = entry, 0
count += 1
total_bytes += int(size or 0)
if count > BUILD_CONTEXT_MAX_FILES:
errors.append(
f'build context has too many files: {count} > {BUILD_CONTEXT_MAX_FILES}'
)
if total_bytes > BUILD_CONTEXT_MAX_BYTES:
errors.append(
f'build context too large: {total_bytes} bytes > {BUILD_CONTEXT_MAX_BYTES}'
)
return (len(errors) == 0, errors)
def validate_provision_hook(hook) -> tuple: def validate_provision_hook(hook) -> tuple:
""" """
Validate a provision_hook value from accounts.provision_hook. Validate a provision_hook value from accounts.provision_hook.
+94 -2
View File
@@ -32,6 +32,16 @@ logger = logging.getLogger('picell')
_SECRET_RE = re.compile(r'\$\{(PIC_SECRET_\w+)\}') _SECRET_RE = re.compile(r'\$\{(PIC_SECRET_\w+)\}')
_SAFE_ID_RE = re.compile(r'^[a-z0-9][a-z0-9_-]{0,63}$') _SAFE_ID_RE = re.compile(r'^[a-z0-9][a-z0-9_-]{0,63}$')
_DIGEST_RE = re.compile(r'@sha256:[0-9a-f]{64}$')
# Bundled cosign public key — shipped in the repo (config/cosign/cosign.pub) so
# every cell can verify store-service image signatures offline. install.sh keeps
# it at /opt/pic/config/cosign/cosign.pub; in the cell-api container it is
# COPYed to /app/config/cosign/cosign.pub.
_COSIGN_PUBKEY_PATH = os.environ.get(
'PIC_COSIGN_PUBKEY', '/app/config/cosign/cosign.pub'
)
_COSIGN_BIN = os.environ.get('PIC_COSIGN_BIN', 'cosign')
class ServiceComposer: class ServiceComposer:
@@ -265,18 +275,100 @@ class ServiceComposer:
self.write_compose(service_id, manifest, template_content) self.write_compose(service_id, manifest, template_content)
return self.up(service_id) return self.up(service_id)
# ── Image signature verification ──────────────────────────────────────
def _verification_mode(self) -> str:
"""Resolve the configured image verification mode (off|warn|enforce)."""
getter = getattr(self.cm, 'get_image_verification_mode', None)
if callable(getter):
try:
return getter()
except Exception as e: # config corruption must not crash install
logger.warning('service_composer: could not read verification mode: %s', e)
return 'warn'
def _cosign_verify(self, image_ref: str) -> Dict:
"""Run `cosign verify` against the bundled public key for one image ref.
Factored out so tests can mock it / mock the subprocess call. Returns a
_run-style dict ({'ok': bool, 'stdout', 'stderr'/'error'}).
"""
cmd = [
_COSIGN_BIN, 'verify',
'--key', _COSIGN_PUBKEY_PATH,
'--insecure-ignore-tlog=true',
image_ref,
]
return self._run(cmd, timeout=120)
def verify_image(self, service_id: str, manifest: Dict) -> Dict:
"""Verify a store image's signature subject to the configured mode.
Returns {'ok': True, 'skipped'|'verified'|'warned': ...} when the install
may proceed, or {'ok': False, 'error': ...} when it must abort (enforce
mode with a missing digest or a failed/absent signature).
"""
mode = self._verification_mode()
if mode == 'off':
return {'ok': True, 'skipped': True}
image_ref = (manifest or {}).get('image', '')
if not image_ref:
# No image to verify (e.g. builtin-style manifest); nothing to do.
return {'ok': True, 'skipped': True}
# Store images must be digest-pinned to be verifiable by digest.
if not _DIGEST_RE.search(image_ref):
msg = (f'image {image_ref!r} for {service_id} is not digest-pinned '
'(@sha256:) — cannot verify signature')
if mode == 'enforce':
logger.error('service_composer: %s; aborting install (enforce)', msg)
return {'ok': False, 'error': msg}
logger.warning('service_composer: %s; proceeding (warn)', msg)
return {'ok': True, 'warned': True}
result = self._cosign_verify(image_ref)
if result.get('ok'):
logger.info('service_composer: cosign verified %s', image_ref)
return {'ok': True, 'verified': True}
detail = result.get('stderr') or result.get('error') or 'signature verification failed'
msg = f'cosign verification failed for {image_ref}: {str(detail)[:200]}'
if mode == 'enforce':
logger.error('service_composer: %s; aborting install (enforce)', msg)
return {'ok': False, 'error': msg}
logger.warning('service_composer: %s; proceeding (warn)', msg)
return {'ok': True, 'warned': True}
def install(self, service_id: str, manifest: Dict, def install(self, service_id: str, manifest: Dict,
template_content: str) -> Dict: template_content: str) -> Dict:
"""Write compose file, pull image, then start containers. """Write compose file, verify + pull image, then start containers.
Image signature verification runs before pull/up. Under enforce mode a
missing digest, missing signature, or failed verification aborts the
install (containers are never started); under warn mode the problem is
logged and the install proceeds; under off mode verification is skipped.
pull is run first so the up step doesn't time out on slow connections. pull is run first so the up step doesn't time out on slow connections.
A single retry handles transient registry hiccups on first install. A single retry handles transient registry hiccups on first install.
""" """
self.write_compose(service_id, manifest, template_content) self.write_compose(service_id, manifest, template_content)
verify = self.verify_image(service_id, manifest)
if not verify.get('ok'):
return {'ok': False, 'error': verify.get('error', 'image verification failed')}
mode = self._verification_mode()
pull = self._store_cmd(service_id, 'pull', timeout=600) pull = self._store_cmd(service_id, 'pull', timeout=600)
if not pull.get('ok'): if not pull.get('ok'):
pull_err = pull.get('stderr') or pull.get('error') or 'unknown error'
if mode == 'enforce':
logger.error('service_composer: image pull for %s failed under enforce, '
'aborting: %s', service_id, str(pull_err)[:200])
return {'ok': False,
'error': f'image pull failed (enforce): {str(pull_err)[:200]}'}
logger.warning('service_composer: image pull for %s failed, proceeding anyway: %s', logger.warning('service_composer: image pull for %s failed, proceeding anyway: %s',
service_id, pull.get('stderr', '')[:200]) service_id, str(pull_err)[:200])
result = self.up(service_id) result = self.up(service_id)
if not result.get('ok'): if not result.get('ok'):
logger.info('service_composer: retrying up for %s after initial failure', service_id) logger.info('service_composer: retrying up for %s after initial failure', service_id)
+15
View File
@@ -306,6 +306,21 @@ class ServiceStoreManager(BaseServiceManager):
if not ok2: if not ok2:
return {'ok': False, 'errors': errs2} return {'ok': False, 'errors': errs2}
# Digest-pin requirement is mode-dependent: the static validators
# above only warn on a missing @sha256: pin (so installs keep
# working until the publish pipeline writes digests). Under
# enforce, a store image without a digest pin is fatal.
mode = self.config_manager.get_image_verification_mode()
image = manifest.get('image', '')
if mode == 'enforce' and image and '@sha256:' not in image:
return {
'ok': False,
'error': (
f'image {image!r} must be digest-pinned (@sha256:) '
'under image_verification mode "enforce"'
),
}
# Dependency check # Dependency check
if self.service_composer is not None: if self.service_composer is not None:
err = self.service_composer._resolve_requires(manifest, installed) err = self.service_composer._resolve_requires(manifest, installed)
+4
View File
@@ -0,0 +1,4 @@
-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEjzJzXg0lMxYRVnJXecvl5YZUhUpK
2WQnyK1SB8Bn9K2JRCHkTIk0D3/78Q4Y5cNuj7i6LFgqx21L/QAiDY21Zw==
-----END PUBLIC KEY-----
+1
View File
@@ -120,6 +120,7 @@ services:
- ./data/api:/app/data - ./data/api:/app/data
- ./data/dns:/app/data/dns - ./data/dns:/app/data/dns
- ./config/api:/app/config - ./config/api:/app/config
- ./config/cosign:/app/config/cosign:ro
- ./config/caddy:/app/config-caddy - ./config/caddy:/app/config-caddy
- ./config/wireguard:/app/config/wireguard - ./config/wireguard:/app/config/wireguard
- ./config/dns:/app/config/dns - ./config/dns:/app/config/dns
+11
View File
@@ -353,6 +353,17 @@ fi
sudo git config --system --add safe.directory "$PIC_DIR" 2>/dev/null || true sudo git config --system --add safe.directory "$PIC_DIR" 2>/dev/null || true
# The cosign public key ships in the repo and is bind-mounted into cell-api so
# store-service image signatures can be verified offline. It is checked in
# (config/cosign/cosign.pub), so the clone above should already provide it;
# warn loudly if it is somehow missing rather than silently skipping verify.
COSIGN_PUBKEY="${PIC_DIR}/config/cosign/cosign.pub"
if [ -f "$COSIGN_PUBKEY" ]; then
log_ok "cosign public key present at ${COSIGN_PUBKEY}"
else
log_warn "cosign public key missing at ${COSIGN_PUBKEY} — image signature verification will be unavailable"
fi
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Step 5 — Run make install # Step 5 — Run make install
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
+36
View File
@@ -517,5 +517,41 @@ class TestEmailManagerApply(unittest.TestCase):
self.assertEqual(result['restarted'], []) self.assertEqual(result['restarted'], [])
class TestImageVerificationConfig(unittest.TestCase):
"""image_verification config round-trip and warn-by-default behaviour."""
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.config_file = os.path.join(self.temp_dir, 'cell_config.json')
self.data_dir = os.path.join(self.temp_dir, 'data')
os.makedirs(self.data_dir, exist_ok=True)
self.cm = ConfigManager(self.config_file, self.data_dir)
def tearDown(self):
shutil.rmtree(self.temp_dir)
def test_default_mode_is_warn(self):
self.assertEqual(self.cm.get_image_verification_mode(), 'warn')
self.assertEqual(self.cm.get_image_verification(), {'mode': 'warn'})
def test_set_and_get_round_trip(self):
for mode in ('off', 'warn', 'enforce'):
self.cm.set_image_verification_mode(mode)
self.assertEqual(self.cm.get_image_verification_mode(), mode)
def test_set_mode_persists_across_reload(self):
self.cm.set_image_verification_mode('enforce')
cm2 = ConfigManager(self.config_file, self.data_dir)
self.assertEqual(cm2.get_image_verification_mode(), 'enforce')
def test_invalid_mode_rejected(self):
with self.assertRaises(ValueError):
self.cm.set_image_verification_mode('paranoid')
def test_corrupt_section_falls_back_to_warn(self):
self.cm.configs['image_verification'] = {'mode': 'bogus'}
self.assertEqual(self.cm.get_image_verification_mode(), 'warn')
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
+96
View File
@@ -22,8 +22,13 @@ from manifest_validator import (
validate_manifest, validate_manifest,
validate_rendered_compose, validate_rendered_compose,
validate_provision_hook, validate_provision_hook,
validate_build_context,
BUILD_CONTEXT_MAX_FILES,
BUILD_CONTEXT_MAX_BYTES,
) )
_ALPINE_PINNED = 'alpine@sha256:' + 'b' * 64
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Helpers # Helpers
@@ -1646,5 +1651,96 @@ class TestWriteComposeValidation(unittest.TestCase):
self.assertIn('security validation', str(ctx.exception)) self.assertIn('security validation', str(ctx.exception))
class TestValidateBuildContext(unittest.TestCase):
"""Static Dockerfile lint (defense-in-depth)."""
def test_allowlisted_pinned_base_ok(self):
df = f'FROM {_ALPINE_PINNED}\nRUN echo hi\n'
ok, errs = validate_build_context(df)
self.assertTrue(ok, errs)
def test_unpinned_base_rejected(self):
ok, errs = validate_build_context('FROM alpine:3.19\n')
self.assertFalse(ok)
self.assertTrue(any('digest-pinned' in e for e in errs))
def test_non_allowlisted_base_rejected(self):
df = 'FROM evil/image@sha256:' + 'c' * 64 + '\n'
ok, errs = validate_build_context(df)
self.assertFalse(ok)
self.assertTrue(any('allowlist' in e for e in errs))
def test_from_scratch_rejected(self):
ok, errs = validate_build_context('FROM scratch\n')
self.assertFalse(ok)
self.assertTrue(any('scratch' in e for e in errs))
def test_multistage_alias_reference_ok(self):
df = (
f'FROM {_ALPINE_PINNED} AS builder\n'
'RUN echo build\n'
'FROM builder\n'
'RUN echo final\n'
)
ok, errs = validate_build_context(df)
self.assertTrue(ok, errs)
def test_add_remote_url_rejected(self):
df = f'FROM {_ALPINE_PINNED}\nADD https://evil.example/x.sh /x.sh\n'
ok, errs = validate_build_context(df)
self.assertFalse(ok)
self.assertTrue(any('remote URL' in e for e in errs))
def test_add_local_path_ok(self):
df = f'FROM {_ALPINE_PINNED}\nADD ./local.tar /opt/\n'
ok, errs = validate_build_context(df)
self.assertTrue(ok, errs)
def test_secret_named_arg_rejected(self):
df = f'FROM {_ALPINE_PINNED}\nARG GITHUB_TOKEN\n'
ok, errs = validate_build_context(df)
self.assertFalse(ok)
self.assertTrue(any('ARG' in e for e in errs))
def test_secret_named_env_rejected(self):
df = f'FROM {_ALPINE_PINNED}\nENV API_SECRET=abc\n'
ok, errs = validate_build_context(df)
self.assertFalse(ok)
self.assertTrue(any('ENV' in e for e in errs))
def test_benign_arg_env_ok(self):
df = f'FROM {_ALPINE_PINNED}\nARG VERSION=1.0\nENV TZ=UTC\n'
ok, errs = validate_build_context(df)
self.assertTrue(ok, errs)
def test_no_from_rejected(self):
ok, errs = validate_build_context('RUN echo hi\n')
self.assertFalse(ok)
def test_empty_dockerfile_rejected(self):
ok, errs = validate_build_context('')
self.assertFalse(ok)
def test_oversized_context_rejected(self):
df = f'FROM {_ALPINE_PINNED}\n'
files = [('big.bin', BUILD_CONTEXT_MAX_BYTES + 1)]
ok, errs = validate_build_context(df, context_files=files)
self.assertFalse(ok)
self.assertTrue(any('too large' in e for e in errs))
def test_too_many_files_rejected(self):
df = f'FROM {_ALPINE_PINNED}\n'
files = [(f'f{i}', 1) for i in range(BUILD_CONTEXT_MAX_FILES + 1)]
ok, errs = validate_build_context(df, context_files=files)
self.assertFalse(ok)
self.assertTrue(any('too many files' in e for e in errs))
def test_small_context_ok(self):
df = f'FROM {_ALPINE_PINNED}\n'
files = [('Dockerfile', 100), ('entrypoint.sh', 500)]
ok, errs = validate_build_context(df, context_files=files)
self.assertTrue(ok, errs)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
+102
View File
@@ -773,5 +773,107 @@ class TestPerInstanceRender(unittest.TestCase):
self.assertEqual(ServiceComposer.instance_id_for('conn_a1b2c3d4'), 'a1b2c3d4') self.assertEqual(ServiceComposer.instance_id_for('conn_a1b2c3d4'), 'a1b2c3d4')
# ── Image signature verification ───────────────────────────────────────────
_DIGEST = '@sha256:' + 'a' * 64
_SIGNED_IMAGE = 'git.pic.ngo/roof/foo' + _DIGEST
def _verify_composer(mode):
cm = _make_cm()
cm.get_image_verification_mode.return_value = mode
c = ServiceComposer(config_manager=cm, data_dir='/fake/data')
# Isolate verification from disk/docker: stub everything except verify logic.
c.write_compose = MagicMock(return_value='')
c.up = MagicMock(return_value={'ok': True})
c._store_cmd = MagicMock(return_value={'ok': True})
return c
class TestImageVerification(unittest.TestCase):
def test_off_mode_skips_cosign(self):
c = _verify_composer('off')
c._cosign_verify = MagicMock()
manifest = {'image': _SIGNED_IMAGE}
result = c.install('svc', manifest, 'tpl')
self.assertTrue(result['ok'])
c._cosign_verify.assert_not_called()
c.up.assert_called_once()
def test_good_signature_proceeds(self):
c = _verify_composer('enforce')
c._cosign_verify = MagicMock(return_value={'ok': True, 'stdout': '', 'stderr': ''})
manifest = {'image': _SIGNED_IMAGE}
result = c.install('svc', manifest, 'tpl')
self.assertTrue(result['ok'])
c._cosign_verify.assert_called_once()
c.up.assert_called_once()
def test_enforce_bad_signature_aborts(self):
c = _verify_composer('enforce')
c._cosign_verify = MagicMock(return_value={'ok': False, 'stderr': 'no matching signatures'})
manifest = {'image': _SIGNED_IMAGE}
result = c.install('svc', manifest, 'tpl')
self.assertFalse(result['ok'])
self.assertIn('verification failed', result['error'])
c.up.assert_not_called()
def test_warn_bad_signature_proceeds(self):
c = _verify_composer('warn')
c._cosign_verify = MagicMock(return_value={'ok': False, 'stderr': 'no matching signatures'})
manifest = {'image': _SIGNED_IMAGE}
result = c.install('svc', manifest, 'tpl')
self.assertTrue(result['ok'])
c.up.assert_called_once()
def test_enforce_undigested_image_aborts(self):
c = _verify_composer('enforce')
c._cosign_verify = MagicMock()
manifest = {'image': 'git.pic.ngo/roof/foo:latest'}
result = c.install('svc', manifest, 'tpl')
self.assertFalse(result['ok'])
self.assertIn('digest', result['error'])
c._cosign_verify.assert_not_called()
c.up.assert_not_called()
def test_warn_undigested_image_proceeds(self):
c = _verify_composer('warn')
c._cosign_verify = MagicMock()
manifest = {'image': 'git.pic.ngo/roof/foo:latest'}
result = c.install('svc', manifest, 'tpl')
self.assertTrue(result['ok'])
c._cosign_verify.assert_not_called()
c.up.assert_called_once()
def test_cosign_verify_builds_expected_command(self):
c = _verify_composer('enforce')
with patch.object(c, '_run', return_value={'ok': True}) as mock_run:
c._cosign_verify(_SIGNED_IMAGE)
cmd = mock_run.call_args[0][0]
self.assertEqual(cmd[0:2], ['cosign', 'verify'])
self.assertIn('--key', cmd)
self.assertIn(_SIGNED_IMAGE, cmd)
def test_enforce_pull_failure_aborts(self):
c = _verify_composer('enforce')
c._cosign_verify = MagicMock(return_value={'ok': True})
c._store_cmd = MagicMock(return_value={'ok': False, 'stderr': 'manifest unknown'})
manifest = {'image': _SIGNED_IMAGE}
result = c.install('svc', manifest, 'tpl')
self.assertFalse(result['ok'])
self.assertIn('pull failed', result['error'])
c.up.assert_not_called()
def test_warn_pull_failure_proceeds(self):
c = _verify_composer('warn')
c._cosign_verify = MagicMock(return_value={'ok': True})
c._store_cmd = MagicMock(return_value={'ok': False, 'stderr': 'manifest unknown'})
manifest = {'image': _SIGNED_IMAGE}
result = c.install('svc', manifest, 'tpl')
self.assertTrue(result['ok'])
c.up.assert_called_once()
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
+34
View File
@@ -679,6 +679,40 @@ class TestInstall(unittest.TestCase):
self.assertTrue(result['ok']) self.assertTrue(result['ok'])
self.assertFalse(result.get('already_installed', False)) self.assertFalse(result.get('already_installed', False))
def test_install_enforce_rejects_undigested_image(self):
"""Under enforce mode a store image without @sha256: pin is fatal."""
manifest = _valid_manifest(
id='myapp', container_name='cell-myapp',
image='git.pic.ngo/roof/myapp:latest',
)
ssm, cm, _, composer = _make_ssm(manifest=manifest)
cm.get_image_verification_mode.return_value = 'enforce'
result = ssm.install('myapp')
self.assertFalse(result['ok'])
self.assertIn('digest', result['error'].lower())
composer.install.assert_not_called()
def test_install_warn_allows_undigested_image(self):
"""Under warn mode (default) an undigested image still installs."""
manifest = _valid_manifest(
id='myapp', container_name='cell-myapp',
image='git.pic.ngo/roof/myapp:latest',
)
ssm, cm, _, composer = _make_ssm(manifest=manifest)
cm.get_image_verification_mode.return_value = 'warn'
result = ssm.install('myapp')
self.assertTrue(result['ok'])
composer.install.assert_called_once()
def test_install_enforce_allows_digested_image(self):
"""Under enforce mode a properly digest-pinned image installs."""
manifest = _valid_manifest(id='myapp', container_name='cell-myapp')
ssm, cm, _, composer = _make_ssm(manifest=manifest)
cm.get_image_verification_mode.return_value = 'enforce'
result = ssm.install('myapp')
self.assertTrue(result['ok'])
composer.install.assert_called_once()
def test_install_without_composer_stores_record(self): def test_install_without_composer_stores_record(self):
"""When service_composer=None, skip compose but still store the install record.""" """When service_composer=None, skip compose but still store the install record."""
manifest = _valid_manifest(id='myapp', container_name='cell-myapp') manifest = _valid_manifest(id='myapp', container_name='cell-myapp')