feat: add EgressManager — per-service egress enforcement via host iptables
Unit Tests / test (push) Successful in 11m20s
Unit Tests / test (push) Successful in 11m20s
Routes outbound traffic from installed service containers through alternate exits (wireguard_ext, openvpn, tor) using host-side iptables fwmark policy-routing in a dedicated PIC_EGRESS chain. Marks 0x110/0x120/0x130 are distinct from ConnectivityManager's 0x10/0x20/0x30. Container IPs discovered at runtime via docker inspect. Wired into ServiceStoreManager install/remove lifecycle and managers.py singleton. 22 new tests. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,352 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
EgressManager — per-service egress enforcement.
|
||||
|
||||
Routes outbound traffic from installed service containers through
|
||||
alternate exits (wireguard_ext, openvpn, tor) using host-side
|
||||
iptables fwmark policy-routing. Integrates with ServiceStoreManager
|
||||
for install/remove lifecycle hooks.
|
||||
|
||||
Rules live on the HOST in PIC_EGRESS chains in the mangle and nat
|
||||
tables. Container IPs are discovered via docker inspect using the
|
||||
container_name from the service manifest. Marks are distinct from
|
||||
ConnectivityManager to prevent rule collisions.
|
||||
"""
|
||||
import logging
|
||||
import subprocess
|
||||
import time
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
EXIT_TYPES = ("default", "wireguard_ext", "openvpn", "tor")
|
||||
|
||||
# fwmark values — must not collide with ConnectivityManager (0x10, 0x20, 0x30)
|
||||
MARKS = {"wireguard_ext": 0x110, "openvpn": 0x120, "tor": 0x130}
|
||||
|
||||
# Policy routing table IDs
|
||||
TABLES = {"wireguard_ext": 210, "openvpn": 220, "tor": 230}
|
||||
|
||||
EGRESS_CHAIN = "PIC_EGRESS"
|
||||
|
||||
# Transparent proxy port used by Tor
|
||||
_TOR_TRANS_PORT = 9040
|
||||
|
||||
|
||||
class EgressManager:
|
||||
"""Per-service egress enforcement via host iptables fwmark policy-routing."""
|
||||
|
||||
def __init__(self, config_manager, service_store_manager=None,
|
||||
data_dir: str = "/app/data", config_dir: str = "/app/config"):
|
||||
self.config_manager = config_manager
|
||||
self.service_store_manager = service_store_manager
|
||||
self._data_dir = data_dir
|
||||
self._config_dir = config_dir
|
||||
|
||||
# ── Public API ─────────────────────────────────────────────────────────
|
||||
|
||||
def apply_service(self, service_id: str) -> Dict[str, Any]:
|
||||
"""Idempotently apply egress rules for one installed service.
|
||||
|
||||
Steps:
|
||||
1. Look up the service manifest.
|
||||
2. clear_service first (ensures idempotency).
|
||||
3. If the manifest has no egress block, skip silently.
|
||||
4. Discover the container IP.
|
||||
5. Resolve the exit type (override > manifest default > 'default').
|
||||
6. If exit is 'default', return early with no rules.
|
||||
7. Otherwise create chains, ensure ip rules, add mark rules.
|
||||
"""
|
||||
manifest = self._get_manifest(service_id)
|
||||
if manifest is None:
|
||||
return {'ok': False, 'error': f'manifest not found for {service_id}'}
|
||||
|
||||
# Always clear first for idempotency
|
||||
self.clear_service(service_id)
|
||||
|
||||
if not self._has_egress(manifest):
|
||||
return {'ok': True, 'skipped': True}
|
||||
|
||||
container_name = manifest.get('container_name', '')
|
||||
container_ip = self._discover_container_ip(container_name)
|
||||
if not container_ip:
|
||||
return {'ok': False, 'error': 'container IP not discoverable'}
|
||||
|
||||
exit_via = self._resolve_exit(service_id, manifest)
|
||||
|
||||
# Validate exit_via is a known, non-default value
|
||||
if exit_via not in EXIT_TYPES:
|
||||
return {
|
||||
'ok': False,
|
||||
'error': f'unknown exit_via {exit_via!r}; must be one of {EXIT_TYPES}',
|
||||
}
|
||||
|
||||
if exit_via == 'default':
|
||||
return {'ok': True, 'exit_via': 'default'}
|
||||
|
||||
if exit_via not in MARKS:
|
||||
return {
|
||||
'ok': False,
|
||||
'error': f'unknown exit_via {exit_via!r}; must be one of {EXIT_TYPES}',
|
||||
}
|
||||
|
||||
try:
|
||||
self._ensure_chains()
|
||||
self._ensure_host_ip_rules()
|
||||
self._add_mark_rule(container_ip, MARKS[exit_via], service_id)
|
||||
if exit_via == 'tor':
|
||||
self._add_tor_redirect(container_ip, service_id)
|
||||
except Exception as exc:
|
||||
logger.error('apply_service(%s): %s', service_id, exc)
|
||||
return {'ok': False, 'error': str(exc)}
|
||||
|
||||
return {'ok': True, 'exit_via': exit_via, 'container_ip': container_ip}
|
||||
|
||||
def clear_service(self, service_id: str) -> Dict[str, Any]:
|
||||
"""Remove all PIC_EGRESS rules tagged for this service."""
|
||||
try:
|
||||
self._clear_egress_rules(service_id)
|
||||
return {'ok': True}
|
||||
except Exception as exc:
|
||||
logger.error('clear_service(%s): %s', service_id, exc)
|
||||
return {'ok': False, 'error': str(exc)}
|
||||
|
||||
def apply_all(self) -> Dict[str, Any]:
|
||||
"""Apply egress rules for every installed service that has a manifest."""
|
||||
installed = self.config_manager.get_installed_services()
|
||||
results: Dict[str, Any] = {}
|
||||
for svc_id, record in installed.items():
|
||||
if not isinstance(record, dict) or not record.get('manifest'):
|
||||
continue
|
||||
results[svc_id] = self.apply_service(svc_id)
|
||||
return {'ok': True, 'services': results}
|
||||
|
||||
def set_service_exit(self, service_id: str, exit_type: str) -> Dict[str, Any]:
|
||||
"""Persist a per-service egress override and immediately reapply rules.
|
||||
|
||||
exit_type must appear in the manifest's egress.allowed list.
|
||||
"""
|
||||
manifest = self._get_manifest(service_id)
|
||||
if manifest is None:
|
||||
return {'ok': False, 'error': f'service {service_id!r} not installed'}
|
||||
|
||||
if not self._has_egress(manifest):
|
||||
return {'ok': False, 'error': f'service {service_id!r} has no egress configuration'}
|
||||
|
||||
egress = manifest.get('egress', {})
|
||||
allowed = egress.get('allowed', list(EXIT_TYPES))
|
||||
|
||||
if exit_type not in allowed:
|
||||
return {
|
||||
'ok': False,
|
||||
'error': (
|
||||
f'exit_type {exit_type!r} is not in the allowed list '
|
||||
f'for {service_id}: {allowed}'
|
||||
),
|
||||
}
|
||||
|
||||
if exit_type not in EXIT_TYPES:
|
||||
return {
|
||||
'ok': False,
|
||||
'error': f'unknown exit_type {exit_type!r}; must be one of {EXIT_TYPES}',
|
||||
}
|
||||
|
||||
# Persist the override so it survives restarts
|
||||
overrides = self._get_egress_overrides()
|
||||
overrides[service_id] = exit_type
|
||||
self._set_egress_overrides(overrides)
|
||||
|
||||
return self.apply_service(service_id)
|
||||
|
||||
def get_status(self) -> Dict[str, Any]:
|
||||
"""Return egress status for every installed service that has egress config."""
|
||||
installed = self.config_manager.get_installed_services()
|
||||
statuses: Dict[str, Any] = {}
|
||||
for svc_id, record in installed.items():
|
||||
if not isinstance(record, dict):
|
||||
continue
|
||||
manifest = record.get('manifest')
|
||||
if not manifest or not self._has_egress(manifest):
|
||||
continue
|
||||
container_name = manifest.get('container_name', '')
|
||||
container_ip = self._discover_container_ip(container_name, retries=1)
|
||||
exit_via = self._resolve_exit(svc_id, manifest)
|
||||
statuses[svc_id] = {
|
||||
'exit_via': exit_via,
|
||||
'container_ip': container_ip,
|
||||
'has_egress': True,
|
||||
}
|
||||
return {'ok': True, 'services': statuses}
|
||||
|
||||
# ── Internals ──────────────────────────────────────────────────────────
|
||||
|
||||
def _get_manifest(self, service_id: str) -> Optional[dict]:
|
||||
"""Retrieve the manifest for an installed service, if available."""
|
||||
installed = self.config_manager.get_installed_services()
|
||||
record = installed.get(service_id)
|
||||
if not record:
|
||||
return None
|
||||
return record.get('manifest')
|
||||
|
||||
def _has_egress(self, manifest: dict) -> bool:
|
||||
"""Return True only when the manifest explicitly declares an egress block."""
|
||||
return bool(manifest.get('has_egress', False) and manifest.get('egress'))
|
||||
|
||||
def _resolve_exit(self, service_id: str, manifest: dict) -> str:
|
||||
"""Determine the effective exit for a service.
|
||||
|
||||
Priority: persisted override > manifest egress.default > 'default'.
|
||||
"""
|
||||
overrides = self._get_egress_overrides()
|
||||
if service_id in overrides:
|
||||
return overrides[service_id]
|
||||
egress = manifest.get('egress') or {}
|
||||
return egress.get('default', 'default')
|
||||
|
||||
def _discover_container_ip(self, container_name: str,
|
||||
retries: int = 5, delay: float = 0.2) -> Optional[str]:
|
||||
"""Return the container's cell-network IP, retrying on transient failure."""
|
||||
if not container_name:
|
||||
return None
|
||||
for attempt in range(retries):
|
||||
result = subprocess.run(
|
||||
[
|
||||
'docker', 'inspect',
|
||||
'-f', '{{.NetworkSettings.Networks.cell-network.IPAddress}}',
|
||||
container_name,
|
||||
],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
ip = result.stdout.strip()
|
||||
if ip and result.returncode == 0:
|
||||
return ip
|
||||
if attempt < retries - 1:
|
||||
time.sleep(delay)
|
||||
return None
|
||||
|
||||
def _ensure_chains(self) -> None:
|
||||
"""Idempotently create PIC_EGRESS chains in mangle and nat on the host."""
|
||||
for table in ('mangle', 'nat'):
|
||||
# Create the chain if it does not yet exist
|
||||
check = self._iptables(['-t', table, '-L', EGRESS_CHAIN, '-n'])
|
||||
if check.returncode != 0:
|
||||
create = self._iptables(['-t', table, '-N', EGRESS_CHAIN])
|
||||
if create.returncode != 0 and 'exists' not in (create.stderr or ''):
|
||||
logger.warning(
|
||||
'_ensure_chains: cannot create %s/%s: %s',
|
||||
table, EGRESS_CHAIN, (create.stderr or '').strip(),
|
||||
)
|
||||
|
||||
# Insert jump from PREROUTING at position 1 (idempotent via -C check)
|
||||
jump_check = self._iptables(
|
||||
['-t', table, '-C', 'PREROUTING', '-j', EGRESS_CHAIN]
|
||||
)
|
||||
if jump_check.returncode != 0:
|
||||
self._iptables(
|
||||
['-t', table, '-I', 'PREROUTING', '1', '-j', EGRESS_CHAIN]
|
||||
)
|
||||
|
||||
def _ensure_host_ip_rules(self) -> None:
|
||||
"""Ensure `ip rule fwmark <mark> lookup <table>` exists for each exit."""
|
||||
for exit_type, mark in MARKS.items():
|
||||
table = TABLES[exit_type]
|
||||
# Remove any existing duplicate rules first, then add once
|
||||
for _ in range(8):
|
||||
r = self._ip_rule(['del', 'fwmark', hex(mark), 'lookup', str(table)])
|
||||
if r.returncode != 0:
|
||||
break
|
||||
self._ip_rule(['add', 'fwmark', hex(mark), 'lookup', str(table)])
|
||||
|
||||
def _add_mark_rule(self, service_ip: str, mark: int, service_id: str) -> None:
|
||||
"""Mark outbound packets from the service container with fwmark."""
|
||||
self._iptables([
|
||||
'-t', 'mangle', '-A', EGRESS_CHAIN,
|
||||
'-s', service_ip,
|
||||
'-j', 'MARK', '--set-mark', hex(mark),
|
||||
'-m', 'comment', '--comment', self._tag(service_id),
|
||||
])
|
||||
|
||||
def _add_tor_redirect(self, service_ip: str, service_id: str) -> None:
|
||||
"""Redirect the service container's TCP traffic to the local Tor TransPort."""
|
||||
self._iptables([
|
||||
'-t', 'nat', '-A', EGRESS_CHAIN,
|
||||
'-s', service_ip, '-p', 'tcp',
|
||||
'-j', 'REDIRECT', '--to-ports', str(_TOR_TRANS_PORT),
|
||||
'-m', 'comment', '--comment', self._tag(service_id),
|
||||
])
|
||||
|
||||
def _clear_egress_rules(self, service_id: str) -> None:
|
||||
"""Remove all rules tagged pic-egr-<service_id> from mangle and nat."""
|
||||
import re as _re
|
||||
tag = self._tag(service_id)
|
||||
comment_re = _re.compile(
|
||||
rf'--comment\s+["\']?{_re.escape(tag)}["\']?(\s|$)'
|
||||
)
|
||||
for table in ('mangle', 'nat'):
|
||||
try:
|
||||
save = subprocess.run(
|
||||
['iptables-save', '-t', table],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
if save.returncode != 0:
|
||||
continue
|
||||
lines = save.stdout.splitlines()
|
||||
filtered = [ln for ln in lines if not comment_re.search(ln)]
|
||||
if len(filtered) == len(lines):
|
||||
continue # nothing to remove
|
||||
restore_input = '\n'.join(filtered) + '\n'
|
||||
restore = subprocess.run(
|
||||
['iptables-restore', '-T', table],
|
||||
input=restore_input,
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
if restore.returncode != 0:
|
||||
logger.warning(
|
||||
'_clear_egress_rules(%s): iptables-restore for %s failed: %s',
|
||||
service_id, table, (restore.stderr or '').strip(),
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error('_clear_egress_rules(%s, %s): %s', service_id, table, exc)
|
||||
|
||||
@staticmethod
|
||||
def _tag(service_id: str) -> str:
|
||||
"""iptables comment tag used to identify rules belonging to a service."""
|
||||
return f'pic-egr-{service_id}'
|
||||
|
||||
def _iptables(self, args: List[str], check: bool = False) -> subprocess.CompletedProcess:
|
||||
"""Run iptables on the host with the given arguments."""
|
||||
cmd = ['iptables'] + args
|
||||
try:
|
||||
return subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
||||
except Exception as exc:
|
||||
logger.error('_iptables %s: %s', args, exc)
|
||||
raise
|
||||
|
||||
def _ip_rule(self, args: List[str]) -> subprocess.CompletedProcess:
|
||||
"""Run `ip rule` on the host with the given arguments."""
|
||||
cmd = ['ip', 'rule'] + args
|
||||
try:
|
||||
return subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
||||
except Exception as exc:
|
||||
logger.error('_ip_rule %s: %s', args, exc)
|
||||
raise
|
||||
|
||||
# ── Config persistence helpers ─────────────────────────────────────────
|
||||
|
||||
def _get_egress_overrides(self) -> Dict[str, str]:
|
||||
"""Return the persisted egress override map {service_id: exit_type}."""
|
||||
try:
|
||||
overrides = self.config_manager.configs.get('egress_overrides')
|
||||
if isinstance(overrides, dict):
|
||||
return dict(overrides)
|
||||
except Exception:
|
||||
pass
|
||||
return {}
|
||||
|
||||
def _set_egress_overrides(self, overrides: Dict[str, str]) -> None:
|
||||
"""Persist the egress override map to config."""
|
||||
try:
|
||||
self.config_manager.configs['egress_overrides'] = overrides
|
||||
self.config_manager._save_all_configs()
|
||||
except Exception as exc:
|
||||
logger.error('_set_egress_overrides: %s', exc)
|
||||
Reference in New Issue
Block a user