Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 238db60702 |
@@ -1,5 +1,7 @@
|
||||
FROM docker:27-cli@sha256:851f91d241214e7c6db86513b270d58776379aacc5eb9c4a87e5b47115e3065c AS dockercli
|
||||
|
||||
FROM gcr.io/projectsigstore/cosign:v2.4.1@sha256:b03690aa52bfe94054187142fba24dc54137650682810633901767d8a3e15b31 AS cosign
|
||||
|
||||
FROM python:3.11-slim@sha256:a3ab0b966bc4e91546a033e22093cb840908979487a9fc0e6e38295747e49ac0
|
||||
|
||||
WORKDIR /app/api
|
||||
@@ -8,6 +10,10 @@ WORKDIR /app/api
|
||||
# docker-execs into sibling containers. Non-root is not feasible here.
|
||||
COPY --from=dockercli /usr/local/bin/docker /usr/local/bin/docker
|
||||
|
||||
# cosign verifies store-service image signatures against the bundled public key
|
||||
# (config/cosign/cosign.pub) before ServiceComposer starts a container.
|
||||
COPY --from=cosign /ko-app/cosign /usr/local/bin/cosign
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y --no-install-recommends \
|
||||
wireguard-tools \
|
||||
|
||||
@@ -47,6 +47,9 @@ logger = logging.getLogger(__name__)
|
||||
# Valid Python logging levels for the `logging` config section.
|
||||
_VALID_LOG_LEVELS = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
|
||||
|
||||
# Image signature verification modes (see get/set_image_verification).
|
||||
_IMAGE_VERIFY_MODES = ('off', 'warn', 'enforce')
|
||||
|
||||
# Per-service Python loggers exposed in the verbosity panel.
|
||||
_LOGGING_PYTHON_SERVICES = (
|
||||
'network', 'wireguard', 'email', 'calendar',
|
||||
@@ -1005,6 +1008,41 @@ class ConfigManager:
|
||||
ident.setdefault('service_ips', {}).pop(service_id, None)
|
||||
self._save_all_configs()
|
||||
|
||||
# ── Image signature verification configuration ────────────────────────
|
||||
#
|
||||
# Controls how a cell treats store-service container images at install:
|
||||
# off — skip cosign verification and the digest-pin requirement
|
||||
# warn — log a warning on a missing digest / failed signature, proceed
|
||||
# enforce — refuse to start a service whose image is undigested,
|
||||
# unsigned, or whose signature does not verify
|
||||
#
|
||||
# Default is "warn" until the publish pipeline signs all store images; a
|
||||
# later phase flips the default to "enforce". The section is backed up and
|
||||
# restored with the rest of cell_config.json automatically.
|
||||
|
||||
def get_image_verification(self) -> Dict[str, Any]:
|
||||
"""Return the image verification config, e.g. {'mode': 'warn'}."""
|
||||
cfg = self.configs.get('image_verification')
|
||||
if not isinstance(cfg, dict) or cfg.get('mode') not in _IMAGE_VERIFY_MODES:
|
||||
cfg = {'mode': 'warn'}
|
||||
self.configs['image_verification'] = cfg
|
||||
return dict(cfg)
|
||||
|
||||
def get_image_verification_mode(self) -> str:
|
||||
"""Return just the verification mode string (off|warn|enforce)."""
|
||||
return self.get_image_verification()['mode']
|
||||
|
||||
def set_image_verification_mode(self, mode: str) -> None:
|
||||
"""Persist the verification mode. Raises ValueError on an invalid mode."""
|
||||
mode = (mode or '').lower()
|
||||
if mode not in _IMAGE_VERIFY_MODES:
|
||||
raise ValueError(
|
||||
f"Invalid image verification mode: {mode!r} "
|
||||
f"(expected one of {sorted(_IMAGE_VERIFY_MODES)})"
|
||||
)
|
||||
self.configs['image_verification'] = {'mode': mode}
|
||||
self._save_all_configs()
|
||||
|
||||
# ── Logging verbosity configuration ───────────────────────────────────
|
||||
def _ensure_logging_config(self) -> None:
|
||||
"""Ensure a well-formed `logging` section exists, migrating the legacy
|
||||
|
||||
@@ -48,6 +48,48 @@ _IMAGE_DIGEST_RE = re.compile(
|
||||
r'^git\.pic\.ngo/roof/[a-zA-Z0-9._/-]+@sha256:[0-9a-f]{64}$'
|
||||
)
|
||||
|
||||
# ── Build-context (Dockerfile) lint ───────────────────────────────────────
|
||||
#
|
||||
# These checks are *defense-in-depth*, not a guarantee. A Dockerfile is
|
||||
# Turing-ish: a determined author can still fetch code at build time via a
|
||||
# permitted base image's package manager, multi-stage tricks, or build args.
|
||||
# The real trust boundary is the isolated builder + cosign signature applied
|
||||
# by the trusted publish stage (P2). This static lint exists to catch the
|
||||
# obvious-and-cheap mistakes (un-pinned bases, remote ADD, secret-named args)
|
||||
# before an image is ever built, and to keep the published corpus uniform.
|
||||
|
||||
# Base images a community Dockerfile may build FROM. Each MUST be digest
|
||||
# pinned so the build is reproducible and the base cannot be swapped under us.
|
||||
# Keep this curated and small; extend deliberately as P2/P3 add languages.
|
||||
BUILD_BASE_IMAGE_ALLOWLIST = frozenset({
|
||||
'docker.io/library/alpine',
|
||||
'docker.io/library/debian',
|
||||
'docker.io/library/python',
|
||||
'docker.io/library/golang',
|
||||
'docker.io/library/node',
|
||||
'alpine',
|
||||
'debian',
|
||||
'python',
|
||||
'golang',
|
||||
'node',
|
||||
'gcr.io/distroless/static',
|
||||
'gcr.io/distroless/base',
|
||||
})
|
||||
# FROM scratch is only allowed for these (otherwise rejected). Empty by
|
||||
# default — community images should start from a pinned, scannable base.
|
||||
BUILD_SCRATCH_ALLOWLIST = frozenset()
|
||||
|
||||
_DOCKERFILE_SECRET_NAME_RE = re.compile(r'(TOKEN|KEY|PASSWORD|SECRET)', re.IGNORECASE)
|
||||
_FROM_RE = re.compile(r'^FROM\s+(.+?)(?:\s+AS\s+\S+)?$', re.IGNORECASE)
|
||||
_ADD_RE = re.compile(r'^ADD\s+(.+)$', re.IGNORECASE)
|
||||
_ARG_RE = re.compile(r'^ARG\s+([A-Za-z_][A-Za-z0-9_]*)', re.IGNORECASE)
|
||||
_ENV_RE = re.compile(r'^ENV\s+(.+)$', re.IGNORECASE)
|
||||
|
||||
# Context size / file-count caps — a community build context should be small
|
||||
# (a Dockerfile + a handful of config/entrypoint files), never a whole tree.
|
||||
BUILD_CONTEXT_MAX_BYTES = 5 * 1024 * 1024 # 5 MiB
|
||||
BUILD_CONTEXT_MAX_FILES = 200
|
||||
|
||||
|
||||
def validate_manifest(manifest: dict) -> tuple:
|
||||
"""
|
||||
@@ -294,6 +336,149 @@ def validate_rendered_compose(yaml_text: str, allowed_data_dir: str = None,
|
||||
return (len(errors) == 0, errors)
|
||||
|
||||
|
||||
def _stage_aliases(dockerfile_text: str) -> set:
|
||||
"""Collect multi-stage build aliases (FROM x AS alias) so later FROM <alias>
|
||||
references resolve to a same-file stage rather than an external base."""
|
||||
aliases = set()
|
||||
for raw in dockerfile_text.splitlines():
|
||||
line = raw.strip()
|
||||
m = re.match(r'^FROM\s+\S+\s+AS\s+(\S+)\s*$', line, re.IGNORECASE)
|
||||
if m:
|
||||
aliases.add(m.group(1).lower())
|
||||
return aliases
|
||||
|
||||
|
||||
def _base_is_allowed(base_ref: str) -> tuple:
|
||||
"""Return (ok, error_or_None) for a single FROM base image reference.
|
||||
|
||||
Requires an @sha256: digest pin and that the repository part (sans tag/
|
||||
digest) is in BUILD_BASE_IMAGE_ALLOWLIST. 'scratch' is handled separately.
|
||||
"""
|
||||
if '@sha256:' not in base_ref:
|
||||
return (False, f'FROM base image must be digest-pinned (@sha256:): {base_ref!r}')
|
||||
repo = base_ref.split('@', 1)[0].split(':', 1)[0]
|
||||
if repo not in BUILD_BASE_IMAGE_ALLOWLIST:
|
||||
return (False, f'FROM base image not in allowlist: {repo!r}')
|
||||
return (True, None)
|
||||
|
||||
|
||||
def validate_build_context(dockerfile_text: str, context_files=None) -> tuple:
|
||||
"""
|
||||
Static lint of a community Dockerfile and its build context.
|
||||
|
||||
Returns (True, []) when the Dockerfile passes; (False, [errors]) otherwise.
|
||||
|
||||
Enforced (defense-in-depth — see module note above, this is NOT a sandbox):
|
||||
- every external FROM base must be in BUILD_BASE_IMAGE_ALLOWLIST and
|
||||
digest-pinned (@sha256:)
|
||||
- FROM scratch only when allowlisted in BUILD_SCRATCH_ALLOWLIST
|
||||
- no `ADD http(s)://...` (fetches arbitrary remote content at build time)
|
||||
- no ARG/ENV whose name matches /(TOKEN|KEY|PASSWORD|SECRET)/i (baking a
|
||||
secret into a layer / build cache)
|
||||
- context size and file-count caps when context_files metadata is given
|
||||
|
||||
context_files: optional iterable of (path, size_bytes) tuples describing the
|
||||
build context. Pass None to skip the size/count checks (e.g. when only the
|
||||
Dockerfile text is available, as in CI lint of the manifest repo).
|
||||
"""
|
||||
errors = []
|
||||
|
||||
if not isinstance(dockerfile_text, str) or not dockerfile_text.strip():
|
||||
return (False, ['Dockerfile is empty'])
|
||||
|
||||
aliases = _stage_aliases(dockerfile_text)
|
||||
|
||||
# Join backslash-continued lines so a multi-line instruction is one logical line.
|
||||
logical_lines = []
|
||||
buf = ''
|
||||
for raw in dockerfile_text.splitlines():
|
||||
stripped = raw.rstrip()
|
||||
if stripped.endswith('\\'):
|
||||
buf += stripped[:-1] + ' '
|
||||
continue
|
||||
buf += stripped
|
||||
logical_lines.append(buf)
|
||||
buf = ''
|
||||
if buf:
|
||||
logical_lines.append(buf)
|
||||
|
||||
saw_from = False
|
||||
for line in logical_lines:
|
||||
line = line.strip()
|
||||
if not line or line.startswith('#'):
|
||||
continue
|
||||
|
||||
m_from = _FROM_RE.match(line)
|
||||
if m_from:
|
||||
saw_from = True
|
||||
base = m_from.group(1).strip().split()[0]
|
||||
base_l = base.lower()
|
||||
if base_l in aliases:
|
||||
continue # references an earlier build stage, not an external base
|
||||
if base_l == 'scratch':
|
||||
if 'scratch' not in BUILD_SCRATCH_ALLOWLIST:
|
||||
errors.append('FROM scratch is not allowed')
|
||||
continue
|
||||
ok, err = _base_is_allowed(base)
|
||||
if not ok:
|
||||
errors.append(err)
|
||||
continue
|
||||
|
||||
m_add = _ADD_RE.match(line)
|
||||
if m_add:
|
||||
if re.search(r'https?://', m_add.group(1), re.IGNORECASE):
|
||||
errors.append(f'ADD from a remote URL is not allowed: {line!r}')
|
||||
continue
|
||||
|
||||
m_arg = _ARG_RE.match(line)
|
||||
if m_arg and _DOCKERFILE_SECRET_NAME_RE.search(m_arg.group(1)):
|
||||
errors.append(
|
||||
f'ARG name looks secret-bearing (matches TOKEN|KEY|PASSWORD|SECRET): {m_arg.group(1)!r}'
|
||||
)
|
||||
continue
|
||||
|
||||
m_env = _ENV_RE.match(line)
|
||||
if m_env:
|
||||
# ENV NAME value | ENV NAME=value [NAME2=value2 ...]
|
||||
body = m_env.group(1).strip()
|
||||
names = []
|
||||
if '=' in body:
|
||||
for tok in body.split():
|
||||
if '=' in tok:
|
||||
names.append(tok.split('=', 1)[0])
|
||||
else:
|
||||
names.append(body.split()[0] if body.split() else '')
|
||||
for name in names:
|
||||
if name and _DOCKERFILE_SECRET_NAME_RE.search(name):
|
||||
errors.append(
|
||||
f'ENV name looks secret-bearing (matches TOKEN|KEY|PASSWORD|SECRET): {name!r}'
|
||||
)
|
||||
|
||||
if not saw_from:
|
||||
errors.append('Dockerfile has no FROM instruction')
|
||||
|
||||
if context_files is not None:
|
||||
total_bytes = 0
|
||||
count = 0
|
||||
for entry in context_files:
|
||||
try:
|
||||
_path, size = entry
|
||||
except (TypeError, ValueError):
|
||||
_path, size = entry, 0
|
||||
count += 1
|
||||
total_bytes += int(size or 0)
|
||||
if count > BUILD_CONTEXT_MAX_FILES:
|
||||
errors.append(
|
||||
f'build context has too many files: {count} > {BUILD_CONTEXT_MAX_FILES}'
|
||||
)
|
||||
if total_bytes > BUILD_CONTEXT_MAX_BYTES:
|
||||
errors.append(
|
||||
f'build context too large: {total_bytes} bytes > {BUILD_CONTEXT_MAX_BYTES}'
|
||||
)
|
||||
|
||||
return (len(errors) == 0, errors)
|
||||
|
||||
|
||||
def validate_provision_hook(hook) -> tuple:
|
||||
"""
|
||||
Validate a provision_hook value from accounts.provision_hook.
|
||||
|
||||
+94
-2
@@ -32,6 +32,16 @@ logger = logging.getLogger('picell')
|
||||
|
||||
_SECRET_RE = re.compile(r'\$\{(PIC_SECRET_\w+)\}')
|
||||
_SAFE_ID_RE = re.compile(r'^[a-z0-9][a-z0-9_-]{0,63}$')
|
||||
_DIGEST_RE = re.compile(r'@sha256:[0-9a-f]{64}$')
|
||||
|
||||
# Bundled cosign public key — shipped in the repo (config/cosign/cosign.pub) so
|
||||
# every cell can verify store-service image signatures offline. install.sh keeps
|
||||
# it at /opt/pic/config/cosign/cosign.pub; in the cell-api container it is
|
||||
# COPYed to /app/config/cosign/cosign.pub.
|
||||
_COSIGN_PUBKEY_PATH = os.environ.get(
|
||||
'PIC_COSIGN_PUBKEY', '/app/config/cosign/cosign.pub'
|
||||
)
|
||||
_COSIGN_BIN = os.environ.get('PIC_COSIGN_BIN', 'cosign')
|
||||
|
||||
|
||||
class ServiceComposer:
|
||||
@@ -265,18 +275,100 @@ class ServiceComposer:
|
||||
self.write_compose(service_id, manifest, template_content)
|
||||
return self.up(service_id)
|
||||
|
||||
# ── Image signature verification ──────────────────────────────────────
|
||||
|
||||
def _verification_mode(self) -> str:
|
||||
"""Resolve the configured image verification mode (off|warn|enforce)."""
|
||||
getter = getattr(self.cm, 'get_image_verification_mode', None)
|
||||
if callable(getter):
|
||||
try:
|
||||
return getter()
|
||||
except Exception as e: # config corruption must not crash install
|
||||
logger.warning('service_composer: could not read verification mode: %s', e)
|
||||
return 'warn'
|
||||
|
||||
def _cosign_verify(self, image_ref: str) -> Dict:
|
||||
"""Run `cosign verify` against the bundled public key for one image ref.
|
||||
|
||||
Factored out so tests can mock it / mock the subprocess call. Returns a
|
||||
_run-style dict ({'ok': bool, 'stdout', 'stderr'/'error'}).
|
||||
"""
|
||||
cmd = [
|
||||
_COSIGN_BIN, 'verify',
|
||||
'--key', _COSIGN_PUBKEY_PATH,
|
||||
'--insecure-ignore-tlog=true',
|
||||
image_ref,
|
||||
]
|
||||
return self._run(cmd, timeout=120)
|
||||
|
||||
def verify_image(self, service_id: str, manifest: Dict) -> Dict:
|
||||
"""Verify a store image's signature subject to the configured mode.
|
||||
|
||||
Returns {'ok': True, 'skipped'|'verified'|'warned': ...} when the install
|
||||
may proceed, or {'ok': False, 'error': ...} when it must abort (enforce
|
||||
mode with a missing digest or a failed/absent signature).
|
||||
"""
|
||||
mode = self._verification_mode()
|
||||
if mode == 'off':
|
||||
return {'ok': True, 'skipped': True}
|
||||
|
||||
image_ref = (manifest or {}).get('image', '')
|
||||
if not image_ref:
|
||||
# No image to verify (e.g. builtin-style manifest); nothing to do.
|
||||
return {'ok': True, 'skipped': True}
|
||||
|
||||
# Store images must be digest-pinned to be verifiable by digest.
|
||||
if not _DIGEST_RE.search(image_ref):
|
||||
msg = (f'image {image_ref!r} for {service_id} is not digest-pinned '
|
||||
'(@sha256:) — cannot verify signature')
|
||||
if mode == 'enforce':
|
||||
logger.error('service_composer: %s; aborting install (enforce)', msg)
|
||||
return {'ok': False, 'error': msg}
|
||||
logger.warning('service_composer: %s; proceeding (warn)', msg)
|
||||
return {'ok': True, 'warned': True}
|
||||
|
||||
result = self._cosign_verify(image_ref)
|
||||
if result.get('ok'):
|
||||
logger.info('service_composer: cosign verified %s', image_ref)
|
||||
return {'ok': True, 'verified': True}
|
||||
|
||||
detail = result.get('stderr') or result.get('error') or 'signature verification failed'
|
||||
msg = f'cosign verification failed for {image_ref}: {str(detail)[:200]}'
|
||||
if mode == 'enforce':
|
||||
logger.error('service_composer: %s; aborting install (enforce)', msg)
|
||||
return {'ok': False, 'error': msg}
|
||||
logger.warning('service_composer: %s; proceeding (warn)', msg)
|
||||
return {'ok': True, 'warned': True}
|
||||
|
||||
def install(self, service_id: str, manifest: Dict,
|
||||
template_content: str) -> Dict:
|
||||
"""Write compose file, pull image, then start containers.
|
||||
"""Write compose file, verify + pull image, then start containers.
|
||||
|
||||
Image signature verification runs before pull/up. Under enforce mode a
|
||||
missing digest, missing signature, or failed verification aborts the
|
||||
install (containers are never started); under warn mode the problem is
|
||||
logged and the install proceeds; under off mode verification is skipped.
|
||||
|
||||
pull is run first so the up step doesn't time out on slow connections.
|
||||
A single retry handles transient registry hiccups on first install.
|
||||
"""
|
||||
self.write_compose(service_id, manifest, template_content)
|
||||
|
||||
verify = self.verify_image(service_id, manifest)
|
||||
if not verify.get('ok'):
|
||||
return {'ok': False, 'error': verify.get('error', 'image verification failed')}
|
||||
|
||||
mode = self._verification_mode()
|
||||
pull = self._store_cmd(service_id, 'pull', timeout=600)
|
||||
if not pull.get('ok'):
|
||||
pull_err = pull.get('stderr') or pull.get('error') or 'unknown error'
|
||||
if mode == 'enforce':
|
||||
logger.error('service_composer: image pull for %s failed under enforce, '
|
||||
'aborting: %s', service_id, str(pull_err)[:200])
|
||||
return {'ok': False,
|
||||
'error': f'image pull failed (enforce): {str(pull_err)[:200]}'}
|
||||
logger.warning('service_composer: image pull for %s failed, proceeding anyway: %s',
|
||||
service_id, pull.get('stderr', '')[:200])
|
||||
service_id, str(pull_err)[:200])
|
||||
result = self.up(service_id)
|
||||
if not result.get('ok'):
|
||||
logger.info('service_composer: retrying up for %s after initial failure', service_id)
|
||||
|
||||
@@ -306,6 +306,21 @@ class ServiceStoreManager(BaseServiceManager):
|
||||
if not ok2:
|
||||
return {'ok': False, 'errors': errs2}
|
||||
|
||||
# Digest-pin requirement is mode-dependent: the static validators
|
||||
# above only warn on a missing @sha256: pin (so installs keep
|
||||
# working until the publish pipeline writes digests). Under
|
||||
# enforce, a store image without a digest pin is fatal.
|
||||
mode = self.config_manager.get_image_verification_mode()
|
||||
image = manifest.get('image', '')
|
||||
if mode == 'enforce' and image and '@sha256:' not in image:
|
||||
return {
|
||||
'ok': False,
|
||||
'error': (
|
||||
f'image {image!r} must be digest-pinned (@sha256:) '
|
||||
'under image_verification mode "enforce"'
|
||||
),
|
||||
}
|
||||
|
||||
# Dependency check
|
||||
if self.service_composer is not None:
|
||||
err = self.service_composer._resolve_requires(manifest, installed)
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
-----BEGIN PUBLIC KEY-----
|
||||
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEjzJzXg0lMxYRVnJXecvl5YZUhUpK
|
||||
2WQnyK1SB8Bn9K2JRCHkTIk0D3/78Q4Y5cNuj7i6LFgqx21L/QAiDY21Zw==
|
||||
-----END PUBLIC KEY-----
|
||||
@@ -120,6 +120,7 @@ services:
|
||||
- ./data/api:/app/data
|
||||
- ./data/dns:/app/data/dns
|
||||
- ./config/api:/app/config
|
||||
- ./config/cosign:/app/config/cosign:ro
|
||||
- ./config/caddy:/app/config-caddy
|
||||
- ./config/wireguard:/app/config/wireguard
|
||||
- ./config/dns:/app/config/dns
|
||||
|
||||
+11
@@ -353,6 +353,17 @@ fi
|
||||
|
||||
sudo git config --system --add safe.directory "$PIC_DIR" 2>/dev/null || true
|
||||
|
||||
# The cosign public key ships in the repo and is bind-mounted into cell-api so
|
||||
# store-service image signatures can be verified offline. It is checked in
|
||||
# (config/cosign/cosign.pub), so the clone above should already provide it;
|
||||
# warn loudly if it is somehow missing rather than silently skipping verify.
|
||||
COSIGN_PUBKEY="${PIC_DIR}/config/cosign/cosign.pub"
|
||||
if [ -f "$COSIGN_PUBKEY" ]; then
|
||||
log_ok "cosign public key present at ${COSIGN_PUBKEY}"
|
||||
else
|
||||
log_warn "cosign public key missing at ${COSIGN_PUBKEY} — image signature verification will be unavailable"
|
||||
fi
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Step 5 — Run make install
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -517,5 +517,41 @@ class TestEmailManagerApply(unittest.TestCase):
|
||||
self.assertEqual(result['restarted'], [])
|
||||
|
||||
|
||||
class TestImageVerificationConfig(unittest.TestCase):
|
||||
"""image_verification config round-trip and warn-by-default behaviour."""
|
||||
|
||||
def setUp(self):
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.config_file = os.path.join(self.temp_dir, 'cell_config.json')
|
||||
self.data_dir = os.path.join(self.temp_dir, 'data')
|
||||
os.makedirs(self.data_dir, exist_ok=True)
|
||||
self.cm = ConfigManager(self.config_file, self.data_dir)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def test_default_mode_is_warn(self):
|
||||
self.assertEqual(self.cm.get_image_verification_mode(), 'warn')
|
||||
self.assertEqual(self.cm.get_image_verification(), {'mode': 'warn'})
|
||||
|
||||
def test_set_and_get_round_trip(self):
|
||||
for mode in ('off', 'warn', 'enforce'):
|
||||
self.cm.set_image_verification_mode(mode)
|
||||
self.assertEqual(self.cm.get_image_verification_mode(), mode)
|
||||
|
||||
def test_set_mode_persists_across_reload(self):
|
||||
self.cm.set_image_verification_mode('enforce')
|
||||
cm2 = ConfigManager(self.config_file, self.data_dir)
|
||||
self.assertEqual(cm2.get_image_verification_mode(), 'enforce')
|
||||
|
||||
def test_invalid_mode_rejected(self):
|
||||
with self.assertRaises(ValueError):
|
||||
self.cm.set_image_verification_mode('paranoid')
|
||||
|
||||
def test_corrupt_section_falls_back_to_warn(self):
|
||||
self.cm.configs['image_verification'] = {'mode': 'bogus'}
|
||||
self.assertEqual(self.cm.get_image_verification_mode(), 'warn')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -22,8 +22,13 @@ from manifest_validator import (
|
||||
validate_manifest,
|
||||
validate_rendered_compose,
|
||||
validate_provision_hook,
|
||||
validate_build_context,
|
||||
BUILD_CONTEXT_MAX_FILES,
|
||||
BUILD_CONTEXT_MAX_BYTES,
|
||||
)
|
||||
|
||||
_ALPINE_PINNED = 'alpine@sha256:' + 'b' * 64
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
@@ -1646,5 +1651,96 @@ class TestWriteComposeValidation(unittest.TestCase):
|
||||
self.assertIn('security validation', str(ctx.exception))
|
||||
|
||||
|
||||
class TestValidateBuildContext(unittest.TestCase):
|
||||
"""Static Dockerfile lint (defense-in-depth)."""
|
||||
|
||||
def test_allowlisted_pinned_base_ok(self):
|
||||
df = f'FROM {_ALPINE_PINNED}\nRUN echo hi\n'
|
||||
ok, errs = validate_build_context(df)
|
||||
self.assertTrue(ok, errs)
|
||||
|
||||
def test_unpinned_base_rejected(self):
|
||||
ok, errs = validate_build_context('FROM alpine:3.19\n')
|
||||
self.assertFalse(ok)
|
||||
self.assertTrue(any('digest-pinned' in e for e in errs))
|
||||
|
||||
def test_non_allowlisted_base_rejected(self):
|
||||
df = 'FROM evil/image@sha256:' + 'c' * 64 + '\n'
|
||||
ok, errs = validate_build_context(df)
|
||||
self.assertFalse(ok)
|
||||
self.assertTrue(any('allowlist' in e for e in errs))
|
||||
|
||||
def test_from_scratch_rejected(self):
|
||||
ok, errs = validate_build_context('FROM scratch\n')
|
||||
self.assertFalse(ok)
|
||||
self.assertTrue(any('scratch' in e for e in errs))
|
||||
|
||||
def test_multistage_alias_reference_ok(self):
|
||||
df = (
|
||||
f'FROM {_ALPINE_PINNED} AS builder\n'
|
||||
'RUN echo build\n'
|
||||
'FROM builder\n'
|
||||
'RUN echo final\n'
|
||||
)
|
||||
ok, errs = validate_build_context(df)
|
||||
self.assertTrue(ok, errs)
|
||||
|
||||
def test_add_remote_url_rejected(self):
|
||||
df = f'FROM {_ALPINE_PINNED}\nADD https://evil.example/x.sh /x.sh\n'
|
||||
ok, errs = validate_build_context(df)
|
||||
self.assertFalse(ok)
|
||||
self.assertTrue(any('remote URL' in e for e in errs))
|
||||
|
||||
def test_add_local_path_ok(self):
|
||||
df = f'FROM {_ALPINE_PINNED}\nADD ./local.tar /opt/\n'
|
||||
ok, errs = validate_build_context(df)
|
||||
self.assertTrue(ok, errs)
|
||||
|
||||
def test_secret_named_arg_rejected(self):
|
||||
df = f'FROM {_ALPINE_PINNED}\nARG GITHUB_TOKEN\n'
|
||||
ok, errs = validate_build_context(df)
|
||||
self.assertFalse(ok)
|
||||
self.assertTrue(any('ARG' in e for e in errs))
|
||||
|
||||
def test_secret_named_env_rejected(self):
|
||||
df = f'FROM {_ALPINE_PINNED}\nENV API_SECRET=abc\n'
|
||||
ok, errs = validate_build_context(df)
|
||||
self.assertFalse(ok)
|
||||
self.assertTrue(any('ENV' in e for e in errs))
|
||||
|
||||
def test_benign_arg_env_ok(self):
|
||||
df = f'FROM {_ALPINE_PINNED}\nARG VERSION=1.0\nENV TZ=UTC\n'
|
||||
ok, errs = validate_build_context(df)
|
||||
self.assertTrue(ok, errs)
|
||||
|
||||
def test_no_from_rejected(self):
|
||||
ok, errs = validate_build_context('RUN echo hi\n')
|
||||
self.assertFalse(ok)
|
||||
|
||||
def test_empty_dockerfile_rejected(self):
|
||||
ok, errs = validate_build_context('')
|
||||
self.assertFalse(ok)
|
||||
|
||||
def test_oversized_context_rejected(self):
|
||||
df = f'FROM {_ALPINE_PINNED}\n'
|
||||
files = [('big.bin', BUILD_CONTEXT_MAX_BYTES + 1)]
|
||||
ok, errs = validate_build_context(df, context_files=files)
|
||||
self.assertFalse(ok)
|
||||
self.assertTrue(any('too large' in e for e in errs))
|
||||
|
||||
def test_too_many_files_rejected(self):
|
||||
df = f'FROM {_ALPINE_PINNED}\n'
|
||||
files = [(f'f{i}', 1) for i in range(BUILD_CONTEXT_MAX_FILES + 1)]
|
||||
ok, errs = validate_build_context(df, context_files=files)
|
||||
self.assertFalse(ok)
|
||||
self.assertTrue(any('too many files' in e for e in errs))
|
||||
|
||||
def test_small_context_ok(self):
|
||||
df = f'FROM {_ALPINE_PINNED}\n'
|
||||
files = [('Dockerfile', 100), ('entrypoint.sh', 500)]
|
||||
ok, errs = validate_build_context(df, context_files=files)
|
||||
self.assertTrue(ok, errs)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -773,5 +773,107 @@ class TestPerInstanceRender(unittest.TestCase):
|
||||
self.assertEqual(ServiceComposer.instance_id_for('conn_a1b2c3d4'), 'a1b2c3d4')
|
||||
|
||||
|
||||
# ── Image signature verification ───────────────────────────────────────────
|
||||
|
||||
_DIGEST = '@sha256:' + 'a' * 64
|
||||
_SIGNED_IMAGE = 'git.pic.ngo/roof/foo' + _DIGEST
|
||||
|
||||
|
||||
def _verify_composer(mode):
|
||||
cm = _make_cm()
|
||||
cm.get_image_verification_mode.return_value = mode
|
||||
c = ServiceComposer(config_manager=cm, data_dir='/fake/data')
|
||||
# Isolate verification from disk/docker: stub everything except verify logic.
|
||||
c.write_compose = MagicMock(return_value='')
|
||||
c.up = MagicMock(return_value={'ok': True})
|
||||
c._store_cmd = MagicMock(return_value={'ok': True})
|
||||
return c
|
||||
|
||||
|
||||
class TestImageVerification(unittest.TestCase):
|
||||
|
||||
def test_off_mode_skips_cosign(self):
|
||||
c = _verify_composer('off')
|
||||
c._cosign_verify = MagicMock()
|
||||
manifest = {'image': _SIGNED_IMAGE}
|
||||
result = c.install('svc', manifest, 'tpl')
|
||||
self.assertTrue(result['ok'])
|
||||
c._cosign_verify.assert_not_called()
|
||||
c.up.assert_called_once()
|
||||
|
||||
def test_good_signature_proceeds(self):
|
||||
c = _verify_composer('enforce')
|
||||
c._cosign_verify = MagicMock(return_value={'ok': True, 'stdout': '', 'stderr': ''})
|
||||
manifest = {'image': _SIGNED_IMAGE}
|
||||
result = c.install('svc', manifest, 'tpl')
|
||||
self.assertTrue(result['ok'])
|
||||
c._cosign_verify.assert_called_once()
|
||||
c.up.assert_called_once()
|
||||
|
||||
def test_enforce_bad_signature_aborts(self):
|
||||
c = _verify_composer('enforce')
|
||||
c._cosign_verify = MagicMock(return_value={'ok': False, 'stderr': 'no matching signatures'})
|
||||
manifest = {'image': _SIGNED_IMAGE}
|
||||
result = c.install('svc', manifest, 'tpl')
|
||||
self.assertFalse(result['ok'])
|
||||
self.assertIn('verification failed', result['error'])
|
||||
c.up.assert_not_called()
|
||||
|
||||
def test_warn_bad_signature_proceeds(self):
|
||||
c = _verify_composer('warn')
|
||||
c._cosign_verify = MagicMock(return_value={'ok': False, 'stderr': 'no matching signatures'})
|
||||
manifest = {'image': _SIGNED_IMAGE}
|
||||
result = c.install('svc', manifest, 'tpl')
|
||||
self.assertTrue(result['ok'])
|
||||
c.up.assert_called_once()
|
||||
|
||||
def test_enforce_undigested_image_aborts(self):
|
||||
c = _verify_composer('enforce')
|
||||
c._cosign_verify = MagicMock()
|
||||
manifest = {'image': 'git.pic.ngo/roof/foo:latest'}
|
||||
result = c.install('svc', manifest, 'tpl')
|
||||
self.assertFalse(result['ok'])
|
||||
self.assertIn('digest', result['error'])
|
||||
c._cosign_verify.assert_not_called()
|
||||
c.up.assert_not_called()
|
||||
|
||||
def test_warn_undigested_image_proceeds(self):
|
||||
c = _verify_composer('warn')
|
||||
c._cosign_verify = MagicMock()
|
||||
manifest = {'image': 'git.pic.ngo/roof/foo:latest'}
|
||||
result = c.install('svc', manifest, 'tpl')
|
||||
self.assertTrue(result['ok'])
|
||||
c._cosign_verify.assert_not_called()
|
||||
c.up.assert_called_once()
|
||||
|
||||
def test_cosign_verify_builds_expected_command(self):
|
||||
c = _verify_composer('enforce')
|
||||
with patch.object(c, '_run', return_value={'ok': True}) as mock_run:
|
||||
c._cosign_verify(_SIGNED_IMAGE)
|
||||
cmd = mock_run.call_args[0][0]
|
||||
self.assertEqual(cmd[0:2], ['cosign', 'verify'])
|
||||
self.assertIn('--key', cmd)
|
||||
self.assertIn(_SIGNED_IMAGE, cmd)
|
||||
|
||||
def test_enforce_pull_failure_aborts(self):
|
||||
c = _verify_composer('enforce')
|
||||
c._cosign_verify = MagicMock(return_value={'ok': True})
|
||||
c._store_cmd = MagicMock(return_value={'ok': False, 'stderr': 'manifest unknown'})
|
||||
manifest = {'image': _SIGNED_IMAGE}
|
||||
result = c.install('svc', manifest, 'tpl')
|
||||
self.assertFalse(result['ok'])
|
||||
self.assertIn('pull failed', result['error'])
|
||||
c.up.assert_not_called()
|
||||
|
||||
def test_warn_pull_failure_proceeds(self):
|
||||
c = _verify_composer('warn')
|
||||
c._cosign_verify = MagicMock(return_value={'ok': True})
|
||||
c._store_cmd = MagicMock(return_value={'ok': False, 'stderr': 'manifest unknown'})
|
||||
manifest = {'image': _SIGNED_IMAGE}
|
||||
result = c.install('svc', manifest, 'tpl')
|
||||
self.assertTrue(result['ok'])
|
||||
c.up.assert_called_once()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -679,6 +679,40 @@ class TestInstall(unittest.TestCase):
|
||||
self.assertTrue(result['ok'])
|
||||
self.assertFalse(result.get('already_installed', False))
|
||||
|
||||
def test_install_enforce_rejects_undigested_image(self):
|
||||
"""Under enforce mode a store image without @sha256: pin is fatal."""
|
||||
manifest = _valid_manifest(
|
||||
id='myapp', container_name='cell-myapp',
|
||||
image='git.pic.ngo/roof/myapp:latest',
|
||||
)
|
||||
ssm, cm, _, composer = _make_ssm(manifest=manifest)
|
||||
cm.get_image_verification_mode.return_value = 'enforce'
|
||||
result = ssm.install('myapp')
|
||||
self.assertFalse(result['ok'])
|
||||
self.assertIn('digest', result['error'].lower())
|
||||
composer.install.assert_not_called()
|
||||
|
||||
def test_install_warn_allows_undigested_image(self):
|
||||
"""Under warn mode (default) an undigested image still installs."""
|
||||
manifest = _valid_manifest(
|
||||
id='myapp', container_name='cell-myapp',
|
||||
image='git.pic.ngo/roof/myapp:latest',
|
||||
)
|
||||
ssm, cm, _, composer = _make_ssm(manifest=manifest)
|
||||
cm.get_image_verification_mode.return_value = 'warn'
|
||||
result = ssm.install('myapp')
|
||||
self.assertTrue(result['ok'])
|
||||
composer.install.assert_called_once()
|
||||
|
||||
def test_install_enforce_allows_digested_image(self):
|
||||
"""Under enforce mode a properly digest-pinned image installs."""
|
||||
manifest = _valid_manifest(id='myapp', container_name='cell-myapp')
|
||||
ssm, cm, _, composer = _make_ssm(manifest=manifest)
|
||||
cm.get_image_verification_mode.return_value = 'enforce'
|
||||
result = ssm.install('myapp')
|
||||
self.assertTrue(result['ok'])
|
||||
composer.install.assert_called_once()
|
||||
|
||||
def test_install_without_composer_stores_record(self):
|
||||
"""When service_composer=None, skip compose but still store the install record."""
|
||||
manifest = _valid_manifest(id='myapp', container_name='cell-myapp')
|
||||
|
||||
Reference in New Issue
Block a user