""" Tests for the sshuttle (SSH tunnel) exit type — configure_sshuttle validation, config-file generation, vault integration, apply_routes REDIRECT rules, _exit_status bridging, and the /api/connectivity/exits/sshuttle route (never echoes secrets). """ import os import stat import sys import shutil import tempfile import unittest from pathlib import Path from unittest.mock import MagicMock, patch sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api')) import connectivity_manager as cm_module from connectivity_manager import ConnectivityManager _SENTINEL = object() VALID_KEY = ( '-----BEGIN OPENSSH PRIVATE KEY-----\n' 'b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAAB\n' '-----END OPENSSH PRIVATE KEY-----\n' ) VALID_KNOWN_HOSTS = ( 'ssh.example.com,203.0.113.5 ssh-ed25519 ' 'AAAAC3NzaC1lZDI1NTE5AAAAIB5d0o0Yw1xP1Yw1xP1Yw1xP1Yw1xP1Yw1xP1Yw1xP1Y' ) def _make_manager(tmp_dir=None, peer_registry=_SENTINEL, config_manager=None, vault_manager=None): if tmp_dir is None: tmp_dir = tempfile.mkdtemp() if config_manager is None: config_manager = MagicMock() config_manager.get_identity.return_value = { 'cell_name': 'test', 'ip_range': '172.20.0.0/16', } config_manager.get_connectivity_config.return_value = { 'exits': {}, 'peer_exit_map': {}, } config_manager.get_installed_services.return_value = {} config_manager.list_connections.return_value = [] if peer_registry is _SENTINEL: peer_registry = MagicMock() peer_registry.list_peers.return_value = [] with patch.object(ConnectivityManager, '_subscribe_to_events', lambda self: None): mgr = ConnectivityManager( config_manager=config_manager, peer_registry=peer_registry, vault_manager=vault_manager, data_dir=tmp_dir, config_dir=tmp_dir, ) return mgr def _valid_cfg(**overrides): cfg = { 'host': 'ssh.example.com', 'port': 22, 'user': 'tunnel', 'auth': 'key', 'private_key': VALID_KEY, 'known_hosts': VALID_KNOWN_HOSTS, } cfg.update(overrides) return cfg def _mock_subprocess_ok(): return MagicMock(returncode=0, stdout='', stderr='') # --------------------------------------------------------------------------- # configure_sshuttle — validation # --------------------------------------------------------------------------- class TestConfigureSshuttleValidation(unittest.TestCase): def setUp(self): self.tmp = tempfile.mkdtemp() self.mgr = _make_manager(tmp_dir=self.tmp) def tearDown(self): shutil.rmtree(self.tmp, ignore_errors=True) def test_valid_config_returns_ok(self): result = self.mgr.configure_sshuttle(_valid_cfg()) self.assertTrue(result['ok'], result) def test_non_dict_config_rejected(self): result = self.mgr.configure_sshuttle('not a dict') self.assertFalse(result['ok']) def test_missing_host_rejected(self): cfg = _valid_cfg() del cfg['host'] result = self.mgr.configure_sshuttle(cfg) self.assertFalse(result['ok']) self.assertIn('host', result['error']) def test_host_with_spaces_rejected(self): result = self.mgr.configure_sshuttle(_valid_cfg(host='bad host')) self.assertFalse(result['ok']) def test_host_with_shell_metachars_rejected(self): result = self.mgr.configure_sshuttle(_valid_cfg(host='host;rm -rf /')) self.assertFalse(result['ok']) def test_host_with_double_dots_rejected(self): result = self.mgr.configure_sshuttle(_valid_cfg(host='a..b')) self.assertFalse(result['ok']) def test_ip_host_accepted(self): result = self.mgr.configure_sshuttle(_valid_cfg(host='203.0.113.10')) self.assertTrue(result['ok']) def test_port_zero_rejected(self): result = self.mgr.configure_sshuttle(_valid_cfg(port=0)) self.assertFalse(result['ok']) self.assertIn('port', result['error']) def test_port_above_65535_rejected(self): result = self.mgr.configure_sshuttle(_valid_cfg(port=70000)) self.assertFalse(result['ok']) def test_port_non_numeric_rejected(self): result = self.mgr.configure_sshuttle(_valid_cfg(port='abc')) self.assertFalse(result['ok']) def test_port_defaults_to_22(self): cfg = _valid_cfg() del cfg['port'] result = self.mgr.configure_sshuttle(cfg) self.assertTrue(result['ok']) conf = Path(self.mgr.sshuttle_dir, 'sshuttle.conf').read_text() self.assertIn('PORT=22', conf) def test_invalid_user_uppercase_rejected(self): result = self.mgr.configure_sshuttle(_valid_cfg(user='Tunnel')) self.assertFalse(result['ok']) self.assertIn('user', result['error']) def test_invalid_user_too_long_rejected(self): result = self.mgr.configure_sshuttle(_valid_cfg(user='a' * 33)) self.assertFalse(result['ok']) def test_user_starting_with_digit_rejected(self): result = self.mgr.configure_sshuttle(_valid_cfg(user='1user')) self.assertFalse(result['ok']) def test_invalid_auth_rejected(self): result = self.mgr.configure_sshuttle(_valid_cfg(auth='agent')) self.assertFalse(result['ok']) self.assertIn('auth', result['error']) def test_missing_known_hosts_rejected(self): cfg = _valid_cfg() del cfg['known_hosts'] result = self.mgr.configure_sshuttle(cfg) self.assertFalse(result['ok']) self.assertIn('known_hosts', result['error']) def test_empty_known_hosts_rejected(self): result = self.mgr.configure_sshuttle(_valid_cfg(known_hosts=' ')) self.assertFalse(result['ok']) def test_known_hosts_with_too_few_fields_rejected(self): result = self.mgr.configure_sshuttle( _valid_cfg(known_hosts='ssh.example.com ssh-ed25519')) self.assertFalse(result['ok']) def test_known_hosts_with_bad_keytype_rejected(self): result = self.mgr.configure_sshuttle( _valid_cfg(known_hosts='ssh.example.com ssh-dss AAAAB3Nza')) self.assertFalse(result['ok']) def test_known_hosts_with_non_base64_key_rejected(self): result = self.mgr.configure_sshuttle( _valid_cfg(known_hosts='ssh.example.com ssh-ed25519 not$base64!')) self.assertFalse(result['ok']) def test_multiline_known_hosts_rejected(self): kh = VALID_KNOWN_HOSTS + '\nother.example.com ssh-ed25519 AAAAC3Nza' result = self.mgr.configure_sshuttle(_valid_cfg(known_hosts=kh)) self.assertFalse(result['ok']) def test_strict_host_key_checking_no_rejected(self): result = self.mgr.configure_sshuttle( _valid_cfg(host='ssh.example.com -oStrictHostKeyChecking=no')) self.assertFalse(result['ok']) self.assertIn('StrictHostKeyChecking', result['error']) def test_strict_host_key_checking_no_in_known_hosts_rejected(self): result = self.mgr.configure_sshuttle( _valid_cfg(known_hosts='x StrictHostKeyChecking=no y')) self.assertFalse(result['ok']) def test_strict_host_key_checking_no_case_insensitive(self): result = self.mgr.configure_sshuttle( _valid_cfg(host='h stricthostkeychecking = NO')) self.assertFalse(result['ok']) def test_key_auth_without_private_key_rejected(self): cfg = _valid_cfg() del cfg['private_key'] result = self.mgr.configure_sshuttle(cfg) self.assertFalse(result['ok']) self.assertIn('private_key', result['error']) def test_key_auth_with_garbage_key_rejected(self): result = self.mgr.configure_sshuttle(_valid_cfg(private_key='not a key')) self.assertFalse(result['ok']) def test_password_auth_without_password_rejected(self): cfg = _valid_cfg(auth='password') del cfg['private_key'] result = self.mgr.configure_sshuttle(cfg) self.assertFalse(result['ok']) def test_password_auth_with_password_accepted(self): cfg = _valid_cfg(auth='password', password='s3cret') del cfg['private_key'] result = self.mgr.configure_sshuttle(cfg) self.assertTrue(result['ok']) def test_invalid_exclude_subnet_rejected(self): result = self.mgr.configure_sshuttle( _valid_cfg(exclude_subnets=['not-a-cidr'])) self.assertFalse(result['ok']) def test_result_never_contains_secrets(self): result = self.mgr.configure_sshuttle(_valid_cfg()) self.assertNotIn('PRIVATE KEY', str(result)) # --------------------------------------------------------------------------- # configure_sshuttle — file generation # --------------------------------------------------------------------------- class TestConfigureSshuttleFiles(unittest.TestCase): def setUp(self): self.tmp = tempfile.mkdtemp() self.vault = MagicMock() self.mgr = _make_manager(tmp_dir=self.tmp, vault_manager=self.vault) def tearDown(self): shutil.rmtree(self.tmp, ignore_errors=True) def test_sshuttle_conf_golden(self): self.mgr.configure_sshuttle(_valid_cfg( exclude_subnets=['172.20.0.0/16', '10.0.0.0/8'])) conf = Path(self.mgr.sshuttle_dir, 'sshuttle.conf').read_text() expected = ( 'HOST=ssh.example.com\n' 'PORT=22\n' 'USER=tunnel\n' 'AUTH=key\n' 'LISTEN_PORT=12300\n' 'EXCLUDE=172.20.0.0/16,10.0.0.0/8\n' ) self.assertEqual(conf, expected) def test_key_file_written_0600(self): self.mgr.configure_sshuttle(_valid_cfg()) key_path = Path(self.mgr.sshuttle_dir, 'id_pic') self.assertTrue(key_path.is_file()) mode = stat.S_IMODE(os.stat(key_path).st_mode) self.assertEqual(mode, 0o600) self.assertIn('PRIVATE KEY', key_path.read_text()) def test_known_hosts_file_written_0600(self): self.mgr.configure_sshuttle(_valid_cfg()) kh_path = Path(self.mgr.sshuttle_dir, 'known_hosts') self.assertTrue(kh_path.is_file()) mode = stat.S_IMODE(os.stat(kh_path).st_mode) self.assertEqual(mode, 0o600) self.assertEqual(kh_path.read_text(), VALID_KNOWN_HOSTS + '\n') def test_password_file_written_0600_for_password_auth(self): cfg = _valid_cfg(auth='password', password='s3cret') del cfg['private_key'] self.mgr.configure_sshuttle(cfg) pw_path = Path(self.mgr.sshuttle_dir, 'password') self.assertTrue(pw_path.is_file()) mode = stat.S_IMODE(os.stat(pw_path).st_mode) self.assertEqual(mode, 0o600) self.assertEqual(pw_path.read_text(), 's3cret\n') def test_default_excludes_contain_cell_subnet_and_rfc1918(self): self.mgr.configure_sshuttle(_valid_cfg()) conf = Path(self.mgr.sshuttle_dir, 'sshuttle.conf').read_text() for net in ('172.20.0.0/16', '10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'): self.assertIn(net, conf) def test_key_stored_in_vault(self): self.mgr.configure_sshuttle(_valid_cfg()) self.vault.store_secret.assert_called_once_with( 'connectivity_sshuttle_key', VALID_KEY) def test_password_stored_in_vault(self): cfg = _valid_cfg(auth='password', password='s3cret') del cfg['private_key'] self.mgr.configure_sshuttle(cfg) self.vault.store_secret.assert_called_once_with( 'connectivity_sshuttle_password', 's3cret') def test_non_secret_fields_persisted_in_config_manager(self): self.mgr.configure_sshuttle(_valid_cfg()) self.mgr.config_manager.set_connectivity_field.assert_called_once() field, exits = self.mgr.config_manager.set_connectivity_field.call_args[0] self.assertEqual(field, 'exits') self.assertEqual(exits['sshuttle']['host'], 'ssh.example.com') self.assertNotIn('private_key', exits['sshuttle']) self.assertNotIn('password', exits['sshuttle']) self.assertNotIn('known_hosts', exits['sshuttle']) def test_write_failure_returns_ok_false(self): with patch.object(self.mgr, '_write_secure', side_effect=OSError('disk full')): result = self.mgr.configure_sshuttle(_valid_cfg()) self.assertFalse(result['ok']) # --------------------------------------------------------------------------- # apply_routes — sshuttle REDIRECT # --------------------------------------------------------------------------- class TestApplyRoutesSshuttle(unittest.TestCase): def setUp(self): self.tmp = tempfile.mkdtemp() def tearDown(self): shutil.rmtree(self.tmp, ignore_errors=True) @staticmethod def _ssh_conn(mark=0x1000, table=1000, redirect_port=9100): return {'id': 'conn_ssh', 'type': 'sshuttle', 'enabled': True, 'mark': mark, 'table': table, 'iface': None, 'redirect_port': redirect_port} def _cm(self, connections): cm = MagicMock() cm.get_identity.return_value = {'cell_name': 't', 'ip_range': '172.20.0.0/16'} cm.list_connections.return_value = connections cm.get_installed_services.return_value = {} return cm def test_sshuttle_peer_gets_redirect_to_instance_port(self): conn = self._ssh_conn(redirect_port=9100) pr = MagicMock() pr.list_peers.return_value = [{'peer': 'alice', 'exit_via': 'conn_ssh'}] pr.get_peer.return_value = {'peer': 'alice', 'ip': '172.20.0.50/32'} mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr, config_manager=self._cm([conn])) with patch.object(cm_module, 'subprocess') as mock_sp: mock_sp.run.return_value = _mock_subprocess_ok() mgr.apply_routes() redirect_calls = [ c for c in mock_sp.run.call_args_list if 'REDIRECT' in c.args[0] ] self.assertEqual(len(redirect_calls), 1) args = redirect_calls[0].args[0] self.assertIn('--to-ports', args) self.assertEqual(args[args.index('--to-ports') + 1], '9100') self.assertIn('172.20.0.50', args) def test_sshuttle_peer_gets_instance_mark(self): conn = self._ssh_conn(mark=0x1040) pr = MagicMock() pr.list_peers.return_value = [{'peer': 'alice', 'exit_via': 'conn_ssh'}] pr.get_peer.return_value = {'peer': 'alice', 'ip': '172.20.0.50/32'} mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr, config_manager=self._cm([conn])) with patch.object(cm_module, 'subprocess') as mock_sp: mock_sp.run.return_value = _mock_subprocess_ok() mgr.apply_routes() mark_calls = [ c for c in mock_sp.run.call_args_list if 'MARK' in c.args[0] and '172.20.0.50' in c.args[0] ] self.assertEqual(len(mark_calls), 1) args = mark_calls[0].args[0] self.assertEqual(args[args.index('--set-mark') + 1], hex(0x1040)) def test_ip_rule_added_for_instance_table(self): conn = self._ssh_conn(mark=0x1040, table=1399) mgr = _make_manager(tmp_dir=self.tmp, config_manager=self._cm([conn])) with patch.object(cm_module, 'subprocess') as mock_sp: mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='') mgr.apply_routes() rule_adds = [ c for c in mock_sp.run.call_args_list if 'rule' in c.args[0] and 'add' in c.args[0] and hex(0x1040) in c.args[0] ] self.assertEqual(len(rule_adds), 1) self.assertIn('1399', rule_adds[0].args[0]) def test_no_killswitch_for_sshuttle(self): """sshuttle has no exit iface — _add_killswitch must skip it.""" mgr = _make_manager(tmp_dir=self.tmp) self.assertNotIn('sshuttle', mgr.IFACES) with patch.object(cm_module, 'subprocess') as mock_sp: mock_sp.run.return_value = _mock_subprocess_ok() mgr._add_killswitch(0x40, None) mock_sp.run.assert_not_called() # --------------------------------------------------------------------------- # _exit_status — sshuttle bridge # --------------------------------------------------------------------------- class TestSshuttleExitStatus(unittest.TestCase): def setUp(self): self.tmp = tempfile.mkdtemp() def tearDown(self): shutil.rmtree(self.tmp, ignore_errors=True) def _mgr(self, installed=None): config_manager = MagicMock() config_manager.get_identity.return_value = {'ip_range': '172.20.0.0/16'} config_manager.get_installed_services.return_value = installed or {} return _make_manager(tmp_dir=self.tmp, config_manager=config_manager) def test_not_configured_initially(self): mgr = self._mgr() with patch.object(cm_module, 'subprocess') as mock_sp: mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='') info = mgr._exit_status('sshuttle') self.assertFalse(info['configured']) self.assertEqual(info['status'], 'not_configured') def test_configured_after_configure_sshuttle(self): mgr = self._mgr() mgr.configure_sshuttle(_valid_cfg()) with patch.object(cm_module, 'subprocess') as mock_sp: mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='') info = mgr._exit_status('sshuttle') self.assertTrue(info['configured']) self.assertEqual(info['status'], 'configured') def test_configured_when_store_service_installed(self): mgr = self._mgr(installed={'sshuttle': {'manifest': {'id': 'sshuttle'}}}) with patch.object(cm_module, 'subprocess') as mock_sp: mock_sp.run.return_value = MagicMock(returncode=1, stdout='', stderr='') info = mgr._exit_status('sshuttle') self.assertTrue(info['configured']) def test_configured_when_container_running(self): mgr = self._mgr() def fake_run(cmd, **kwargs): if 'inspect' in cmd and 'cell-sshuttle' in cmd: return MagicMock(returncode=0, stdout='true\n', stderr='') return MagicMock(returncode=1, stdout='', stderr='') with patch.object(cm_module, 'subprocess') as mock_sp: mock_sp.run.side_effect = fake_run info = mgr._exit_status('sshuttle') self.assertTrue(info['configured']) def test_sshuttle_in_list_exits(self): mgr = self._mgr() with patch.object(cm_module, 'subprocess') as mock_sp: mock_sp.run.return_value = _mock_subprocess_ok() exits = mgr.list_exits() types = {e['type'] for e in exits} self.assertIn('sshuttle', types) # --------------------------------------------------------------------------- # set_peer_exit accepts sshuttle # --------------------------------------------------------------------------- class TestSetPeerExitSshuttle(unittest.TestCase): def setUp(self): self.tmp = tempfile.mkdtemp() def tearDown(self): shutil.rmtree(self.tmp, ignore_errors=True) def test_legacy_sshuttle_type_resolves_to_instance(self): """Back-compat shim: setting exit to the legacy 'sshuttle' type resolves to the single sshuttle connection instance.""" conn = {'id': 'conn_ssh', 'type': 'sshuttle'} cm = MagicMock() cm.get_identity.return_value = {'ip_range': '172.20.0.0/16'} cm.list_connections.return_value = [conn] cm.get_installed_services.return_value = {} pr = MagicMock() pr.list_peers.return_value = [] pr.get_peer.return_value = {'peer': 'alice', 'exit_via': 'conn_ssh'} def _set(name, value): # mimic the real registry shim resolution return value in ('default', 'conn_ssh', 'sshuttle') pr.set_peer_exit_via.side_effect = _set mgr = _make_manager(tmp_dir=self.tmp, peer_registry=pr, config_manager=cm) with patch.object(cm_module, 'subprocess') as mock_sp: mock_sp.run.return_value = _mock_subprocess_ok() result = mgr.set_peer_exit('alice', 'sshuttle') self.assertTrue(result['ok']) def test_peer_registry_accepts_sshuttle_legacy_type(self): """The peer registry resolves a legacy 'sshuttle' type to its instance id.""" from peer_registry import PeerRegistry cm = MagicMock() cm.list_connections.return_value = [{'id': 'conn_ssh', 'type': 'sshuttle'}] reg = PeerRegistry(data_dir=self.tmp, config_dir=self.tmp, config_manager=cm) reg.add_peer({'peer': 'alice', 'ip': '10.0.0.5'}) self.assertTrue(reg.set_peer_exit_via('alice', 'sshuttle')) self.assertEqual(reg.get_peer('alice')['exit_via'], 'conn_ssh') # --------------------------------------------------------------------------- # POST /api/connectivity/exits/sshuttle — route behaviour # --------------------------------------------------------------------------- class TestSshuttleRoute(unittest.TestCase): def setUp(self): import app as app_module self.app_module = app_module app_module.app.config['TESTING'] = True self.client = app_module.app.test_client() def test_valid_config_returns_200_ok_only(self): mock_cm = MagicMock() mock_cm.configure_sshuttle.return_value = {'ok': True} with patch.object(self.app_module, 'connectivity_manager', mock_cm): resp = self.client.post('/api/connectivity/exits/sshuttle', json=_valid_cfg()) self.assertEqual(resp.status_code, 200) self.assertEqual(resp.get_json(), {'ok': True}) def test_invalid_config_returns_400(self): mock_cm = MagicMock() mock_cm.configure_sshuttle.return_value = {'ok': False, 'error': 'invalid host'} with patch.object(self.app_module, 'connectivity_manager', mock_cm): resp = self.client.post('/api/connectivity/exits/sshuttle', json={'host': '!!'}) self.assertEqual(resp.status_code, 400) self.assertFalse(resp.get_json()['ok']) def test_response_never_echoes_private_key(self): mock_cm = MagicMock() mock_cm.configure_sshuttle.return_value = {'ok': True} with patch.object(self.app_module, 'connectivity_manager', mock_cm): resp = self.client.post('/api/connectivity/exits/sshuttle', json=_valid_cfg()) body = resp.get_data(as_text=True) self.assertNotIn('PRIVATE KEY', body) self.assertNotIn(VALID_KEY.splitlines()[1], body) def test_response_never_echoes_password(self): mock_cm = MagicMock() mock_cm.configure_sshuttle.return_value = {'ok': True} cfg = _valid_cfg(auth='password', password='hunter2-secret') del cfg['private_key'] with patch.object(self.app_module, 'connectivity_manager', mock_cm): resp = self.client.post('/api/connectivity/exits/sshuttle', json=cfg) self.assertNotIn('hunter2-secret', resp.get_data(as_text=True)) def test_exception_returns_500_without_details(self): mock_cm = MagicMock() mock_cm.configure_sshuttle.side_effect = Exception('boom PRIVATE stuff') with patch.object(self.app_module, 'connectivity_manager', mock_cm): resp = self.client.post('/api/connectivity/exits/sshuttle', json=_valid_cfg()) self.assertEqual(resp.status_code, 500) self.assertNotIn('PRIVATE', resp.get_data(as_text=True)) if __name__ == '__main__': unittest.main()