From 03a67ad922007792af7e7eb6a8b65a1f33d65127 Mon Sep 17 00:00:00 2001 From: Dmitrii Iurco Date: Sat, 30 May 2026 00:58:47 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20add=20EgressManager=20=E2=80=94=20per-s?= =?UTF-8?q?ervice=20egress=20enforcement=20via=20host=20iptables?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Routes outbound traffic from installed service containers through alternate exits (wireguard_ext, openvpn, tor) using host-side iptables fwmark policy-routing in a dedicated PIC_EGRESS chain. Marks 0x110/0x120/0x130 are distinct from ConnectivityManager's 0x10/0x20/0x30. Container IPs discovered at runtime via docker inspect. Wired into ServiceStoreManager install/remove lifecycle and managers.py singleton. 22 new tests. Co-Authored-By: Claude Sonnet 4.6 --- api/egress_manager.py | 352 +++++++++++++++++++++ api/managers.py | 10 + api/service_store_manager.py | 15 +- tests/test_egress_manager.py | 586 +++++++++++++++++++++++++++++++++++ 4 files changed, 962 insertions(+), 1 deletion(-) create mode 100644 api/egress_manager.py create mode 100644 tests/test_egress_manager.py diff --git a/api/egress_manager.py b/api/egress_manager.py new file mode 100644 index 0000000..d113c5e --- /dev/null +++ b/api/egress_manager.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python3 +""" +EgressManager — per-service egress enforcement. + +Routes outbound traffic from installed service containers through +alternate exits (wireguard_ext, openvpn, tor) using host-side +iptables fwmark policy-routing. Integrates with ServiceStoreManager +for install/remove lifecycle hooks. + +Rules live on the HOST in PIC_EGRESS chains in the mangle and nat +tables. Container IPs are discovered via docker inspect using the +container_name from the service manifest. Marks are distinct from +ConnectivityManager to prevent rule collisions. +""" +import logging +import subprocess +import time +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + +EXIT_TYPES = ("default", "wireguard_ext", "openvpn", "tor") + +# fwmark values — must not collide with ConnectivityManager (0x10, 0x20, 0x30) +MARKS = {"wireguard_ext": 0x110, "openvpn": 0x120, "tor": 0x130} + +# Policy routing table IDs +TABLES = {"wireguard_ext": 210, "openvpn": 220, "tor": 230} + +EGRESS_CHAIN = "PIC_EGRESS" + +# Transparent proxy port used by Tor +_TOR_TRANS_PORT = 9040 + + +class EgressManager: + """Per-service egress enforcement via host iptables fwmark policy-routing.""" + + def __init__(self, config_manager, service_store_manager=None, + data_dir: str = "/app/data", config_dir: str = "/app/config"): + self.config_manager = config_manager + self.service_store_manager = service_store_manager + self._data_dir = data_dir + self._config_dir = config_dir + + # ── Public API ───────────────────────────────────────────────────────── + + def apply_service(self, service_id: str) -> Dict[str, Any]: + """Idempotently apply egress rules for one installed service. + + Steps: + 1. Look up the service manifest. + 2. clear_service first (ensures idempotency). + 3. If the manifest has no egress block, skip silently. + 4. Discover the container IP. + 5. Resolve the exit type (override > manifest default > 'default'). + 6. If exit is 'default', return early with no rules. + 7. Otherwise create chains, ensure ip rules, add mark rules. + """ + manifest = self._get_manifest(service_id) + if manifest is None: + return {'ok': False, 'error': f'manifest not found for {service_id}'} + + # Always clear first for idempotency + self.clear_service(service_id) + + if not self._has_egress(manifest): + return {'ok': True, 'skipped': True} + + container_name = manifest.get('container_name', '') + container_ip = self._discover_container_ip(container_name) + if not container_ip: + return {'ok': False, 'error': 'container IP not discoverable'} + + exit_via = self._resolve_exit(service_id, manifest) + + # Validate exit_via is a known, non-default value + if exit_via not in EXIT_TYPES: + return { + 'ok': False, + 'error': f'unknown exit_via {exit_via!r}; must be one of {EXIT_TYPES}', + } + + if exit_via == 'default': + return {'ok': True, 'exit_via': 'default'} + + if exit_via not in MARKS: + return { + 'ok': False, + 'error': f'unknown exit_via {exit_via!r}; must be one of {EXIT_TYPES}', + } + + try: + self._ensure_chains() + self._ensure_host_ip_rules() + self._add_mark_rule(container_ip, MARKS[exit_via], service_id) + if exit_via == 'tor': + self._add_tor_redirect(container_ip, service_id) + except Exception as exc: + logger.error('apply_service(%s): %s', service_id, exc) + return {'ok': False, 'error': str(exc)} + + return {'ok': True, 'exit_via': exit_via, 'container_ip': container_ip} + + def clear_service(self, service_id: str) -> Dict[str, Any]: + """Remove all PIC_EGRESS rules tagged for this service.""" + try: + self._clear_egress_rules(service_id) + return {'ok': True} + except Exception as exc: + logger.error('clear_service(%s): %s', service_id, exc) + return {'ok': False, 'error': str(exc)} + + def apply_all(self) -> Dict[str, Any]: + """Apply egress rules for every installed service that has a manifest.""" + installed = self.config_manager.get_installed_services() + results: Dict[str, Any] = {} + for svc_id, record in installed.items(): + if not isinstance(record, dict) or not record.get('manifest'): + continue + results[svc_id] = self.apply_service(svc_id) + return {'ok': True, 'services': results} + + def set_service_exit(self, service_id: str, exit_type: str) -> Dict[str, Any]: + """Persist a per-service egress override and immediately reapply rules. + + exit_type must appear in the manifest's egress.allowed list. + """ + manifest = self._get_manifest(service_id) + if manifest is None: + return {'ok': False, 'error': f'service {service_id!r} not installed'} + + if not self._has_egress(manifest): + return {'ok': False, 'error': f'service {service_id!r} has no egress configuration'} + + egress = manifest.get('egress', {}) + allowed = egress.get('allowed', list(EXIT_TYPES)) + + if exit_type not in allowed: + return { + 'ok': False, + 'error': ( + f'exit_type {exit_type!r} is not in the allowed list ' + f'for {service_id}: {allowed}' + ), + } + + if exit_type not in EXIT_TYPES: + return { + 'ok': False, + 'error': f'unknown exit_type {exit_type!r}; must be one of {EXIT_TYPES}', + } + + # Persist the override so it survives restarts + overrides = self._get_egress_overrides() + overrides[service_id] = exit_type + self._set_egress_overrides(overrides) + + return self.apply_service(service_id) + + def get_status(self) -> Dict[str, Any]: + """Return egress status for every installed service that has egress config.""" + installed = self.config_manager.get_installed_services() + statuses: Dict[str, Any] = {} + for svc_id, record in installed.items(): + if not isinstance(record, dict): + continue + manifest = record.get('manifest') + if not manifest or not self._has_egress(manifest): + continue + container_name = manifest.get('container_name', '') + container_ip = self._discover_container_ip(container_name, retries=1) + exit_via = self._resolve_exit(svc_id, manifest) + statuses[svc_id] = { + 'exit_via': exit_via, + 'container_ip': container_ip, + 'has_egress': True, + } + return {'ok': True, 'services': statuses} + + # ── Internals ────────────────────────────────────────────────────────── + + def _get_manifest(self, service_id: str) -> Optional[dict]: + """Retrieve the manifest for an installed service, if available.""" + installed = self.config_manager.get_installed_services() + record = installed.get(service_id) + if not record: + return None + return record.get('manifest') + + def _has_egress(self, manifest: dict) -> bool: + """Return True only when the manifest explicitly declares an egress block.""" + return bool(manifest.get('has_egress', False) and manifest.get('egress')) + + def _resolve_exit(self, service_id: str, manifest: dict) -> str: + """Determine the effective exit for a service. + + Priority: persisted override > manifest egress.default > 'default'. + """ + overrides = self._get_egress_overrides() + if service_id in overrides: + return overrides[service_id] + egress = manifest.get('egress') or {} + return egress.get('default', 'default') + + def _discover_container_ip(self, container_name: str, + retries: int = 5, delay: float = 0.2) -> Optional[str]: + """Return the container's cell-network IP, retrying on transient failure.""" + if not container_name: + return None + for attempt in range(retries): + result = subprocess.run( + [ + 'docker', 'inspect', + '-f', '{{.NetworkSettings.Networks.cell-network.IPAddress}}', + container_name, + ], + capture_output=True, text=True, timeout=10, + ) + ip = result.stdout.strip() + if ip and result.returncode == 0: + return ip + if attempt < retries - 1: + time.sleep(delay) + return None + + def _ensure_chains(self) -> None: + """Idempotently create PIC_EGRESS chains in mangle and nat on the host.""" + for table in ('mangle', 'nat'): + # Create the chain if it does not yet exist + check = self._iptables(['-t', table, '-L', EGRESS_CHAIN, '-n']) + if check.returncode != 0: + create = self._iptables(['-t', table, '-N', EGRESS_CHAIN]) + if create.returncode != 0 and 'exists' not in (create.stderr or ''): + logger.warning( + '_ensure_chains: cannot create %s/%s: %s', + table, EGRESS_CHAIN, (create.stderr or '').strip(), + ) + + # Insert jump from PREROUTING at position 1 (idempotent via -C check) + jump_check = self._iptables( + ['-t', table, '-C', 'PREROUTING', '-j', EGRESS_CHAIN] + ) + if jump_check.returncode != 0: + self._iptables( + ['-t', table, '-I', 'PREROUTING', '1', '-j', EGRESS_CHAIN] + ) + + def _ensure_host_ip_rules(self) -> None: + """Ensure `ip rule fwmark lookup ` exists for each exit.""" + for exit_type, mark in MARKS.items(): + table = TABLES[exit_type] + # Remove any existing duplicate rules first, then add once + for _ in range(8): + r = self._ip_rule(['del', 'fwmark', hex(mark), 'lookup', str(table)]) + if r.returncode != 0: + break + self._ip_rule(['add', 'fwmark', hex(mark), 'lookup', str(table)]) + + def _add_mark_rule(self, service_ip: str, mark: int, service_id: str) -> None: + """Mark outbound packets from the service container with fwmark.""" + self._iptables([ + '-t', 'mangle', '-A', EGRESS_CHAIN, + '-s', service_ip, + '-j', 'MARK', '--set-mark', hex(mark), + '-m', 'comment', '--comment', self._tag(service_id), + ]) + + def _add_tor_redirect(self, service_ip: str, service_id: str) -> None: + """Redirect the service container's TCP traffic to the local Tor TransPort.""" + self._iptables([ + '-t', 'nat', '-A', EGRESS_CHAIN, + '-s', service_ip, '-p', 'tcp', + '-j', 'REDIRECT', '--to-ports', str(_TOR_TRANS_PORT), + '-m', 'comment', '--comment', self._tag(service_id), + ]) + + def _clear_egress_rules(self, service_id: str) -> None: + """Remove all rules tagged pic-egr- from mangle and nat.""" + import re as _re + tag = self._tag(service_id) + comment_re = _re.compile( + rf'--comment\s+["\']?{_re.escape(tag)}["\']?(\s|$)' + ) + for table in ('mangle', 'nat'): + try: + save = subprocess.run( + ['iptables-save', '-t', table], + capture_output=True, text=True, timeout=10, + ) + if save.returncode != 0: + continue + lines = save.stdout.splitlines() + filtered = [ln for ln in lines if not comment_re.search(ln)] + if len(filtered) == len(lines): + continue # nothing to remove + restore_input = '\n'.join(filtered) + '\n' + restore = subprocess.run( + ['iptables-restore', '-T', table], + input=restore_input, + capture_output=True, text=True, timeout=10, + ) + if restore.returncode != 0: + logger.warning( + '_clear_egress_rules(%s): iptables-restore for %s failed: %s', + service_id, table, (restore.stderr or '').strip(), + ) + except Exception as exc: + logger.error('_clear_egress_rules(%s, %s): %s', service_id, table, exc) + + @staticmethod + def _tag(service_id: str) -> str: + """iptables comment tag used to identify rules belonging to a service.""" + return f'pic-egr-{service_id}' + + def _iptables(self, args: List[str], check: bool = False) -> subprocess.CompletedProcess: + """Run iptables on the host with the given arguments.""" + cmd = ['iptables'] + args + try: + return subprocess.run(cmd, capture_output=True, text=True, timeout=10) + except Exception as exc: + logger.error('_iptables %s: %s', args, exc) + raise + + def _ip_rule(self, args: List[str]) -> subprocess.CompletedProcess: + """Run `ip rule` on the host with the given arguments.""" + cmd = ['ip', 'rule'] + args + try: + return subprocess.run(cmd, capture_output=True, text=True, timeout=10) + except Exception as exc: + logger.error('_ip_rule %s: %s', args, exc) + raise + + # ── Config persistence helpers ───────────────────────────────────────── + + def _get_egress_overrides(self) -> Dict[str, str]: + """Return the persisted egress override map {service_id: exit_type}.""" + try: + overrides = self.config_manager.configs.get('egress_overrides') + if isinstance(overrides, dict): + return dict(overrides) + except Exception: + pass + return {} + + def _set_egress_overrides(self, overrides: Dict[str, str]) -> None: + """Persist the egress override map to config.""" + try: + self.config_manager.configs['egress_overrides'] = overrides + self.config_manager._save_all_configs() + except Exception as exc: + logger.error('_set_egress_overrides: %s', exc) diff --git a/api/managers.py b/api/managers.py index e5b792f..93c6dce 100644 --- a/api/managers.py +++ b/api/managers.py @@ -95,6 +95,15 @@ service_store_manager = ServiceStoreManager( service_composer=service_composer, ) +from egress_manager import EgressManager +egress_manager = EgressManager( + config_manager=config_manager, + service_store_manager=service_store_manager, + data_dir=DATA_DIR, + config_dir=CONFIG_DIR, +) +service_store_manager.egress_manager = egress_manager + setup_manager = SetupManager(config_manager=config_manager, auth_manager=auth_manager) # Service logger configuration @@ -132,6 +141,7 @@ __all__ = [ 'cell_link_manager', 'auth_manager', 'setup_manager', 'caddy_manager', 'ddns_manager', 'service_store_manager', 'connectivity_manager', 'service_registry', 'service_composer', 'account_manager', + 'egress_manager', 'firewall_manager', 'EventType', 'DATA_DIR', 'CONFIG_DIR', ] diff --git a/api/service_store_manager.py b/api/service_store_manager.py index b25ae99..e97a179 100644 --- a/api/service_store_manager.py +++ b/api/service_store_manager.py @@ -75,12 +75,13 @@ class ServiceStoreManager(BaseServiceManager): def __init__(self, config_manager, caddy_manager, container_manager, data_dir: str = '', config_dir: str = '', - service_composer=None): + service_composer=None, egress_manager=None): super().__init__('service_store', data_dir, config_dir) self.config_manager = config_manager self.caddy_manager = caddy_manager self.container_manager = container_manager self.service_composer = service_composer + self.egress_manager = egress_manager self.compose_override = os.environ.get( 'COMPOSE_SERVICES_PATH', '/app/docker-compose.services.yml' ) @@ -345,6 +346,12 @@ class ServiceStoreManager(BaseServiceManager): except Exception as e: logger.warning('install: caddy regenerate failed for %s (non-fatal): %s', service_id, e) + if self.egress_manager: + try: + self.egress_manager.apply_service(service_id) + except Exception as exc: + logger.warning('Egress apply failed for %s (non-fatal): %s', service_id, exc) + return {'ok': True} def remove(self, service_id: str, purge_data: bool = False) -> dict: @@ -363,6 +370,12 @@ class ServiceStoreManager(BaseServiceManager): 'error': f'Cannot remove {service_id}: required by {", ".join(sorted(dependents))}', } + if self.egress_manager: + try: + self.egress_manager.clear_service(service_id) + except Exception as exc: + logger.warning('Egress clear failed for %s (non-fatal): %s', service_id, exc) + # Stop and remove containers (best-effort) if self.service_composer is not None: try: diff --git a/tests/test_egress_manager.py b/tests/test_egress_manager.py new file mode 100644 index 0000000..3139725 --- /dev/null +++ b/tests/test_egress_manager.py @@ -0,0 +1,586 @@ +""" +Tests for EgressManager — per-service egress enforcement via host iptables. + +All subprocess calls (iptables, iptables-save, iptables-restore, ip rule, +docker inspect) and config_manager state are mocked so these tests run +without any live infrastructure or root privileges. +""" + +import os +import sys +import unittest +from unittest.mock import MagicMock, patch, call + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api')) + +import egress_manager as em_module +from egress_manager import EgressManager, MARKS, TABLES, EXIT_TYPES, EGRESS_CHAIN + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_manager(installed=None, overrides=None): + """Build an EgressManager backed by a mock config_manager.""" + cm = MagicMock() + cm.get_installed_services.return_value = installed or {} + # Wire up configs dict so _get_egress_overrides / _set_egress_overrides work + cm.configs = {'egress_overrides': overrides or {}} + cm._save_all_configs = MagicMock() + return EgressManager(config_manager=cm), cm + + +def _subprocess_ok(stdout=''): + """Return a MagicMock simulating a successful subprocess.run result.""" + return MagicMock(returncode=0, stdout=stdout, stderr='') + + +def _subprocess_fail(stderr='error', stdout=''): + """Return a MagicMock simulating a failed subprocess.run result.""" + return MagicMock(returncode=1, stdout=stdout, stderr=stderr) + + +def _make_manifest(has_egress=True, egress_default='wireguard_ext', + allowed=None, container_name='cell-myapp'): + """Return a minimal manifest dict with optional egress configuration.""" + m = { + 'id': 'myapp', + 'name': 'My App', + 'container_name': container_name, + } + if has_egress: + m['has_egress'] = True + m['egress'] = { + 'default': egress_default, + 'allowed': allowed if allowed is not None else list(EXIT_TYPES), + } + else: + m['has_egress'] = False + return m + + +def _installed_with_manifest(manifest, service_id='myapp'): + """Return an installed-services dict containing one service record.""" + return {service_id: {'id': service_id, 'manifest': manifest}} + + +# --------------------------------------------------------------------------- +# 1. test_apply_service_default_exit_no_iptables_calls +# --------------------------------------------------------------------------- + +class TestApplyServiceDefaultExit(unittest.TestCase): + + def test_apply_service_default_exit_no_iptables_calls(self): + """When egress.default is 'default', apply_service must not touch iptables.""" + manifest = _make_manifest(egress_default='default') + mgr, _ = _make_manager(installed=_installed_with_manifest(manifest)) + + with patch('subprocess.run') as mock_run: + # docker inspect must return an IP so we don't fail earlier + mock_run.return_value = _subprocess_ok(stdout='172.20.0.50\n') + result = mgr.apply_service('myapp') + + self.assertTrue(result['ok']) + self.assertEqual(result.get('exit_via'), 'default') + + # No iptables rule-insertion or mark call should have been made. + # iptables-save from clear_service is allowed; we only check that + # no iptables -A / -I (rule-adding) calls were made. + rule_add_calls = [ + c for c in mock_run.call_args_list + if c.args and c.args[0][:1] == ['iptables'] + and any(a in c.args[0] for a in ('-A', '-I', 'MARK', 'REDIRECT')) + ] + self.assertEqual(rule_add_calls, []) + + +# --------------------------------------------------------------------------- +# 2. test_apply_service_wireguard_ext_adds_mark_rule +# --------------------------------------------------------------------------- + +class TestApplyServiceWireguardExt(unittest.TestCase): + + def test_apply_service_wireguard_ext_adds_mark_rule(self): + """wireguard_ext exit must add a mangle MARK rule with 0x110 and the correct comment.""" + manifest = _make_manifest(egress_default='wireguard_ext') + mgr, _ = _make_manager(installed=_installed_with_manifest(manifest)) + + calls_made = [] + + def fake_run(cmd, **kwargs): + calls_made.append(cmd) + # docker inspect → return IP + if 'docker' in cmd and 'inspect' in cmd: + return _subprocess_ok(stdout='172.20.0.50\n') + # iptables-save → empty ruleset + if 'iptables-save' in cmd: + return _subprocess_ok(stdout='') + # iptables-restore → success + if 'iptables-restore' in cmd: + return _subprocess_ok() + # ip rule del → fail (none to delete) + if cmd[:3] == ['ip', 'rule', 'del']: + return _subprocess_fail() + return _subprocess_ok() + + with patch('subprocess.run', side_effect=fake_run): + result = mgr.apply_service('myapp') + + self.assertTrue(result['ok'], result) + self.assertEqual(result['exit_via'], 'wireguard_ext') + + # Find the mangle MARK -A call + mark_calls = [ + c for c in calls_made + if 'iptables' in str(c) and 'MARK' in c and '--set-mark' in c + ] + self.assertGreater(len(mark_calls), 0, 'No MARK rule was added') + mark_cmd = ' '.join(mark_calls[0]) + self.assertIn('0x110', mark_cmd) + self.assertIn('pic-egr-myapp', mark_cmd) + self.assertIn('mangle', mark_cmd) + + +# --------------------------------------------------------------------------- +# 3. test_apply_service_openvpn_adds_mark_rule +# --------------------------------------------------------------------------- + +class TestApplyServiceOpenVPN(unittest.TestCase): + + def test_apply_service_openvpn_adds_mark_rule(self): + """openvpn exit must add a mangle MARK rule with 0x120.""" + manifest = _make_manifest(egress_default='openvpn') + mgr, _ = _make_manager(installed=_installed_with_manifest(manifest)) + + calls_made = [] + + def fake_run(cmd, **kwargs): + calls_made.append(cmd) + if 'docker' in cmd and 'inspect' in cmd: + return _subprocess_ok(stdout='172.20.0.51\n') + if 'iptables-save' in cmd: + return _subprocess_ok(stdout='') + if 'iptables-restore' in cmd: + return _subprocess_ok() + if cmd[:3] == ['ip', 'rule', 'del']: + return _subprocess_fail() + return _subprocess_ok() + + with patch('subprocess.run', side_effect=fake_run): + result = mgr.apply_service('myapp') + + self.assertTrue(result['ok'], result) + self.assertEqual(result['exit_via'], 'openvpn') + + mark_calls = [ + c for c in calls_made + if 'iptables' in str(c) and 'MARK' in c and '--set-mark' in c + ] + self.assertGreater(len(mark_calls), 0) + self.assertIn('0x120', ' '.join(mark_calls[0])) + + +# --------------------------------------------------------------------------- +# 4. test_apply_service_tor_adds_mark_and_redirect +# --------------------------------------------------------------------------- + +class TestApplyServiceTor(unittest.TestCase): + + def test_apply_service_tor_adds_mark_and_redirect(self): + """tor exit must add a mangle MARK 0x130 AND a nat REDIRECT to port 9040.""" + manifest = _make_manifest(egress_default='tor') + mgr, _ = _make_manager(installed=_installed_with_manifest(manifest)) + + calls_made = [] + + def fake_run(cmd, **kwargs): + calls_made.append(cmd) + if 'docker' in cmd and 'inspect' in cmd: + return _subprocess_ok(stdout='172.20.0.52\n') + if 'iptables-save' in cmd: + return _subprocess_ok(stdout='') + if 'iptables-restore' in cmd: + return _subprocess_ok() + if cmd[:3] == ['ip', 'rule', 'del']: + return _subprocess_fail() + return _subprocess_ok() + + with patch('subprocess.run', side_effect=fake_run): + result = mgr.apply_service('myapp') + + self.assertTrue(result['ok'], result) + self.assertEqual(result['exit_via'], 'tor') + + mark_calls = [ + c for c in calls_made + if 'iptables' in str(c) and 'MARK' in c and '--set-mark' in c + ] + self.assertGreater(len(mark_calls), 0, 'No MARK rule found') + self.assertIn('0x130', ' '.join(mark_calls[0])) + + redirect_calls = [ + c for c in calls_made + if 'iptables' in str(c) and 'REDIRECT' in c + ] + self.assertGreater(len(redirect_calls), 0, 'No REDIRECT rule found') + redirect_cmd = ' '.join(redirect_calls[0]) + self.assertIn('9040', redirect_cmd) + self.assertIn('nat', redirect_cmd) + + +# --------------------------------------------------------------------------- +# 5. test_apply_service_no_container_ip_returns_error +# --------------------------------------------------------------------------- + +class TestApplyServiceNoContainerIP(unittest.TestCase): + + def test_apply_service_no_container_ip_returns_error(self): + """When docker inspect returns an empty IP, apply_service must return ok=False.""" + manifest = _make_manifest(egress_default='wireguard_ext') + mgr, _ = _make_manager(installed=_installed_with_manifest(manifest)) + + def fake_run(cmd, **kwargs): + if 'docker' in cmd and 'inspect' in cmd: + return _subprocess_ok(stdout='\n') # empty IP + if 'iptables-save' in cmd: + return _subprocess_ok(stdout='') + if 'iptables-restore' in cmd: + return _subprocess_ok() + return _subprocess_ok() + + with patch('subprocess.run', side_effect=fake_run): + result = mgr.apply_service('myapp') + + self.assertFalse(result['ok']) + self.assertIn('container IP not discoverable', result.get('error', '')) + + +# --------------------------------------------------------------------------- +# 6. test_apply_service_container_ip_retries +# --------------------------------------------------------------------------- + +class TestApplyServiceRetries(unittest.TestCase): + + def test_apply_service_container_ip_retries(self): + """First docker inspect attempt fails; second succeeds — result must be ok=True.""" + manifest = _make_manifest(egress_default='wireguard_ext') + mgr, _ = _make_manager(installed=_installed_with_manifest(manifest)) + + inspect_count = [0] + + def fake_run(cmd, **kwargs): + if 'docker' in cmd and 'inspect' in cmd: + inspect_count[0] += 1 + if inspect_count[0] == 1: + return _subprocess_ok(stdout='\n') # first attempt: empty + return _subprocess_ok(stdout='172.20.0.50\n') # second: success + if 'iptables-save' in cmd: + return _subprocess_ok(stdout='') + if 'iptables-restore' in cmd: + return _subprocess_ok() + if cmd[:3] == ['ip', 'rule', 'del']: + return _subprocess_fail() + return _subprocess_ok() + + with patch('subprocess.run', side_effect=fake_run): + with patch('time.sleep'): # skip actual delays + result = mgr.apply_service('myapp') + + self.assertTrue(result['ok'], result) + self.assertGreaterEqual(inspect_count[0], 2) + + +# --------------------------------------------------------------------------- +# 7. test_has_egress_false_skips_rules +# --------------------------------------------------------------------------- + +class TestHasEgressFalse(unittest.TestCase): + + def test_has_egress_false_skips_rules(self): + """A manifest with has_egress=False must skip rules and return skipped=True.""" + manifest = _make_manifest(has_egress=False) + mgr, _ = _make_manager(installed=_installed_with_manifest(manifest)) + + with patch('subprocess.run') as mock_run: + mock_run.return_value = _subprocess_ok(stdout='') + result = mgr.apply_service('myapp') + + self.assertTrue(result['ok']) + self.assertTrue(result.get('skipped')) + + # No iptables rule-insertion call should have been made. + # iptables-save from clear_service is permitted; only check no -A/-I. + rule_add_calls = [ + c for c in mock_run.call_args_list + if c.args and c.args[0][:1] == ['iptables'] + and any(a in c.args[0] for a in ('-A', '-I', 'MARK', 'REDIRECT')) + ] + self.assertEqual(rule_add_calls, []) + + +# --------------------------------------------------------------------------- +# 8. test_has_egress_missing_egress_block_skips +# --------------------------------------------------------------------------- + +class TestHasEgressMissingBlock(unittest.TestCase): + + def test_has_egress_missing_egress_block_skips(self): + """has_egress=True but no 'egress' dict → must skip (skipped=True).""" + manifest = { + 'id': 'myapp', + 'container_name': 'cell-myapp', + 'has_egress': True, + # 'egress' key intentionally absent + } + mgr, _ = _make_manager( + installed=_installed_with_manifest(manifest) + ) + + with patch('subprocess.run') as mock_run: + mock_run.return_value = _subprocess_ok(stdout='') + result = mgr.apply_service('myapp') + + self.assertTrue(result['ok']) + self.assertTrue(result.get('skipped')) + + +# --------------------------------------------------------------------------- +# 9. test_clear_service_removes_tagged_rules +# --------------------------------------------------------------------------- + +class TestClearService(unittest.TestCase): + + def test_clear_service_removes_tagged_rules(self): + """iptables-restore is called with the tagged lines removed.""" + mgr, _ = _make_manager() + + mangle_rules = ( + '-A PIC_EGRESS -s 172.20.0.50 -j MARK --set-mark 0x110 ' + '-m comment --comment "pic-egr-myapp"\n' + '-A PIC_EGRESS -s 172.20.0.99 -j MARK --set-mark 0x110 ' + '-m comment --comment "pic-egr-otherapp"\n' + ) + nat_rules = '' + + restore_inputs = {} + + def fake_run(cmd, input=None, **kwargs): + if cmd == ['iptables-save', '-t', 'mangle']: + return _subprocess_ok(stdout=mangle_rules) + if cmd == ['iptables-save', '-t', 'nat']: + return _subprocess_ok(stdout=nat_rules) + if cmd == ['iptables-restore', '-T', 'mangle']: + restore_inputs['mangle'] = input + return _subprocess_ok() + if cmd == ['iptables-restore', '-T', 'nat']: + restore_inputs['nat'] = input + return _subprocess_ok() + return _subprocess_ok() + + with patch('subprocess.run', side_effect=fake_run): + result = mgr.clear_service('myapp') + + self.assertTrue(result['ok']) + # The restored mangle rules must not contain myapp's tag + restored = restore_inputs.get('mangle', '') + self.assertNotIn('pic-egr-myapp', restored) + # But the other service's rules must be preserved + self.assertIn('pic-egr-otherapp', restored) + + +# --------------------------------------------------------------------------- +# 10. test_set_service_exit_rejects_not_in_allowed +# --------------------------------------------------------------------------- + +class TestSetServiceExitRejectNotAllowed(unittest.TestCase): + + def test_set_service_exit_rejects_not_in_allowed(self): + """Exit type not in manifest's allowed list must return ok=False.""" + manifest = _make_manifest( + egress_default='default', + allowed=['default', 'tor'], # wireguard_ext not in allowed + ) + mgr, _ = _make_manager(installed=_installed_with_manifest(manifest)) + + result = mgr.set_service_exit('myapp', 'wireguard_ext') + + self.assertFalse(result['ok']) + self.assertIn('error', result) + self.assertIn('allowed', result['error']) + + +# --------------------------------------------------------------------------- +# 11. test_set_service_exit_persists_and_applies +# --------------------------------------------------------------------------- + +class TestSetServiceExitPersistsAndApplies(unittest.TestCase): + + def test_set_service_exit_persists_and_applies(self): + """Valid override must be persisted to config_manager and apply_service called.""" + manifest = _make_manifest(egress_default='default', allowed=list(EXIT_TYPES)) + mgr, cm = _make_manager(installed=_installed_with_manifest(manifest)) + + apply_calls = [] + original_apply = mgr.apply_service + + def fake_apply(sid): + apply_calls.append(sid) + return {'ok': True, 'exit_via': 'tor'} + + mgr.apply_service = fake_apply + + result = mgr.set_service_exit('myapp', 'tor') + + self.assertTrue(result['ok'], result) + # apply_service was called + self.assertIn('myapp', apply_calls) + # override was persisted + cm._save_all_configs.assert_called() + self.assertEqual(cm.configs['egress_overrides'].get('myapp'), 'tor') + + +# --------------------------------------------------------------------------- +# 12. test_apply_all_iterates_installed_services +# --------------------------------------------------------------------------- + +class TestApplyAll(unittest.TestCase): + + def test_apply_all_iterates_installed_services(self): + """apply_all must call apply_service for every service with a manifest.""" + manifests = { + 'svc1': _make_manifest(egress_default='wireguard_ext'), + 'svc2': _make_manifest(egress_default='openvpn'), + 'svc3': _make_manifest(egress_default='tor'), + } + installed = { + sid: {'id': sid, 'manifest': m} + for sid, m in manifests.items() + } + mgr, _ = _make_manager(installed=installed) + + applied = [] + mgr.apply_service = lambda sid: applied.append(sid) or {'ok': True} + + result = mgr.apply_all() + + self.assertTrue(result['ok']) + self.assertEqual(sorted(applied), ['svc1', 'svc2', 'svc3']) + + +# --------------------------------------------------------------------------- +# 13. test_marks_do_not_collide_with_connectivity_manager +# --------------------------------------------------------------------------- + +class TestMarksNoCollision(unittest.TestCase): + + def test_marks_do_not_collide_with_connectivity_manager(self): + """EgressManager marks must be disjoint from ConnectivityManager marks.""" + connectivity_marks = {0x10, 0x20, 0x30} + egress_mark_values = set(MARKS.values()) + collision = connectivity_marks & egress_mark_values + self.assertEqual( + collision, set(), + f'Mark collision with ConnectivityManager: {collision}', + ) + + +# --------------------------------------------------------------------------- +# 14. test_apply_service_unknown_exit_in_allowed_rejected +# --------------------------------------------------------------------------- + +class TestApplyServiceUnknownExit(unittest.TestCase): + + def test_apply_service_unknown_exit_in_allowed_rejected(self): + """An egress.default value that is not a known EXIT_TYPE must return ok=False.""" + manifest = { + 'id': 'myapp', + 'container_name': 'cell-myapp', + 'has_egress': True, + 'egress': { + 'default': 'internet_fast_lane', # unknown exit + 'allowed': ['internet_fast_lane'], + }, + } + mgr, _ = _make_manager(installed=_installed_with_manifest(manifest)) + + def fake_run(cmd, **kwargs): + if 'docker' in cmd and 'inspect' in cmd: + return _subprocess_ok(stdout='172.20.0.50\n') + if 'iptables-save' in cmd: + return _subprocess_ok(stdout='') + if 'iptables-restore' in cmd: + return _subprocess_ok() + return _subprocess_ok() + + with patch('subprocess.run', side_effect=fake_run): + result = mgr.apply_service('myapp') + + self.assertFalse(result['ok']) + self.assertIn('error', result) + + +# --------------------------------------------------------------------------- +# Additional coverage: _has_egress edge cases +# --------------------------------------------------------------------------- + +class TestHasEgressLogic(unittest.TestCase): + + def setUp(self): + self.mgr, _ = _make_manager() + + def test_has_egress_both_required(self): + """Both has_egress=True and non-empty egress dict required.""" + m = {'has_egress': True, 'egress': {'default': 'tor', 'allowed': ['tor']}} + self.assertTrue(self.mgr._has_egress(m)) + + def test_has_egress_false_field(self): + m = {'has_egress': False, 'egress': {'default': 'tor', 'allowed': ['tor']}} + self.assertFalse(self.mgr._has_egress(m)) + + def test_has_egress_missing_has_egress_key(self): + m = {'egress': {'default': 'tor', 'allowed': ['tor']}} + self.assertFalse(self.mgr._has_egress(m)) + + def test_has_egress_empty_egress_dict(self): + m = {'has_egress': True, 'egress': {}} + self.assertFalse(self.mgr._has_egress(m)) + + +# --------------------------------------------------------------------------- +# Additional coverage: _resolve_exit +# --------------------------------------------------------------------------- + +class TestResolveExit(unittest.TestCase): + + def test_override_takes_precedence(self): + mgr, _ = _make_manager(overrides={'myapp': 'openvpn'}) + manifest = _make_manifest(egress_default='wireguard_ext') + self.assertEqual(mgr._resolve_exit('myapp', manifest), 'openvpn') + + def test_manifest_default_used_when_no_override(self): + mgr, _ = _make_manager(overrides={}) + manifest = _make_manifest(egress_default='tor') + self.assertEqual(mgr._resolve_exit('myapp', manifest), 'tor') + + def test_fallback_to_default_when_no_egress_block(self): + mgr, _ = _make_manager(overrides={}) + manifest = {'id': 'myapp'} + self.assertEqual(mgr._resolve_exit('myapp', manifest), 'default') + + +# --------------------------------------------------------------------------- +# Additional: apply_service with missing manifest +# --------------------------------------------------------------------------- + +class TestApplyServiceMissingManifest(unittest.TestCase): + + def test_apply_service_missing_manifest_returns_error(self): + mgr, _ = _make_manager(installed={}) + result = mgr.apply_service('ghost') + self.assertFalse(result['ok']) + self.assertIn('error', result) + + +if __name__ == '__main__': + unittest.main()