#!/usr/bin/env python3
"""Backup/restore overhaul tests for ConfigManager.

Covers the P0 data-loss fix:
- critical secrets/keys are INCLUDED in a backup
- trash (logs, nested backups, *.tmp, .test_admin_pass) is EXCLUDED
- optional passphrase encryption (encrypted archive named .tar.gz.age with
  mode 0600; plaintext backup directory with mode 0700)
- restore ordering (vault/fernet restored first) + reapply step invoked
- round-trip: backup -> restore with passphrase recovers files

Docker/subprocess and the live managers used by the reapply step are mocked.
"""

import os
import sys
import json
import stat
import shutil
import tarfile
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock

# Make the sibling ``api`` package directory importable before pulling in the
# modules under test.
api_dir = Path(__file__).parent.parent / 'api'
sys.path.insert(0, str(api_dir))

from config_manager import ConfigManager
import backup_crypto


def _write(p: Path, content: str = 'x') -> None:
    """Write *content* to *p*, creating any missing parent directories."""
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(content)


class _BackupBase(unittest.TestCase):
    """Shared fixture: a ConfigManager rooted in a throwaway temp directory.

    Each test gets a fresh config file, a seeded data directory containing
    both files that must be backed up and files that must be skipped, and a
    ``self.cm`` ConfigManager instance pointing at them.
    """

    def setUp(self):
        self.tmp = tempfile.mkdtemp()
        # Register removal immediately: cleanups run even when a later setUp
        # step raises, whereas tearDown() would be skipped in that case and
        # the temp directory would leak.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)

        self.config_file = os.path.join(self.tmp, 'config', 'cell_config.json')
        self.data_dir = Path(self.tmp) / 'data'
        os.makedirs(os.path.dirname(self.config_file), exist_ok=True)
        os.makedirs(self.data_dir, exist_ok=True)

        self.cm = ConfigManager(self.config_file, str(self.data_dir))
        self.cm.configs['_identity'] = {'cell_name': 'mycell', 'domain': 'cell'}
        self.cm._save_all_configs()
        self._seed_data()

    def _seed_data(self):
        """Populate the data dir with critical files and excludable trash."""
        d = self.data_dir

        # Critical paths
        _write(d / 'api' / 'auth_users.json', '{"admin": 1}')
        _write(d / 'api' / '.flask_secret_key', 'secret')
        _write(d / 'api' / 'peers.json', '{"peer1": "key"}')
        _write(d / 'api' / 'peer_service_credentials.json', '{}')
        _write(d / 'api' / 'cell_links.json', '{"link": 1}')
        _write(d / 'api' / 'ddns_token', 'tok123')
        _write(d / 'api' / 'audit' / 'audit.log',
               '{"seq": 1, "action": "peer.create"}')
        _write(d / 'wireguard' / 'keys' / 'server_private.key', 'PRIV')
        _write(d / 'wireguard' / 'wg_confs' / 'wg0.conf', '[Interface]')
        _write(d / 'api' / 'wireguard' / 'keys' / 'private.key', 'P2')
        _write(d / 'vault' / 'keys' / 'fernet.key', 'FERNETKEY')
        _write(d / 'vault' / 'ca' / 'ca.key', 'CAKEY')
        _write(d / 'vault' / 'secrets.json', 'ENC')
        _write(d / 'api' / 'services' / 'wireguard-ext' / 'config' / 'wg_ext0.conf',
               'EXT')
        _write(d / 'caddy' / 'caddy' / 'cert.pem', 'CERT')

        # Trash that must be excluded
        _write(d / 'logs' / 'app.log', 'log line')
        _write(d / 'api' / 'config_backups' / 'old' / 'manifest.json', '{}')
        _write(d / 'api' / '.test_admin_pass', 'pw')
        _write(d / 'api' / '.gitkeep', '')
        _write(d / 'api' / 'scratch.tmp', 'tmp')
        _write(d / 'api' / 'half.partial', 'partial')
        _write(d / 'api' / '__pycache__' / 'x.pyc', 'bytecode')

    def _backup_files(self, backup_id):
        """Return the POSIX-style relative paths of all files in a backup."""
        bp = self.cm.backup_dir / backup_id
        return {
            p.relative_to(bp).as_posix()
            for p in bp.rglob('*')
            if p.is_file()
        }


class TestBackupInclude(_BackupBase):
    """Critical secrets and keys must end up inside the backup."""

    def test_critical_paths_included(self):
        bid = self.cm.backup_config()
        files = self._backup_files(bid)
        expected = [
            'data/api/auth_users.json',
            'data/api/.flask_secret_key',
            'data/api/peers.json',
            'data/api/peer_service_credentials.json',
            'data/api/cell_links.json',
            'data/api/ddns_token',
            'data/api/audit/audit.log',
            'data/wireguard/keys/server_private.key',
            'data/wireguard/wg_confs/wg0.conf',
            'data/api/wireguard/keys/private.key',
            'data/vault/keys/fernet.key',
            'data/vault/ca/ca.key',
            'data/vault/secrets.json',
            'data/api/services/wireguard-ext/config/wg_ext0.conf',
            'data/caddy/caddy/cert.pem',
        ]
        for rel in expected:
            self.assertIn(rel, files, f'{rel} missing from backup')

    def test_absent_path_skipped_gracefully(self):
        # Remove ddns_token before backup — should not error, just skip.
        (self.data_dir / 'api' / 'ddns_token').unlink()
        bid = self.cm.backup_config()
        files = self._backup_files(bid)
        self.assertNotIn('data/api/ddns_token', files)
        self.assertIn('data/api/auth_users.json', files)


class TestBackupExclude(_BackupBase):
    """Logs, nested backups and scratch files must stay out of the backup."""

    def test_trash_excluded(self):
        bid = self.cm.backup_config()
        files = self._backup_files(bid)
        for rel in (
            'data/logs/app.log',
            'data/api/config_backups/old/manifest.json',
            'data/api/.test_admin_pass',
            'data/api/.gitkeep',
            'data/api/scratch.tmp',
            'data/api/half.partial',
            'data/api/__pycache__/x.pyc',
        ):
            self.assertNotIn(rel, files, f'{rel} should be excluded')


class TestPassphraseEncryption(_BackupBase):
    """Passphrase-protected backups: naming, permissions and restore."""

    def test_encrypted_archive_named_age(self):
        archive_id = self.cm.backup_config(passphrase='hunter2')
        self.assertTrue(archive_id.endswith('.tar.gz.age'))
        archive = self.cm.backup_dir / archive_id
        self.assertTrue(archive.is_file())
        # Plaintext staging dir removed
        self.assertFalse(
            (self.cm.backup_dir / archive_id[:-len('.tar.gz.age')]).exists())
        # Blob is recognised as encrypted
        self.assertTrue(backup_crypto.is_encrypted(archive.read_bytes()))
        # Mode 0600
        mode = stat.S_IMODE(os.stat(archive).st_mode)
        self.assertEqual(mode, 0o600)

    def test_plaintext_backup_is_0600(self):
        """The plaintext backup directory is private to its owner.

        Despite the test name, the path checked here is the backup
        *directory*, so the expected mode is 0o700 rather than 0o600.
        """
        bid = self.cm.backup_config()
        bp = self.cm.backup_dir / bid
        mode = stat.S_IMODE(os.stat(bp).st_mode)
        self.assertEqual(mode, 0o700)

    def test_restore_wrong_passphrase_raises_permission(self):
        archive_id = self.cm.backup_config(passphrase='correct')
        with self.assertRaises(PermissionError):
            self.cm.restore_config(archive_id, passphrase='wrong')

    def test_restore_missing_passphrase_raises_permission(self):
        archive_id = self.cm.backup_config(passphrase='correct')
        with self.assertRaises(PermissionError):
            self.cm.restore_config(archive_id, passphrase=None)

    def test_roundtrip_with_passphrase_recovers_files(self):
        archive_id = self.cm.backup_config(passphrase='secretpw')
        # Wipe a critical file then restore.
        (self.data_dir / 'api' / 'auth_users.json').unlink()
        (self.data_dir / 'vault' / 'keys' / 'fernet.key').unlink()
        with patch.object(self.cm, '_reapply_runtime_state'):
            ok = self.cm.restore_config(archive_id, passphrase='secretpw')
        self.assertTrue(ok)
        self.assertEqual(
            (self.data_dir / 'api' / 'auth_users.json').read_text(),
            '{"admin": 1}')
        self.assertEqual(
            (self.data_dir / 'vault' / 'keys' / 'fernet.key').read_text(),
            'FERNETKEY')


class TestRestoreOrderingAndReapply(_BackupBase):
    """Restore must handle the vault first and then reapply runtime state."""

    def test_vault_restored_before_other_data(self):
        bid = self.cm.backup_config()

        # Record every copy2 destination, in call order, so the sequence in
        # which restore_config() writes files can be inspected afterwards.
        order = []
        real_copy = shutil.copy2

        def tracking_copy(src, dst, *a, **k):
            order.append(Path(dst).as_posix())
            return real_copy(src, dst, *a, **k)

        with patch.object(self.cm, '_reapply_runtime_state'), \
                patch('config_manager.shutil.copy2', side_effect=tracking_copy):
            self.cm.restore_config(bid)

        def first_idx(needle):
            """Index of the first recorded destination containing *needle*."""
            for i, p in enumerate(order):
                if needle in p:
                    return i
            # Sentinel larger than any real index: "never copied".
            return 10 ** 9

        vault_i = first_idx('/vault/')
        auth_i = first_idx('auth_users.json')
        wg_i = first_idx('/wireguard/')
        # NOTE(review): if auth_users.json or a wireguard path is never seen
        # by the copy2 hook, its index is the sentinel and the comparison
        # below passes trivially — confirm that is acceptable.
        self.assertLess(vault_i, auth_i, 'vault must restore before auth_users')
        self.assertLess(vault_i, wg_i, 'vault must restore before wireguard keys')

    def test_reapply_step_invoked(self):
        bid = self.cm.backup_config()
        with patch.object(self.cm, '_reapply_runtime_state') as mock_reapply:
            self.cm.restore_config(bid)
        mock_reapply.assert_called_once()

    def test_reapply_calls_regenerate_and_apply_routes(self):
        bid = self.cm.backup_config()

        # Stand-in for the ``managers`` module: each manager attribute is a
        # child of ``fake`` so calls can be asserted on afterwards.
        fake = MagicMock()
        managers_mock = MagicMock()
        managers_mock.caddy_manager = fake.caddy
        managers_mock.firewall_manager = fake.firewall
        managers_mock.connectivity_manager = fake.connectivity
        managers_mock.cell_link_manager = fake.cell_link
        managers_mock.service_composer = fake.composer
        managers_mock.peer_registry = fake.peers
        fake.peers.list_peers.return_value = []
        fake.cell_link.list_connections.return_value = []

        with patch.dict('sys.modules', {'managers': managers_mock}):
            self.cm.restore_config(bid)

        fake.caddy.regenerate_with_installed.assert_called_once()
        fake.firewall.generate_corefile.assert_called_once()
        fake.connectivity.apply_routes.assert_called_once()
        fake.cell_link.replay_pending_pushes.assert_called_once()
        fake.composer.reapply_active_services.assert_called_once()


if __name__ == '__main__':
    unittest.main()