aa1e5c41ec
Unit Tests / test (push) Successful in 12m6s
Coverage was below acceptable levels and several newly-added code paths (sshuttle egress, proxy egress, DDNS provider stubs, DNS overview route, peer-registry provisioning) had zero test coverage. ~250 new unit tests are added across 16 new test files. Existing test files are updated to match refactored interfaces (DHCP removed, constants introduced, network_manager restructured). .coveragerc is added to pin the source mapping and the 70% floor so regressions are caught at commit time. tests/test_enhanced_api.py was previously living in api/ (wrong location) and is moved to tests/ where it belongs. Integration test files are updated to remove references to DHCP endpoints and add coverage for the new DNS overview and DDNS sync endpoints. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
316 lines
14 KiB
Python
316 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Additional tests for FileManager covering uncovered paths:
|
|
- _safe_path traversal rejection
|
|
- create_user: duplicate (file already has user), invalid username
|
|
- delete_user: not in file, invalid username
|
|
- list_users with actual htpasswd file
|
|
- get_users from users.json
|
|
- _get_user_storage_info for nonexistent dir
|
|
- _list_user_folders
|
|
- create_folder / delete_folder edge cases
|
|
- upload_file / download_file / delete_file edge cases
|
|
- list_files edge cases
|
|
- backup_user_files invalid username
|
|
- restore_user_files invalid username / nonexistent backup
|
|
- get_status in docker and non-docker mode
|
|
- _test_filesystem_access
|
|
- _test_user_authentication
|
|
- test_connectivity
|
|
- get_webdav_status (mocked subprocess)
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
api_dir = Path(__file__).parent.parent / 'api'
|
|
sys.path.insert(0, str(api_dir))
|
|
|
|
from file_manager import FileManager
|
|
|
|
|
|
class TestFileManagerExtra(unittest.TestCase):
    """Extra FileManager tests aimed at branches the main suite does not reach.

    Each test gets a fresh FileManager whose ``data_dir`` and ``config_dir``
    live inside a private temporary directory created in ``setUp``.
    """

    def setUp(self):
        """Create a throwaway data/config tree and a FileManager bound to it."""
        self.tmp = tempfile.mkdtemp()
        # Register cleanup immediately: tearDown() is skipped when setUp()
        # raises, which would leak the directory if FileManager() fails.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        self.data_dir = os.path.join(self.tmp, 'data')
        self.config_dir = os.path.join(self.tmp, 'config')
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.config_dir, exist_ok=True)
        self.fm = FileManager(data_dir=self.data_dir, config_dir=self.config_dir)

    # ── _safe_path ──────────────────────────────────────────────────────────

    def test_safe_path_traversal_raises(self):
        """A relative path climbing out with ``..`` must raise ValueError."""
        with self.assertRaises(ValueError):
            self.fm._safe_path('alice', '../../../etc/passwd')

    def test_safe_path_invalid_username_raises(self):
        """A username containing a space must raise ValueError."""
        with self.assertRaises(ValueError):
            self.fm._safe_path('user with space', 'file.txt')

    def test_safe_path_valid_returns_path_under_files_dir(self):
        """A well-formed user/path pair resolves to a path under files_dir."""
        path = self.fm._safe_path('alice', 'Documents', 'readme.txt')
        self.assertTrue(path.startswith(self.fm.files_dir))

    # ── create_user ─────────────────────────────────────────────────────────

    def test_create_user_invalid_username_chars_returns_false(self):
        """create_user reports failure (False) for a username with bad chars."""
        result = self.fm.create_user('bad user!', 'password')
        self.assertFalse(result)

    def test_create_user_creates_default_folders(self):
        """A new user gets the five default folders under files_dir/<user>."""
        result = self.fm.create_user('alice', 'secret')
        self.assertTrue(result)
        user_dir = os.path.join(self.fm.files_dir, 'alice')
        for folder in ('Documents', 'Pictures', 'Music', 'Videos', 'Downloads'):
            self.assertTrue(os.path.isdir(os.path.join(user_dir, folder)))

    def test_create_user_bcrypt_hash_uses_2y_prefix(self):
        """The stored hash carries the ``$2y$`` prefix and never ``$2b$``."""
        self.fm.create_user('alice', 'secret')
        auth_file = os.path.join(self.fm.webdav_dir, 'users')
        with open(auth_file, encoding='utf-8') as f:
            content = f.read()
        self.assertIn('$2y$', content)
        self.assertNotIn('$2b$', content)

    def test_create_user_appends_to_existing_file(self):
        """Creating a second user leaves exactly two non-blank auth lines."""
        self.fm.create_user('alice', 'secret')
        self.fm.create_user('bob', 'secret2')
        auth_file = os.path.join(self.fm.webdav_dir, 'users')
        with open(auth_file, encoding='utf-8') as f:
            lines = [line for line in f if line.strip()]
        self.assertEqual(len(lines), 2)

    # ── delete_user ─────────────────────────────────────────────────────────

    def test_delete_user_invalid_username_returns_false(self):
        """delete_user reports failure (False) for a username with bad chars."""
        result = self.fm.delete_user('bad user!')
        self.assertFalse(result)

    def test_delete_user_not_in_auth_file_still_returns_true(self):
        """Delete succeeds even if user was never in the auth file."""
        result = self.fm.delete_user('nobody')
        self.assertTrue(result)

    def test_delete_user_removes_only_matching_line(self):
        """Deleting alice drops her auth entry but keeps bob's."""
        self.fm.create_user('alice', 'secret')
        self.fm.create_user('bob', 'secret2')
        self.fm.delete_user('alice')
        auth_file = os.path.join(self.fm.webdav_dir, 'users')
        with open(auth_file, encoding='utf-8') as f:
            content = f.read()
        self.assertNotIn('alice:', content)
        self.assertIn('bob:', content)

    # ── list_users ──────────────────────────────────────────────────────────

    def test_list_users_returns_usernames_from_auth_file(self):
        """list_users yields an entry for every user that was created."""
        self.fm.create_user('alice', 'secret')
        self.fm.create_user('bob', 'secret2')
        users = self.fm.list_users()
        usernames = [u['username'] for u in users]
        self.assertIn('alice', usernames)
        self.assertIn('bob', usernames)

    def test_list_users_skips_malformed_lines(self):
        """An auth line without a colon is ignored; valid lines still appear."""
        auth_file = os.path.join(self.fm.webdav_dir, 'users')
        with open(auth_file, 'w', encoding='utf-8') as f:
            f.write('alicehash\n')  # no colon
            f.write('bob:hash\n')
        users = self.fm.list_users()
        usernames = [u['username'] for u in users]
        self.assertNotIn('alicehash', usernames)
        self.assertIn('bob', usernames)

    # ── get_users ───────────────────────────────────────────────────────────

    def test_get_users_returns_empty_when_no_file(self):
        """With no users.json written, get_users returns an empty list."""
        result = self.fm.get_users()
        self.assertEqual(result, [])

    def test_get_users_returns_list_from_json(self):
        """get_users returns the entries stored in config/webdav/users.json."""
        import json
        webdav_dir = os.path.join(self.config_dir, 'webdav')
        os.makedirs(webdav_dir, exist_ok=True)
        users_file = os.path.join(webdav_dir, 'users.json')
        with open(users_file, 'w', encoding='utf-8') as f:
            json.dump([{'username': 'alice'}], f)
        result = self.fm.get_users()
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]['username'], 'alice')

    # ── _get_user_storage_info ───────────────────────────────────────────────

    def test_storage_info_nonexistent_dir_returns_zeros(self):
        """An unknown user reports zero files and zero bytes."""
        info = self.fm._get_user_storage_info('nobody')
        self.assertEqual(info['total_files'], 0)
        self.assertEqual(info['total_size_bytes'], 0)

    def test_storage_info_counts_files(self):
        """Two uploads are reflected as at least two files in the totals."""
        self.fm.create_user('alice', 'secret')
        self.fm.upload_file('alice', 'Documents/a.txt', b'hello')
        self.fm.upload_file('alice', 'Documents/b.txt', b'world')
        info = self.fm._get_user_storage_info('alice')
        self.assertGreaterEqual(info['total_files'], 2)

    # ── _list_user_folders ───────────────────────────────────────────────────

    def test_list_user_folders_returns_list(self):
        """A created user's folder listing is a list that includes Documents."""
        self.fm.create_user('alice', 'secret')
        folders = self.fm._list_user_folders('alice')
        self.assertIsInstance(folders, list)
        names = [f['name'] for f in folders]
        self.assertIn('Documents', names)

    def test_list_user_folders_empty_for_nonexistent_user(self):
        """An unknown user yields an empty folder list."""
        folders = self.fm._list_user_folders('nobody')
        self.assertEqual(folders, [])

    # ── create_folder / delete_folder edge cases ────────────────────────────

    def test_create_folder_traversal_returns_false(self):
        """create_folder refuses (False) a path that climbs out with ``..``."""
        self.fm.create_user('alice', 'secret')
        result = self.fm.create_folder('alice', '../../../tmp/evil')
        self.assertFalse(result)

    def test_delete_folder_nonexistent_returns_false(self):
        """Deleting a folder that does not exist returns False."""
        self.fm.create_user('alice', 'secret')
        result = self.fm.delete_folder('alice', 'NoSuchFolder')
        self.assertFalse(result)

    def test_delete_folder_traversal_returns_false(self):
        """delete_folder refuses (False) a path that climbs out with ``..``."""
        self.fm.create_user('alice', 'secret')
        result = self.fm.delete_folder('alice', '../../../tmp/evil')
        self.assertFalse(result)

    # ── upload / download / delete edge cases ───────────────────────────────

    def test_download_file_nonexistent_returns_none(self):
        """Downloading a missing file returns None."""
        self.fm.create_user('alice', 'secret')
        result = self.fm.download_file('alice', 'nope.txt')
        self.assertIsNone(result)

    def test_delete_file_nonexistent_returns_false(self):
        """Deleting a missing file returns False."""
        self.fm.create_user('alice', 'secret')
        result = self.fm.delete_file('alice', 'nope.txt')
        self.assertFalse(result)

    def test_upload_file_creates_parent_dirs(self):
        """Uploading into a nested path succeeds and the file exists on disk."""
        self.fm.create_user('alice', 'secret')
        result = self.fm.upload_file('alice', 'deep/nested/file.txt', b'data')
        self.assertTrue(result)
        path = self.fm._safe_path('alice', 'deep/nested/file.txt')
        self.assertTrue(os.path.exists(path))

    # ── list_files ──────────────────────────────────────────────────────────

    def test_list_files_shows_files_and_dirs(self):
        """A folder holding one upload lists one entry of type 'file'."""
        self.fm.create_user('alice', 'secret')
        self.fm.upload_file('alice', 'docs/readme.txt', b'hello')
        files = self.fm.list_files('alice', 'docs')
        self.assertEqual(len(files), 1)
        self.assertEqual(files[0]['type'], 'file')

    def test_list_files_empty_folder(self):
        """Listing an untouched default folder returns an empty list."""
        self.fm.create_user('alice', 'secret')
        files = self.fm.list_files('alice', 'Videos')
        self.assertIsInstance(files, list)
        self.assertEqual(len(files), 0)

    # ── backup / restore ────────────────────────────────────────────────────
    # Backup paths are placed under self.tmp so that a validation regression
    # cannot write to (or be influenced by) a shared location such as /tmp.

    def test_backup_user_files_invalid_username_returns_false(self):
        """backup_user_files rejects (False) a traversal-style username."""
        backup_path = os.path.join(self.tmp, 'backup')
        result = self.fm.backup_user_files('../../etc', backup_path)
        self.assertFalse(result)

    def test_backup_user_files_empty_username_returns_false(self):
        """backup_user_files rejects (False) an empty username."""
        backup_path = os.path.join(self.tmp, 'backup')
        result = self.fm.backup_user_files('', backup_path)
        self.assertFalse(result)

    def test_restore_user_files_invalid_username_returns_false(self):
        """restore_user_files rejects (False) a traversal-style username."""
        backup_path = os.path.join(self.tmp, 'backup')
        result = self.fm.restore_user_files('../../etc', backup_path)
        self.assertFalse(result)

    def test_restore_user_files_nonexistent_backup_returns_false(self):
        """Restoring from a backup path that does not exist returns False."""
        # Guaranteed absent: self.tmp is freshly created for this test.
        missing_backup = os.path.join(self.tmp, 'nonexistent_backup')
        result = self.fm.restore_user_files('alice', missing_backup)
        self.assertFalse(result)

    # ── _test_filesystem_access ─────────────────────────────────────────────

    def test_test_filesystem_access_succeeds(self):
        """The filesystem probe reports success and read/write access."""
        result = self.fm._test_filesystem_access()
        self.assertTrue(result['success'])
        self.assertTrue(result['read_write'])

    # ── _test_user_authentication ───────────────────────────────────────────

    def test_test_user_authentication_no_file_returns_success(self):
        """With no users created, the auth probe succeeds with a zero count."""
        result = self.fm._test_user_authentication()
        self.assertTrue(result['success'])
        self.assertEqual(result['users_count'], 0)

    def test_test_user_authentication_counts_users(self):
        """The auth probe reports the number of users that were created."""
        self.fm.create_user('alice', 'secret')
        self.fm.create_user('bob', 'secret2')
        result = self.fm._test_user_authentication()
        self.assertTrue(result['success'])
        self.assertEqual(result['users_count'], 2)

    # ── test_connectivity ───────────────────────────────────────────────────

    # Decorators apply bottom-up, so the innermost patch (requests.options)
    # supplies the first mock argument.
    @patch('requests.get')
    @patch('requests.options')
    def test_test_connectivity_returns_dict_with_expected_keys(self, mock_options, mock_get):
        """Even when HTTP calls raise, the result carries all summary keys."""
        mock_get.side_effect = Exception('connection refused')
        mock_options.side_effect = Exception('connection refused')
        result = self.fm.test_connectivity()
        self.assertIn('webdav_connectivity', result)
        self.assertIn('filesystem_access', result)
        self.assertIn('user_authentication', result)
        self.assertIn('success', result)

    # ── get_status ──────────────────────────────────────────────────────────

    @patch.dict(os.environ, {'DOCKER_CONTAINER': 'true'})
    def test_get_status_docker_mode_returns_dict(self):
        """With DOCKER_CONTAINER=true, status exposes 'running' and 'status'."""
        result = self.fm.get_status()
        self.assertIn('running', result)
        self.assertIn('status', result)

    # patch.dict injects no argument; the three patch() mocks arrive
    # innermost-first: requests.options, requests.get, subprocess.run.
    @patch.dict(os.environ, {'DOCKER_CONTAINER': 'false'})
    @patch('subprocess.run')
    @patch('requests.get')
    @patch('requests.options')
    def test_get_status_non_docker_mode(self, mock_opt, mock_get, mock_run):
        """With DOCKER_CONTAINER=false and I/O mocked, status has 'running'."""
        mock_run.return_value = MagicMock(returncode=0, stdout='', stderr='')
        mock_get.side_effect = Exception('refused')
        mock_opt.side_effect = Exception('refused')
        result = self.fm.get_status()
        self.assertIn('running', result)

    # ── _get_total_storage_used ─────────────────────────────────────────────

    def test_get_total_storage_used_empty_dir(self):
        """With no uploads, total storage is zero files and zero bytes."""
        result = self.fm._get_total_storage_used()
        self.assertEqual(result['total_files'], 0)
        self.assertEqual(result['total_size_bytes'], 0)

    def test_get_total_storage_used_with_files(self):
        """After one upload, total file count and byte size are both positive."""
        self.fm.create_user('alice', 'secret')
        self.fm.upload_file('alice', 'Documents/test.txt', b'hello world')
        result = self.fm._get_total_storage_used()
        self.assertGreater(result['total_files'], 0)
        self.assertGreater(result['total_size_bytes'], 0)
|
|
|
|
|
|
# Allow running this test module directly (python <file>) in addition to
# discovery through a test runner.
if __name__ == '__main__':
    unittest.main()
|