Files
pic/api/manifest_validator.py
T
roof 7d5c5421f1
Unit Tests / test (push) Successful in 11m31s
Implement connectivity store services (wireguard-ext, openvpn-client, tor)
- ConnectivityManager: move config dirs to data_dir/services/<id>/config so
  Docker can bind-mount them into store-service containers (Docker resolves
  bind-mount paths on the host, not inside the API container).  Add
  _migrate_legacy_configs to copy existing files from the old config_dir
  location on first boot.

- manifest_validator: add allow_host_network parameter to
  validate_rendered_compose.  When True, waives the external-network
  requirement, permits network_mode: host, and allows devices: — all needed
  by VPN/Tor containers that must share the host network namespace to create
  tun/wg interfaces.  Non-host services are unaffected.

- service_composer: read requires_host_network from the manifest and pass
  allow_host_network=True to validate_rendered_compose for connectivity
  services.

- Tests: update file-path assertions to new data_dir layout; add
  TestMigrateLegacyConfigs, TestValidateRenderedComposeHostNetwork, and
  two TestWriteCompose cases for the host-network path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-30 10:06:48 -04:00

353 lines
13 KiB
Python

"""
manifest_validator — single chokepoint for all manifest and compose YAML security checks.
Both ServiceComposer and ServiceStoreManager import from here so validation logic
lives in exactly one place and cannot be bypassed by taking either code path.
"""
import logging
import re
import yaml
logger = logging.getLogger('picell')
_SUBDOMAIN_RE = re.compile(r'^[a-z][a-z0-9-]{0,30}$')
_BACKEND_RE = re.compile(r'^[A-Za-z0-9._-]+:\d{1,5}$')
_CAP_ALLOWLIST = frozenset({
'NET_ADMIN', 'NET_RAW', 'NET_BIND_SERVICE', 'CHOWN', 'DAC_OVERRIDE',
'SETUID', 'SETGID', 'KILL', 'SYS_NICE',
})
_CAP_DENYLIST = frozenset({
'ALL', 'SYS_ADMIN', 'SYS_MODULE', 'SYS_PTRACE', 'SYS_RAWIO',
'SYS_BOOT', 'MAC_ADMIN', 'MAC_OVERRIDE', 'SYS_TIME', 'SYS_TTY_CONFIG',
})
_RESERVED_SUBDOMAINS = frozenset({
# Core PIC infrastructure — never allow store services to hijack these
'api', 'webui', 'admin', 'www', 'ns1', 'ns2', 'git', 'registry', 'install',
# 'mail', 'calendar', 'files', 'webdav', 'webmail' are intentionally absent:
# they belong to official PIC store services and must be claimable by them.
})
_BACKEND_DENYLIST = frozenset({
'cell-api', 'cell-caddy', 'cell-coredns', 'cell-dnsmasq',
'cell-wireguard', 'cell-vault', 'localhost', '127.0.0.1',
'0.0.0.0', 'host.docker.internal',
})
_RESERVED_CONTAINER_NAMES = frozenset({
'cell-api', 'cell-caddy', 'cell-webui', 'cell-coredns',
'cell-dnsmasq', 'cell-wireguard', 'cell-chrony',
})
_CONTAINER_NAME_RE = re.compile(r'^cell-[a-z0-9][a-z0-9-]{0,30}$')
_ENV_VALUE_RE = re.compile(r'^[A-Za-z0-9._@:/+\-]{0,256}$')
_HOOK_BINARY_RE = re.compile(r'^[a-z][a-z0-9_-]{0,31}$')
_CAP_NAME_RE = re.compile(r'^[A-Z_]+$')
_ID_RE = re.compile(r'^[a-z][a-z0-9_-]{0,30}$')
_IMAGE_DIGEST_RE = re.compile(
r'^git\.pic\.ngo/roof/[a-zA-Z0-9._/-]+@sha256:[0-9a-f]{64}$'
)
def validate_manifest(manifest: dict) -> tuple:
"""
Validate security-relevant fields of a store manifest.
Returns (True, []) when all checks pass; (False, [error_strings]) otherwise.
Does not replace the existing _validate_manifest in ServiceStoreManager —
it supplements it as a second layer focused on security-critical fields.
"""
errors = []
# schema_version must be 3
schema_version = manifest.get('schema_version')
if schema_version is not None and schema_version != 3:
errors.append(
f'schema_version must be 3, got: {schema_version!r}'
)
# kind must be "store" if present — reject builtins coming in over the wire
kind = manifest.get('kind')
if kind is not None and kind != 'store':
errors.append(f'manifest kind must be "store", got: {kind!r}')
# id format check
manifest_id = manifest.get('id')
if manifest_id is not None:
if not isinstance(manifest_id, str) or not _ID_RE.match(manifest_id):
errors.append(
f'id must match ^[a-z][a-z0-9_-]{{0,30}}$, got: {manifest_id!r}'
)
# image must come from git.pic.ngo/roof/*; if a digest IS provided it must be
# valid; first-party images without a digest pin are allowed with a warning.
image = manifest.get('image')
if image is not None:
if not isinstance(image, str):
errors.append(f'image must be a string, got: {image!r}')
elif not image.startswith('git.pic.ngo/roof/'):
errors.append(
f'image must be from git.pic.ngo/roof/*, got: {image!r}'
)
elif '@sha256:' in image:
if not _IMAGE_DIGEST_RE.match(image):
errors.append(
f'image digest must match @sha256:<64-hex>, got: {image!r}'
)
else:
logger.warning('manifest image %s has no digest pin', image)
# container_name structural check
cname = manifest.get('container_name')
if cname is not None:
if not _CONTAINER_NAME_RE.match(cname):
errors.append(
f'container_name must match ^cell-[a-z0-9][a-z0-9-]{{0,30}}$, got: {cname!r}'
)
elif cname in _RESERVED_CONTAINER_NAMES:
errors.append(f'container_name is reserved: {cname!r}')
# subdomain
subdomain = manifest.get('subdomain')
if subdomain is not None:
_check_subdomain(subdomain, 'subdomain', errors)
# extra_subdomains
for sub in manifest.get('extra_subdomains') or []:
_check_subdomain(sub, 'extra_subdomains entry', errors)
# backend
backend = manifest.get('backend')
if backend is not None:
_check_backend(backend, 'backend', errors)
# extra_backends
for sub_key, bknd_val in (manifest.get('extra_backends') or {}).items():
_check_backend(bknd_val, f'extra_backends[{sub_key!r}]', errors)
# cap_add
cap_add = manifest.get('cap_add')
if cap_add is not None:
if not isinstance(cap_add, list):
errors.append('cap_add must be a list')
else:
for cap in cap_add:
if not isinstance(cap, str):
errors.append(f'cap_add entry must be a string, got: {cap!r}')
continue
if not _CAP_NAME_RE.match(cap):
errors.append(f'cap_add entry must match ^[A-Z_]+$, got: {cap!r}')
continue
if cap in _CAP_DENYLIST:
errors.append(f'cap_add entry is explicitly denied: {cap}')
elif cap not in _CAP_ALLOWLIST:
errors.append(f'cap_add entry not in allowlist: {cap}')
# env values
for env_entry in manifest.get('env') or []:
val = str(env_entry.get('value', ''))
if not _ENV_VALUE_RE.match(val):
errors.append(
f'env[].value contains disallowed characters: {val!r}'
)
# provision_hook
hook = (manifest.get('accounts') or {}).get('provision_hook')
if hook is not None:
ok, msg = validate_provision_hook(hook)
if not ok:
errors.append(msg)
return (len(errors) == 0, errors)
def validate_rendered_compose(yaml_text: str, allowed_data_dir: str = None,
allow_host_network: bool = False) -> tuple:
"""
Parse and security-validate a rendered docker-compose YAML string.
Returns (True, []) when safe; (False, [error_strings]) otherwise.
Rejects constructs that would give a store service elevated access to the host.
allowed_data_dir: when set, absolute bind mounts under this prefix are
permitted — they come from ${PIC_DATA_DIR} substitution and land in the
designated service data directory.
allow_host_network: when True, the compose file is permitted to use
network_mode: host and devices: — required for connectivity services
(wireguard-ext, openvpn-client, tor) that must share the host network
namespace to create tun/wg interfaces. The external-network requirement
is also waived since host-network containers reach the cell network directly.
"""
errors = []
try:
doc = yaml.safe_load(yaml_text)
except yaml.YAMLError as exc:
return (False, [f'YAML parse error: {exc}'])
if not isinstance(doc, dict):
return (False, ['compose file must be a YAML mapping'])
# Regular (bridged) services must join the cell-network so Caddy and CoreDNS
# can reach them. Host-network services share the host namespace directly,
# so the external network declaration would be wrong and is omitted.
if not allow_host_network:
networks = doc.get('networks') or {}
has_external = any(
isinstance(v, dict) and v.get('external')
for v in networks.values()
)
if not has_external:
errors.append(
'compose file must declare at least one network with external: true'
)
for svc_name, svc in (doc.get('services') or {}).items():
if not isinstance(svc, dict):
continue
prefix = f'service {svc_name!r}'
cname = svc.get('container_name')
if cname is not None and cname in _RESERVED_CONTAINER_NAMES:
errors.append(f'{prefix}: container_name {cname!r} is reserved')
if svc.get('privileged') is True:
errors.append(f'{prefix}: privileged: true is not allowed')
net_mode = svc.get('network_mode')
if allow_host_network:
if net_mode is not None and net_mode not in ('host',):
errors.append(
f'{prefix}: network_mode {net_mode!r} is not allowed '
'(connectivity services must use host)'
)
else:
if net_mode is not None and net_mode not in (None, 'bridge'):
errors.append(
f'{prefix}: network_mode {net_mode!r} is not allowed (only bridge)'
)
if svc.get('pid') == 'host':
errors.append(f'{prefix}: pid: host is not allowed')
if svc.get('ipc') == 'host':
errors.append(f'{prefix}: ipc: host is not allowed')
if svc.get('userns_mode') == 'host':
errors.append(f'{prefix}: userns_mode: host is not allowed')
# cap_add
for cap in svc.get('cap_add') or []:
cap_str = str(cap)
if cap_str in _CAP_DENYLIST:
errors.append(f'{prefix}: cap_add {cap_str!r} is explicitly denied')
elif cap_str not in _CAP_ALLOWLIST:
errors.append(f'{prefix}: cap_add {cap_str!r} not in allowlist')
# volumes — reject absolute host-side bind mounts unless they're under
# the sanctioned data directory (injected by ServiceComposer via PIC_DATA_DIR)
for vol in svc.get('volumes') or []:
vol_str = str(vol)
src = vol_str.split(':')[0] if ':' in vol_str else vol_str
if src.startswith('/'):
if allowed_data_dir and src.startswith(allowed_data_dir):
continue
errors.append(
f'{prefix}: absolute host bind mount not allowed: {vol_str!r}'
)
if 'devices' in svc and not allow_host_network:
errors.append(f'{prefix}: devices key is not allowed')
for opt in svc.get('security_opt') or []:
opt_str = str(opt)
if opt_str in ('apparmor=unconfined', 'seccomp=unconfined'):
errors.append(
f'{prefix}: security_opt {opt_str!r} is not allowed'
)
# command must be a list — string form passes through the shell
cmd = svc.get('command')
if cmd is not None and isinstance(cmd, str):
errors.append(
f'{prefix}: command must be a list, not a shell string'
)
# entrypoint must also be a list for the same reason
ep = svc.get('entrypoint')
if ep is not None and isinstance(ep, str):
errors.append(
f'{prefix}: entrypoint must be a list, not a shell string'
)
return (len(errors) == 0, errors)
def validate_provision_hook(hook) -> tuple:
"""
Validate a provision_hook value from accounts.provision_hook.
Acceptable: None/absent, or a dict {"argv": ["binary", "arg1", ...]}.
Rejected: any plain string (shell injection risk), empty argv, uppercase binary,
NUL bytes in any element.
Returns (True, "") on success; (False, error_string) on failure.
"""
if hook is None:
return (True, '')
if isinstance(hook, str):
return (
False,
'provision_hook must be an argv list dict {"argv": [...]}, not a shell string',
)
if not isinstance(hook, dict):
return (False, 'provision_hook must be a dict with argv list')
argv = hook.get('argv')
if not isinstance(argv, list) or len(argv) == 0:
return (False, 'provision_hook.argv must be a non-empty list')
# NUL-byte check must precede regex check so the error message is unambiguous.
for elem in argv:
if isinstance(elem, str) and '\x00' in elem:
return (False, 'provision_hook.argv element contains NUL byte')
binary = argv[0]
if not isinstance(binary, str) or not _HOOK_BINARY_RE.match(binary):
return (
False,
f'provision_hook.argv[0] must match ^[a-z][a-z0-9_-]{{0,31}}$, got: {binary!r}',
)
return (True, '')
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _check_subdomain(value, field_name: str, errors: list) -> None:
if not isinstance(value, str):
errors.append(f'{field_name} must be a string')
return
if value in _RESERVED_SUBDOMAINS:
errors.append(f'{field_name} is reserved: {value!r}')
elif not _SUBDOMAIN_RE.match(value):
errors.append(
f'{field_name} must match ^[a-z][a-z0-9-]{{0,30}}$, got: {value!r}'
)
def _check_backend(value, field_name: str, errors: list) -> None:
if not isinstance(value, str):
errors.append(f'{field_name} must be a string')
return
if not _BACKEND_RE.match(value):
errors.append(
f'{field_name} must be host:port (e.g. cell-foo:8080), got: {value!r}'
)
return
host = value.split(':')[0]
if host in _BACKEND_DENYLIST:
errors.append(f'{field_name} host {host!r} is in the backend denylist')