1f2f9d9f6e
Unit Tests / test (push) Successful in 11m18s
Rejects privileged compose configs (network_mode:host, pid:host, ipc:host, userns_mode:host, cap_add:ALL, string commands, missing cell-network, reserved container names). Validates manifest schema_version=3, image digest pinning (sha256 required, :tag-only rejected), and provision hook format. Wired into ServiceComposer.write_compose() and ServiceStoreManager.install() as a single enforcement point. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
324 lines
12 KiB
Python
324 lines
12 KiB
Python
"""
|
|
manifest_validator — single chokepoint for all manifest and compose YAML security checks.
|
|
|
|
Both ServiceComposer and ServiceStoreManager import from here so validation logic
|
|
lives in exactly one place and cannot be bypassed by taking either code path.
|
|
"""
|
|
|
|
import re
|
|
import yaml
|
|
|
|
# ---------------------------------------------------------------------------
# Validation patterns and allow/deny lists
#
# Every pattern is anchored with \Z rather than $.  In Python, $ also matches
# just before a trailing newline, so a pattern ending in $ used with .match()
# would accept values such as 'api\n' or 'cell-api\n'.
# ---------------------------------------------------------------------------

# Subdomain label: a lowercase letter followed by up to 30 of [a-z0-9-].
_SUBDOMAIN_RE = re.compile(r'^[a-z][a-z0-9-]{0,30}\Z')

# Reverse-proxy backend in host:port form (1-5 digit port; the numeric range
# is not constrained by the pattern itself).
_BACKEND_RE = re.compile(r'^[A-Za-z0-9._-]+:\d{1,5}\Z')

# Linux capabilities a store service may request via cap_add.
_CAP_ALLOWLIST = frozenset({
    'NET_ADMIN', 'NET_RAW', 'NET_BIND_SERVICE', 'CHOWN', 'DAC_OVERRIDE',
    'SETUID', 'SETGID', 'KILL', 'SYS_NICE',
})

# Capabilities that are rejected with an explicit "denied" message.  Anything
# that is in neither set is also rejected, as "not in allowlist".
_CAP_DENYLIST = frozenset({
    'ALL', 'SYS_ADMIN', 'SYS_MODULE', 'SYS_PTRACE', 'SYS_RAWIO',
    'SYS_BOOT', 'MAC_ADMIN', 'MAC_OVERRIDE', 'SYS_TIME', 'SYS_TTY_CONFIG',
})

_RESERVED_SUBDOMAINS = frozenset({
    # Core PIC infrastructure — never allow store services to hijack these
    'api', 'webui', 'admin', 'www', 'ns1', 'ns2', 'git', 'registry', 'install',
    # 'mail', 'calendar', 'files', 'webdav', 'webmail' are intentionally absent:
    # they belong to official PIC store services and must be claimable by them.
})

# Hosts a manifest backend may never point at.
_BACKEND_DENYLIST = frozenset({
    'cell-api', 'cell-caddy', 'cell-coredns', 'cell-dnsmasq',
    'cell-wireguard', 'cell-vault', 'localhost', '127.0.0.1',
    '0.0.0.0', 'host.docker.internal',
})

# Container names a store service may never claim.
# NOTE(review): 'cell-vault' appears in _BACKEND_DENYLIST but not here —
# confirm whether that asymmetry is intentional.
_RESERVED_CONTAINER_NAMES = frozenset({
    'cell-api', 'cell-caddy', 'cell-webui', 'cell-coredns',
    'cell-dnsmasq', 'cell-wireguard', 'cell-chrony',
})

# Store containers must be named cell-<name>.
_CONTAINER_NAME_RE = re.compile(r'^cell-[a-z0-9][a-z0-9-]{0,30}\Z')

# Conservative character set for env values: no whitespace, quotes or shell
# metacharacters, at most 256 characters (the empty string is allowed).
_ENV_VALUE_RE = re.compile(r'^[A-Za-z0-9._@:/+\-]{0,256}\Z')

# argv[0] of a provision hook: a bare lowercase program name, no path parts.
_HOOK_BINARY_RE = re.compile(r'^[a-z][a-z0-9_-]{0,31}\Z')

# Shape of a capability name (uppercase letters and underscores only).
_CAP_NAME_RE = re.compile(r'^[A-Z_]+\Z')

# Manifest id.
_ID_RE = re.compile(r'^[a-z][a-z0-9_-]{0,30}\Z')

# Images must come from git.pic.ngo/roof/ and be pinned by sha256 digest;
# a tag-only reference does not match.
_IMAGE_DIGEST_RE = re.compile(
    r'^git\.pic\.ngo/roof/[a-zA-Z0-9._/-]+@sha256:[0-9a-f]{64}\Z'
)
|
|
|
|
|
|
def validate_manifest(manifest: dict) -> tuple:
    """
    Validate security-relevant fields of a store manifest.

    Returns (True, []) when all checks pass; (False, [error_strings]) otherwise.
    Does not replace the existing _validate_manifest in ServiceStoreManager —
    it supplements it as a second layer focused on security-critical fields.

    Every field is optional here: a field that is absent (None) is skipped,
    a field that is present must pass its check.  Malformed input (wrong
    container type, non-string values) is reported as a validation error
    rather than raising, because the manifest is untrusted data.

    Patterns are applied with fullmatch() so that a value carrying a trailing
    newline cannot slip past a ``$`` anchor.
    """
    if not isinstance(manifest, dict):
        return (False, ['manifest must be a mapping'])

    errors = []

    # schema_version must be 3 when present
    schema_version = manifest.get('schema_version')
    if schema_version is not None and schema_version != 3:
        errors.append(
            f'schema_version must be 3, got: {schema_version!r}'
        )

    # kind must be "store" if present — reject builtins coming in over the wire
    kind = manifest.get('kind')
    if kind is not None and kind != 'store':
        errors.append(f'manifest kind must be "store", got: {kind!r}')

    # id format check
    manifest_id = manifest.get('id')
    if manifest_id is not None:
        if not isinstance(manifest_id, str) or not _ID_RE.fullmatch(manifest_id):
            errors.append(
                f'id must match ^[a-z][a-z0-9_-]{{0,30}}$, got: {manifest_id!r}'
            )

    # image must be digest-pinned from git.pic.ngo/roof/*
    image = manifest.get('image')
    if image is not None:
        if not isinstance(image, str) or not _IMAGE_DIGEST_RE.fullmatch(image):
            errors.append(
                f'image must match git.pic.ngo/roof/*@sha256:<64-hex>, got: {image!r}'
            )

    # container_name: structural check first, then the reserved-name check.
    # The isinstance guard keeps a non-string value from raising TypeError.
    cname = manifest.get('container_name')
    if cname is not None:
        if not isinstance(cname, str) or not _CONTAINER_NAME_RE.fullmatch(cname):
            errors.append(
                f'container_name must match ^cell-[a-z0-9][a-z0-9-]{{0,30}}$, got: {cname!r}'
            )
        elif cname in _RESERVED_CONTAINER_NAMES:
            errors.append(f'container_name is reserved: {cname!r}')

    # subdomain
    subdomain = manifest.get('subdomain')
    if subdomain is not None:
        _check_subdomain(subdomain, 'subdomain', errors)

    # extra_subdomains — must be a list; a bare string would otherwise be
    # iterated character by character.
    extra_subdomains = manifest.get('extra_subdomains') or []
    if isinstance(extra_subdomains, list):
        for sub in extra_subdomains:
            _check_subdomain(sub, 'extra_subdomains entry', errors)
    else:
        errors.append('extra_subdomains must be a list')

    # backend
    backend = manifest.get('backend')
    if backend is not None:
        _check_backend(backend, 'backend', errors)

    # extra_backends — mapping of key -> host:port
    extra_backends = manifest.get('extra_backends') or {}
    if isinstance(extra_backends, dict):
        for sub_key, bknd_val in extra_backends.items():
            _check_backend(bknd_val, f'extra_backends[{sub_key!r}]', errors)
    else:
        errors.append('extra_backends must be a mapping')

    # cap_add — each entry must be well-formed, not denied, and allowlisted
    cap_add = manifest.get('cap_add')
    if cap_add is not None:
        if not isinstance(cap_add, list):
            errors.append('cap_add must be a list')
        else:
            for cap in cap_add:
                if not isinstance(cap, str):
                    errors.append(f'cap_add entry must be a string, got: {cap!r}')
                elif not _CAP_NAME_RE.fullmatch(cap):
                    errors.append(f'cap_add entry must match ^[A-Z_]+$, got: {cap!r}')
                elif cap in _CAP_DENYLIST:
                    errors.append(f'cap_add entry is explicitly denied: {cap}')
                elif cap not in _CAP_ALLOWLIST:
                    errors.append(f'cap_add entry not in allowlist: {cap}')

    # env values — a list of mappings; each 'value' is stringified and checked
    env = manifest.get('env') or []
    if not isinstance(env, list):
        errors.append('env must be a list')
    else:
        for env_entry in env:
            if not isinstance(env_entry, dict):
                errors.append(f'env entry must be a mapping, got: {env_entry!r}')
                continue
            val = str(env_entry.get('value', ''))
            if not _ENV_VALUE_RE.fullmatch(val):
                errors.append(
                    f'env[].value contains disallowed characters: {val!r}'
                )

    # provision_hook lives under accounts.provision_hook
    accounts = manifest.get('accounts') or {}
    if not isinstance(accounts, dict):
        errors.append('accounts must be a mapping')
    else:
        hook = accounts.get('provision_hook')
        if hook is not None:
            ok, msg = validate_provision_hook(hook)
            if not ok:
                errors.append(msg)

    return (not errors, errors)
|
|
|
|
|
|
def _compose_list(value) -> list:
    """Normalise a compose list field: falsy -> [], list -> itself, scalar -> [scalar]."""
    if not value:
        return []
    if isinstance(value, list):
        return value
    return [value]


def _volume_host_source(vol):
    """Return the host-side source of a compose volume entry, or None if it has none.

    Handles both the short "SOURCE:TARGET[:MODE]" string form and the long
    mapping form ({type: bind, source: ..., target: ...}).
    """
    if isinstance(vol, dict):
        source = vol.get('source')
        return None if source is None else str(source)
    return str(vol).split(':', 1)[0]


def _path_within(path: str, root: str) -> bool:
    """Return True when *path* is *root* itself or lies below it, after normalisation."""
    import posixpath  # local import: only this helper needs it

    # normpath collapses '..' and duplicate slashes, so '/data/x/../../etc'
    # cannot masquerade as living under '/data/x'.
    norm_root = posixpath.normpath(root)
    norm_path = posixpath.normpath(path)
    if norm_path == norm_root:
        return True
    # Compare on a path-component boundary so '/data/pic-evil' is not
    # treated as being inside '/data/pic'.
    return norm_path.startswith(norm_root.rstrip('/') + '/')


def validate_rendered_compose(yaml_text: str, allowed_data_dir: str = None) -> tuple:
    """
    Parse and security-validate a rendered docker-compose YAML string.

    Returns (True, []) when safe; (False, [error_strings]) otherwise.
    Rejects constructs that would give a store service elevated access to the host.

    yaml_text: the compose document; parsed with yaml.safe_load.  A parse
    failure or a non-mapping document is reported as a single error.

    allowed_data_dir: when set, absolute bind mounts at or below this directory
    are permitted — they come from ${PIC_DATA_DIR} substitution and land in the
    designated service data directory.  Paths are normalised before comparison.

    Checks performed per service: reserved container_name, privileged: true,
    network_mode other than bridge, pid/ipc/userns_mode set to host, cap_add
    against the deny/allow lists, absolute host bind mounts (short and long
    volume syntax), presence of a devices key, unconfined apparmor/seccomp
    security_opt, and string-form command/entrypoint.
    """
    try:
        doc = yaml.safe_load(yaml_text)
    except yaml.YAMLError as exc:
        return (False, [f'YAML parse error: {exc}'])

    if not isinstance(doc, dict):
        return (False, ['compose file must be a YAML mapping'])

    errors = []

    # At least one external network must exist so the container joins the cell network
    # rather than an isolated bridge that would be invisible to Caddy and CoreDNS.
    networks = doc.get('networks') or {}
    has_external = isinstance(networks, dict) and any(
        isinstance(net, dict) and net.get('external')
        for net in networks.values()
    )
    if not has_external:
        errors.append(
            'compose file must declare at least one network with external: true'
        )

    services = doc.get('services') or {}
    if not isinstance(services, dict):
        errors.append('compose services must be a mapping')
        return (False, errors)

    for svc_name, svc in services.items():
        if not isinstance(svc, dict):
            continue
        prefix = f'service {svc_name!r}'

        # isinstance guard: an unhashable value would raise on the set lookup
        cname = svc.get('container_name')
        if isinstance(cname, str) and cname.strip() in _RESERVED_CONTAINER_NAMES:
            errors.append(f'{prefix}: container_name {cname!r} is reserved')

        if svc.get('privileged') is True:
            errors.append(f'{prefix}: privileged: true is not allowed')

        net_mode = svc.get('network_mode')
        if net_mode is not None and net_mode != 'bridge':
            errors.append(
                f'{prefix}: network_mode {net_mode!r} is not allowed (only bridge)'
            )

        if svc.get('pid') == 'host':
            errors.append(f'{prefix}: pid: host is not allowed')

        if svc.get('ipc') == 'host':
            errors.append(f'{prefix}: ipc: host is not allowed')

        if svc.get('userns_mode') == 'host':
            errors.append(f'{prefix}: userns_mode: host is not allowed')

        # cap_add
        for cap in _compose_list(svc.get('cap_add')):
            cap_str = str(cap)
            if cap_str in _CAP_DENYLIST:
                errors.append(f'{prefix}: cap_add {cap_str!r} is explicitly denied')
            elif cap_str not in _CAP_ALLOWLIST:
                errors.append(f'{prefix}: cap_add {cap_str!r} not in allowlist')

        # volumes — reject absolute host-side bind mounts unless they're under
        # the sanctioned data directory (injected by ServiceComposer via PIC_DATA_DIR)
        for vol in _compose_list(svc.get('volumes')):
            src = _volume_host_source(vol)
            if src is None or not src.startswith('/'):
                continue
            if allowed_data_dir and _path_within(src, allowed_data_dir):
                continue
            errors.append(
                f'{prefix}: absolute host bind mount not allowed: {str(vol)!r}'
            )

        if 'devices' in svc:
            errors.append(f'{prefix}: devices key is not allowed')

        for opt in _compose_list(svc.get('security_opt')):
            opt_str = str(opt)
            # Docker also accepts the legacy "key:value" spelling, so fold it
            # onto "key=value" before comparing.
            normalized = opt_str.strip().replace(':', '=', 1)
            if normalized in ('apparmor=unconfined', 'seccomp=unconfined'):
                errors.append(
                    f'{prefix}: security_opt {opt_str!r} is not allowed'
                )

        # command must be a list — string form passes through the shell
        if isinstance(svc.get('command'), str):
            errors.append(
                f'{prefix}: command must be a list, not a shell string'
            )

        # entrypoint must also be a list for the same reason
        if isinstance(svc.get('entrypoint'), str):
            errors.append(
                f'{prefix}: entrypoint must be a list, not a shell string'
            )

    return (not errors, errors)
|
|
|
|
|
|
def validate_provision_hook(hook) -> tuple:
    """
    Validate a provision_hook value from accounts.provision_hook.

    Acceptable: None/absent, or a dict {"argv": ["binary", "arg1", ...]}.
    Rejected: any plain string (shell injection risk), any other non-dict,
    empty or non-list argv, non-string argv elements, NUL bytes in any
    element, and an argv[0] that is not a bare lowercase program name
    (uppercase, path separators, trailing newline, ...).

    Returns (True, "") on success; (False, error_string) on failure.
    """
    if hook is None:
        return (True, '')

    if isinstance(hook, str):
        return (
            False,
            'provision_hook must be an argv list dict {"argv": [...]}, not a shell string',
        )

    if not isinstance(hook, dict):
        return (False, 'provision_hook must be a dict with argv list')

    argv = hook.get('argv')
    if not isinstance(argv, list) or not argv:
        return (False, 'provision_hook.argv must be a non-empty list')

    # NUL-byte check must precede regex check so the error message is unambiguous.
    for index, elem in enumerate(argv):
        if isinstance(elem, str):
            if '\x00' in elem:
                return (False, 'provision_hook.argv element contains NUL byte')
        elif index > 0:
            # argv[0] keeps its dedicated message below; every later element
            # must still be a plain string to be a usable process argument.
            return (
                False,
                f'provision_hook.argv[{index}] must be a string, got: {elem!r}',
            )

    binary = argv[0]
    # fullmatch: a `$` anchor with match() would accept 'hook\n'.
    if not isinstance(binary, str) or not _HOOK_BINARY_RE.fullmatch(binary):
        return (
            False,
            f'provision_hook.argv[0] must match ^[a-z][a-z0-9_-]{{0,31}}$, got: {binary!r}',
        )

    return (True, '')
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _check_subdomain(value, field_name: str, errors: list) -> None:
    """Append an error to *errors* if *value* is not an acceptable subdomain.

    value: the candidate subdomain taken from the manifest.
    field_name: label used as the prefix of any error message.
    errors: list mutated in place; nothing is appended when the value is fine.

    A value is rejected when it is not a string, is one of the reserved
    infrastructure subdomains, or does not match the subdomain pattern.
    """
    if not isinstance(value, str):
        errors.append(f'{field_name} must be a string')
        return
    if value in _RESERVED_SUBDOMAINS:
        errors.append(f'{field_name} is reserved: {value!r}')
    # fullmatch, not match: with a `$` anchor, match() accepts 'api\n', which
    # is not in the reserved set and would therefore pass both checks.
    elif not _SUBDOMAIN_RE.fullmatch(value):
        errors.append(
            f'{field_name} must match ^[a-z][a-z0-9-]{{0,30}}$, got: {value!r}'
        )
|
|
|
|
|
|
def _check_backend(value, field_name: str, errors: list) -> None:
    """Append an error to *errors* if *value* is not an acceptable backend.

    value: the candidate "host:port" string taken from the manifest.
    field_name: label used as the prefix of any error message.
    errors: list mutated in place; nothing is appended when the value is fine.

    A value is rejected when it is not a string, is not in host:port form,
    has a port outside 1-65535, or names a host on the backend denylist.
    """
    if not isinstance(value, str):
        errors.append(f'{field_name} must be a string')
        return
    # fullmatch, not match: a `$` anchor would let a trailing newline through.
    if not _BACKEND_RE.fullmatch(value):
        errors.append(
            f'{field_name} must be host:port (e.g. cell-foo:8080), got: {value!r}'
        )
        return

    host, _, port = value.rpartition(':')

    # The pattern only guarantees 1-5 digits (and \d admits non-ASCII digits),
    # so check the actual numeric range separately.
    if not port.isascii() or not 1 <= int(port) <= 65535:
        errors.append(f'{field_name} port must be in 1-65535, got: {port!r}')

    # Hostnames are case-insensitive and may carry a trailing root dot, so
    # compare a normalised form against the denylist.
    if host.rstrip('.').lower() in _BACKEND_DENYLIST:
        errors.append(f'{field_name} host {host!r} is in the backend denylist')
|