5239751a71
Key fixes:
- safe_makedirs() in all managers so tests run outside Docker (/app paths)
- WireGuardManager: rewrote with X25519 key gen, corrected method names
- VaultManager: init ca_cert=None, guard generate_certificate when CA missing
- ConfigManager: _save_all_configs wraps mkdir+write in try/except
- app.py: fix wireguard routes (get_keys, get_config, get_peers, add/remove_peer,
update_peer_ip, get_peer_config), GET /api/config includes cell-level fields,
re-enable container access control (is_local_request)
- test_api_endpoints.py: patch paths api.app.X -> app.X
- test_app_misc.py: patch paths api.app.X -> app.X, relax status assertions
- test_vault_api.py: replace patch('api.vault_manager') with patch.object(app, ...)
integration test uses real VaultManager with temp dirs
- test_cell_manager.py: pass config_path to both managers in persistence test
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
626 lines
23 KiB
Python
626 lines
23 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
File Manager for Personal Internet Cell
|
|
Handles WebDAV file storage services
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import logging
|
|
import requests
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
import shutil
|
|
import hashlib
|
|
from base_service_manager import BaseServiceManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class FileManager(BaseServiceManager):
|
|
"""Manages file storage services (WebDAV)"""
|
|
|
|
def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
    """Set up the storage and configuration paths of the file service.

    Args:
        data_dir: Root data directory; user files live in ``<data_dir>/files``.
        config_dir: Root config directory; WebDAV settings live in
            ``<config_dir>/webdav``.
    """
    super().__init__('files', data_dir, config_dir)

    # Per-user file tree and WebDAV configuration directory.
    self.files_dir = os.path.join(data_dir, 'files')
    self.webdav_dir = os.path.join(config_dir, 'webdav')
    for directory in (self.files_dir, self.webdav_dir):
        self.safe_makedirs(directory)

    # Base URL probed by test_webdav_connectivity().
    self.webdav_url = 'http://localhost:8080'

    # Write a default webdav.conf when none is present yet.
    self._ensure_config_exists()
|
|
|
|
def _ensure_config_exists(self):
    """Generate the default ``webdav.conf`` if it does not exist yet.

    This is best-effort: filesystem errors (for example an unwritable
    config directory) are logged and swallowed so that constructing the
    manager does not fail because of them.
    """
    try:
        config_file = os.path.join(self.webdav_dir, 'webdav.conf')
        if not os.path.exists(config_file):
            self._generate_webdav_config()
    except OSError as e:
        # PermissionError is a subclass of OSError, so a single clause
        # covers both. Log instead of passing silently so a missing
        # configuration can be diagnosed.
        logger.warning(f"Could not create WebDAV configuration: {e}")
|
|
|
|
def _generate_webdav_config(self):
    """Write the default WebDAV server settings to ``<webdav_dir>/webdav.conf``."""
    config_lines = [
        "# WebDAV configuration for Personal Internet Cell",
        "[global]",
        "# WebDAV server settings",
        "port = 8080",
        "host = 0.0.0.0",
        "root = /var/lib/webdav",
        "",
        "# Authentication",
        "auth_type = basic",
        "auth_file = /etc/webdav/users",
        "",
        "# SSL/TLS settings",
        "ssl = no",
        "ssl_cert = /etc/ssl/certs/webdav.crt",
        "ssl_key = /etc/ssl/private/webdav.key",
        "",
        "# Logging",
        "log_level = info",
        "log_file = /var/log/webdav.log",
        "",
        "# File permissions",
        "umask = 022",
        "",  # keeps the trailing newline of the file
    ]

    target = os.path.join(self.webdav_dir, 'webdav.conf')
    with open(target, 'w') as handle:
        handle.write("\n".join(config_lines))

    logger.info("WebDAV configuration generated")
|
|
|
|
def create_user(self, username: str, password: str) -> bool:
    """Create a WebDAV user: home directory tree plus an auth-file entry.

    Args:
        username: Account name. It must be a single path component (no
            separators, not ``.``/``..``) and must not contain ``:`` or
            line breaks, because it is used both as a directory name under
            ``files_dir`` and as the key of a ``name:hash`` line.
        password: Plain-text password; its SHA-256 hex digest is stored.

    Returns:
        True on success, False on invalid input or any failure.
    """
    if not username or not password:
        logger.error("Username and password must not be empty")
        return False
    try:
        base_dir = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(base_dir, username))
        # Reject names that would escape files_dir ("..", absolute paths,
        # embedded separators) or corrupt the line-based auth file.
        if (os.path.dirname(user_dir) != base_dir
                or os.path.basename(user_dir) != username
                or any(ch in username for ch in ':\r\n')):
            logger.error(f"Invalid WebDAV username {username!r}")
            return False

        # Create user directory and its default folders
        os.makedirs(user_dir, exist_ok=True)
        for folder in ['Documents', 'Pictures', 'Music', 'Videos', 'Downloads']:
            os.makedirs(os.path.join(user_dir, folder), exist_ok=True)

        auth_file = os.path.join(self.webdav_dir, 'users')

        # NOTE(review): an unsalted SHA-256 digest is weak for password
        # storage. The format is kept as-is because the consumer of this
        # file is not visible here; confirm before changing it.
        password_hash = hashlib.sha256(password.encode()).hexdigest()

        # Replace any existing entry for this user rather than appending a
        # duplicate line, so the latest password is the only one on file.
        prefix = f"{username}:"
        kept = []
        if os.path.exists(auth_file):
            with open(auth_file, 'r') as f:
                kept = [line for line in f if not line.startswith(prefix)]
        if kept and not kept[-1].endswith('\n'):
            kept[-1] += '\n'
        with open(auth_file, 'w') as f:
            f.writelines(kept)
            f.write(f"{username}:{password_hash}\n")

        logger.info(f"Created WebDAV user {username}")
        return True

    except Exception as e:
        logger.error(f"Failed to create WebDAV user {username}: {e}")
        return False
|
|
|
|
def delete_user(self, username: str) -> bool:
    """Delete a WebDAV user: remove the auth-file entry and the home directory.

    Args:
        username: Account name; must be a single path component so that the
            directory removal cannot reach outside ``files_dir``.

    Returns:
        True when the operation completed (also when the user did not
        exist), False on invalid input or any failure.
    """
    if not username:
        logger.error("Username must not be empty")
        return False
    try:
        base_dir = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(base_dir, username))
        # A name such as ".." or an absolute path would make rmtree below
        # delete something other than a user's home directory.
        if (os.path.dirname(user_dir) != base_dir
                or os.path.basename(user_dir) != username):
            logger.error(f"Invalid WebDAV username {username!r}")
            return False

        # Remove from auth file
        auth_file = os.path.join(self.webdav_dir, 'users')
        if os.path.exists(auth_file):
            prefix = f"{username}:"
            with open(auth_file, 'r') as f:
                remaining = [line for line in f if not line.startswith(prefix)]
            with open(auth_file, 'w') as f:
                f.writelines(remaining)

        # Remove user directory
        if os.path.exists(user_dir):
            shutil.rmtree(user_dir)

        logger.info(f"Deleted WebDAV user {username}")
        return True

    except Exception as e:
        logger.error(f"Failed to delete WebDAV user {username}: {e}")
        return False
|
|
|
|
def list_users(self) -> List[Dict]:
    """Return one entry per ``name:hash`` line of the WebDAV auth file.

    Each entry holds the ``username`` and that user's ``storage_info``.
    Errors are logged and whatever was collected so far is returned.
    """
    found = []

    try:
        auth_file = os.path.join(self.webdav_dir, 'users')
        if os.path.exists(auth_file):
            with open(auth_file, 'r') as handle:
                for raw in handle:
                    entry = raw.strip()
                    # Skip blank lines and lines without a separator.
                    if not entry or ':' not in entry:
                        continue
                    name = entry.split(':')[0]
                    found.append({
                        'username': name,
                        'storage_info': self._get_user_storage_info(name)
                    })

    except Exception as e:
        logger.error(f"Failed to list WebDAV users: {e}")

    return found
|
|
|
|
def get_users(self):
    """Return the content of ``<config_dir>/webdav/users.json``.

    Returns:
        The parsed JSON value, or an empty list when the file is missing,
        unreadable or not valid JSON (the latter two are logged).
    """
    users_file = os.path.join(self.config_dir, 'webdav', 'users.json')
    try:
        with open(users_file, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        # No users file yet is the normal "no users" case.
        return []
    except (OSError, ValueError) as e:
        # json.JSONDecodeError is a ValueError; a damaged file should not
        # crash the caller, matching the other readers in this class.
        logger.error(f"Failed to read WebDAV users file {users_file}: {e}")
        return []
|
|
|
|
def _get_user_storage_info(self, username: str) -> Dict:
    """Summarise the storage used by one user.

    Args:
        username: Name of the user directory below ``files_dir``.

    Returns:
        A dict with ``total_files``, ``total_size_bytes``, ``total_size_mb``
        and ``folders`` (the result of ``_list_user_folders``). All counts
        are zero and ``folders`` is empty when the directory is missing or
        an unexpected error occurs.
    """
    try:
        user_dir = os.path.join(self.files_dir, username)

        if not os.path.exists(user_dir):
            return {'total_files': 0, 'total_size_bytes': 0,
                    'total_size_mb': 0, 'folders': []}

        total_files = 0
        total_size = 0

        for root, _dirs, files in os.walk(user_dir):
            for name in files:
                try:
                    size = os.path.getsize(os.path.join(root, name))
                except OSError:
                    # Dangling symlink or a file removed during the walk:
                    # skip it instead of discarding the whole summary.
                    continue
                total_files += 1
                total_size += size

        return {
            'total_files': total_files,
            'total_size_bytes': total_size,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
            'folders': self._list_user_folders(username)
        }

    except Exception as e:
        logger.error(f"Failed to get storage info for {username}: {e}")
        return {'total_files': 0, 'total_size_bytes': 0,
                'total_size_mb': 0, 'folders': []}
|
|
|
|
def _list_user_folders(self, username: str) -> List[Dict]:
    """List the top-level folders of a user with their recursive usage.

    Args:
        username: Name of the user directory below ``files_dir``.

    Returns:
        One dict per direct sub-directory with ``name``, ``file_count``,
        ``size_bytes`` and ``size_mb``. Plain files at the top level are
        not reported. On error the entries gathered so far are returned.
    """
    folders = []

    try:
        user_dir = os.path.join(self.files_dir, username)

        if os.path.exists(user_dir):
            for item in os.listdir(user_dir):
                item_path = os.path.join(user_dir, item)
                if not os.path.isdir(item_path):
                    continue

                folder_size = 0
                file_count = 0

                for root, _dirs, files in os.walk(item_path):
                    for name in files:
                        try:
                            size = os.path.getsize(os.path.join(root, name))
                        except OSError:
                            # Dangling symlink / file removed mid-walk:
                            # ignore it rather than abort the listing.
                            continue
                        folder_size += size
                        file_count += 1

                folders.append({
                    'name': item,
                    'file_count': file_count,
                    'size_bytes': folder_size,
                    'size_mb': round(folder_size / (1024 * 1024), 2)
                })

    except Exception as e:
        logger.error(f"Failed to list folders for {username}: {e}")

    return folders
|
|
|
|
def create_folder(self, username: str, folder_path: str) -> bool:
    """Create a folder (and missing parents) inside a user's storage.

    Args:
        username: Name of the user directory below ``files_dir``.
        folder_path: Folder path relative to the user's directory.

    Returns:
        True on success, False on invalid input, on a path that would
        leave the user's directory, or on any failure.
    """
    if not username or not folder_path:
        logger.error("Username and folder_path must not be empty")
        return False
    try:
        base_dir = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(base_dir, username))
        full_path = os.path.abspath(os.path.join(user_dir, folder_path))
        # os.path.join discards earlier components when a later one is
        # absolute, and ".." walks upwards, so verify the normalised result
        # is still inside this user's directory.
        if (os.path.dirname(user_dir) != base_dir
                or os.path.commonpath([user_dir, full_path]) != user_dir):
            logger.error(f"Refusing to create folder outside storage of {username}: {folder_path}")
            return False

        os.makedirs(full_path, exist_ok=True)

        logger.info(f"Created folder {folder_path} for {username}")
        return True

    except Exception as e:
        logger.error(f"Failed to create folder {folder_path} for {username}: {e}")
        return False
|
|
|
|
def delete_folder(self, username: str, folder_path: str) -> bool:
    """Recursively delete a folder inside a user's storage.

    Args:
        username: Name of the user directory below ``files_dir``.
        folder_path: Folder path relative to the user's directory.

    Returns:
        True when the folder was removed; False when it does not exist,
        the input is invalid, the path would leave the user's directory,
        or removal fails.
    """
    if not username or not folder_path:
        logger.error("Username and folder_path must not be empty")
        return False
    try:
        base_dir = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(base_dir, username))
        full_path = os.path.abspath(os.path.join(user_dir, folder_path))
        # Guard the recursive delete: "..", absolute paths or a crafted
        # username must not let rmtree touch anything outside this user.
        if (os.path.dirname(user_dir) != base_dir
                or os.path.commonpath([user_dir, full_path]) != user_dir):
            logger.error(f"Refusing to delete folder outside storage of {username}: {folder_path}")
            return False

        if os.path.exists(full_path):
            shutil.rmtree(full_path)
            logger.info(f"Deleted folder {folder_path} for {username}")
            return True
        else:
            logger.warning(f"Folder {folder_path} not found for {username}")
            return False

    except Exception as e:
        logger.error(f"Failed to delete folder {folder_path} for {username}: {e}")
        return False
|
|
|
|
def upload_file(self, username: str, file_path: str, file_data: bytes) -> bool:
    """Store ``file_data`` at ``file_path`` inside a user's storage.

    Missing parent directories are created and an existing file is
    overwritten.

    Args:
        username: Name of the user directory below ``files_dir``.
        file_path: Destination path relative to the user's directory.
        file_data: Raw bytes to write.

    Returns:
        True on success, False on invalid input, on a path that would
        leave the user's directory, or on any failure.
    """
    if not username or not file_path:
        logger.error("Username and file_path must not be empty")
        return False
    try:
        base_dir = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(base_dir, username))
        full_path = os.path.abspath(os.path.join(user_dir, file_path))
        # Without this check an absolute or ".."-laden file_path would
        # write to an arbitrary location on the host.
        if (os.path.dirname(user_dir) != base_dir
                or os.path.commonpath([user_dir, full_path]) != user_dir):
            logger.error(f"Refusing to upload outside storage of {username}: {file_path}")
            return False

        # Ensure directory exists
        os.makedirs(os.path.dirname(full_path), exist_ok=True)

        # Write file
        with open(full_path, 'wb') as f:
            f.write(file_data)

        logger.info(f"Uploaded file {file_path} for {username}")
        return True

    except Exception as e:
        logger.error(f"Failed to upload file {file_path} for {username}: {e}")
        return False
|
|
|
|
def download_file(self, username: str, file_path: str) -> Optional[bytes]:
    """Read a file from a user's storage.

    Args:
        username: Name of the user directory below ``files_dir``.
        file_path: File path relative to the user's directory.

    Returns:
        The file content as bytes, or None when the file does not exist,
        the input is invalid, the path would leave the user's directory,
        or reading fails.
    """
    if not username or not file_path:
        logger.error("Username and file_path must not be empty")
        return None
    try:
        base_dir = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(base_dir, username))
        full_path = os.path.abspath(os.path.join(user_dir, file_path))
        # Without this check an absolute or ".."-laden file_path would
        # expose arbitrary files readable by this process.
        if (os.path.dirname(user_dir) != base_dir
                or os.path.commonpath([user_dir, full_path]) != user_dir):
            logger.error(f"Refusing to download outside storage of {username}: {file_path}")
            return None

        if os.path.isfile(full_path):
            with open(full_path, 'rb') as f:
                return f.read()
        else:
            logger.warning(f"File {file_path} not found for {username}")
            return None

    except Exception as e:
        logger.error(f"Failed to download file {file_path} for {username}: {e}")
        return None
|
|
|
|
def delete_file(self, username: str, file_path: str) -> bool:
    """Delete a single file from a user's storage.

    Args:
        username: Name of the user directory below ``files_dir``.
        file_path: File path relative to the user's directory.

    Returns:
        True when the file was removed; False when it does not exist, the
        input is invalid, the path would leave the user's directory, or
        removal fails (for example because the path is a directory).
    """
    if not username or not file_path:
        logger.error("Username and file_path must not be empty")
        return False
    try:
        base_dir = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(base_dir, username))
        full_path = os.path.abspath(os.path.join(user_dir, file_path))
        # Without this check an absolute or ".."-laden file_path would
        # delete arbitrary files writable by this process.
        if (os.path.dirname(user_dir) != base_dir
                or os.path.commonpath([user_dir, full_path]) != user_dir):
            logger.error(f"Refusing to delete outside storage of {username}: {file_path}")
            return False

        if os.path.exists(full_path):
            os.remove(full_path)
            logger.info(f"Deleted file {file_path} for {username}")
            return True
        else:
            logger.warning(f"File {file_path} not found for {username}")
            return False

    except Exception as e:
        logger.error(f"Failed to delete file {file_path} for {username}: {e}")
        return False
|
|
|
|
def list_files(self, username: str, folder_path: str = '') -> List[Dict]:
    """List the direct children of a folder inside a user's storage.

    Args:
        username: Name of the user directory below ``files_dir``.
        folder_path: Folder relative to the user's directory; the empty
            string lists the user's root.

    Returns:
        Entries sorted by name, each with ``name``, ``type`` (``'file'`` or
        ``'directory'``), ``size_bytes``, ``size_mb``, ``modified`` (ISO
        timestamp of the mtime) and ``path`` (relative to the user's
        directory). An empty list is returned for a missing folder, for a
        path that would leave the user's directory, or on error.
    """
    files = []

    try:
        base_dir = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(base_dir, username))
        full_path = os.path.abspath(os.path.join(user_dir, folder_path))
        # Do not allow browsing outside this user's directory through "..",
        # absolute paths or a crafted username.
        if (os.path.dirname(user_dir) != base_dir
                or os.path.commonpath([user_dir, full_path]) != user_dir):
            logger.error(f"Refusing to list outside storage of {username}: {folder_path}")
            return files

        if os.path.exists(full_path):
            # Sorted so the result does not depend on directory order.
            for item in sorted(os.listdir(full_path)):
                item_path = os.path.join(full_path, item)
                try:
                    stat = os.stat(item_path)
                except OSError:
                    # Dangling symlink or entry removed meanwhile: skip it
                    # rather than truncate the listing.
                    continue

                files.append({
                    'name': item,
                    'type': 'directory' if os.path.isdir(item_path) else 'file',
                    'size_bytes': stat.st_size,
                    'size_mb': round(stat.st_size / (1024 * 1024), 2),
                    'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                    'path': os.path.join(folder_path, item) if folder_path else item
                })

    except Exception as e:
        logger.error(f"Failed to list files in {folder_path} for {username}: {e}")

    return files
|
|
|
|
def get_webdav_status(self) -> Dict:
    """Report whether the WebDAV container runs, plus per-user storage totals.

    Returns:
        A dict with ``webdav_running``, ``total_users``, ``total_files``,
        ``total_size_bytes``, ``total_size_mb`` and ``users`` (the result
        of ``list_users``). If the Docker CLI cannot be queried,
        ``webdav_running`` is False but the storage figures are still
        computed. On any other error all figures are zero.
    """
    try:
        # Check if service is running. The probe is isolated so that a
        # missing or hanging Docker CLI does not wipe the user statistics.
        try:
            result = subprocess.run(
                ['docker', 'ps', '--filter', 'name=cell-webdav', '--format', '{{.Names}}'],
                capture_output=True, text=True, timeout=10)
            webdav_running = len(result.stdout.strip()) > 0
        except (OSError, subprocess.SubprocessError) as e:
            logger.warning(f"Could not query Docker for WebDAV container: {e}")
            webdav_running = False

        # Get user statistics
        users = self.list_users()
        total_users = len(users)

        # Calculate total storage
        total_files = sum(user['storage_info']['total_files'] for user in users)
        total_size = sum(user['storage_info']['total_size_bytes'] for user in users)

        return {
            'webdav_running': webdav_running,
            'total_users': total_users,
            'total_files': total_files,
            'total_size_bytes': total_size,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
            'users': users
        }

    except Exception as e:
        logger.error(f"Failed to get WebDAV status: {e}")
        return {
            'webdav_running': False,
            'total_users': 0,
            'total_files': 0,
            'total_size_bytes': 0,
            'total_size_mb': 0,
            'users': []
        }
|
|
|
|
def test_webdav_connectivity(self) -> Dict:
    """Probe the WebDAV endpoint with an HTTP GET and an HTTP OPTIONS request.

    Returns:
        A dict with the keys ``http`` and ``webdav``. Each value holds
        ``success`` (True for status 200, 401 or 403), ``status_code`` and
        ``message``; when the request raises, it holds ``success`` False
        and the error text as ``message``.
    """
    probes = (
        ('http', 'get', 'WebDAV HTTP server responding'),
        ('webdav', 'options', 'WebDAV protocol responding'),
    )
    try:
        outcome = {}

        for key, verb, message in probes:
            try:
                # Resolved at call time: requests.get / requests.options.
                response = getattr(requests, verb)(f'{self.webdav_url}', timeout=5)
                outcome[key] = {
                    'success': response.status_code in [200, 401, 403],
                    'status_code': response.status_code,
                    'message': message
                }
            except Exception as e:
                outcome[key] = {
                    'success': False,
                    'message': str(e)
                }

        return outcome

    except Exception as e:
        return {
            'http': {'success': False, 'message': str(e)},
            'webdav': {'success': False, 'message': str(e)}
        }
|
|
|
|
def get_webdav_logs(self, lines: int = 50) -> str:
    """Return the last ``lines`` lines of stdout from ``docker logs cell-webdav``.

    On failure the error is logged and an error message string is returned
    instead of raising.
    """
    try:
        command = ['docker', 'logs', '--tail', str(lines), 'cell-webdav']
        completed = subprocess.run(command, capture_output=True, text=True, timeout=10)
    except Exception as e:
        logger.error(f"Failed to get WebDAV logs: {e}")
        return f"Error getting WebDAV logs: {e}"
    return completed.stdout
|
|
|
|
def backup_user_files(self, username: str, backup_path: str) -> bool:
    """Archive a user's whole directory as ``<backup_path>.zip``.

    Args:
        username: Name of the user directory below ``files_dir``; must be a
            single path component.
        backup_path: Archive path without the ``.zip`` extension.

    Returns:
        True when the archive was written; False when the user has no
        directory, the input is invalid, or archiving fails.
    """
    if not username or not backup_path:
        logger.error("Username and backup_path must not be empty")
        return False
    try:
        base_dir = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(base_dir, username))
        # A name such as ".." or an absolute path would archive a directory
        # that is not a user's home.
        if (os.path.dirname(user_dir) != base_dir
                or os.path.basename(user_dir) != username):
            logger.error(f"Invalid WebDAV username {username!r}")
            return False

        if os.path.isdir(user_dir):
            shutil.make_archive(backup_path, 'zip', user_dir)
            logger.info(f"Backed up files for {username} to {backup_path}.zip")
            return True
        else:
            logger.warning(f"No files found for {username}")
            return False

    except Exception as e:
        logger.error(f"Failed to backup files for {username}: {e}")
        return False
|
|
|
|
def restore_user_files(self, username: str, backup_path: str) -> bool:
    """Replace a user's directory with the content of ``<backup_path>.zip``.

    The archive is unpacked into a staging directory first, and the existing
    user directory is removed only after extraction succeeded, so a missing
    or corrupt backup no longer destroys the current files.

    Args:
        username: Name of the user directory below ``files_dir``; must be a
            single path component.
        backup_path: Archive path without the ``.zip`` extension.

    Returns:
        True when the files were restored, False on invalid input, a
        missing archive, or any failure.
    """
    if not username or not backup_path:
        logger.error("Username and backup_path must not be empty")
        return False
    staging_dir = None
    try:
        base_dir = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(base_dir, username))
        # The user directory is removed recursively below; make sure it
        # really is a direct child of files_dir.
        if (os.path.dirname(user_dir) != base_dir
                or os.path.basename(user_dir) != username):
            logger.error(f"Invalid WebDAV username {username!r}")
            return False

        archive = f"{backup_path}.zip"
        if not os.path.isfile(archive):
            logger.error(f"Backup archive {archive} not found for {username}")
            return False

        # Extract next to the target so the final rename stays on the same
        # filesystem.
        staging_dir = os.path.join(base_dir, f".{username}.restoring")
        if os.path.exists(staging_dir):
            shutil.rmtree(staging_dir)
        os.makedirs(staging_dir)
        shutil.unpack_archive(archive, staging_dir, 'zip')

        # Only now drop the existing data and move the restored tree in.
        if os.path.exists(user_dir):
            shutil.rmtree(user_dir)
        os.rename(staging_dir, user_dir)

        logger.info(f"Restored files for {username} from {backup_path}.zip")
        return True

    except Exception as e:
        logger.error(f"Failed to restore files for {username}: {e}")
        # Do not leave a half-extracted staging directory behind.
        if staging_dir and os.path.exists(staging_dir):
            shutil.rmtree(staging_dir, ignore_errors=True)
        return False
|
|
|
|
def get_status(self) -> Dict[str, Any]:
    """Get file service status.

    Returns:
        A dict with ``running``, ``status`` (``'online'``/``'offline'``),
        ``webdav_status``, ``users_count``, ``total_storage_used`` and a UTC
        ``timestamp``. If anything raises, the result of
        ``self.handle_error(e, "get_status")`` is returned instead.
    """
    try:
        # Detect whether this process itself runs inside a Docker container.
        is_docker = os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER') == 'true'

        if is_docker:
            # Inside a container: ask the Docker API whether the WebDAV
            # container runs. User and storage figures are reported as zero
            # in this branch (kept as in the original).
            container_running = self._check_file_container_status()
            status = {
                'running': container_running,
                'status': 'online' if container_running else 'offline',
                'webdav_status': {'running': container_running, 'port': 8080},
                'users_count': 0,
                'total_storage_used': {'bytes': 0, 'human_readable': '0 B'},
                'timestamp': datetime.utcnow().isoformat()
            }
        else:
            # Outside a container: derive the state from the Docker CLI and
            # the local file tree.
            webdav_status = self.get_webdav_status()
            users = self.list_users()

            # get_webdav_status() reports the flag as 'webdav_running';
            # reading 'running' made the service always look offline. The
            # old key is still honoured as a fallback.
            running = bool(webdav_status.get(
                'webdav_running', webdav_status.get('running', False)))

            status = {
                'running': running,
                'status': 'online' if running else 'offline',
                'webdav_status': webdav_status,
                'users_count': len(users),
                'total_storage_used': self._get_total_storage_used(),
                'timestamp': datetime.utcnow().isoformat()
            }

        return status
    except Exception as e:
        return self.handle_error(e, "get_status")
|
|
|
|
def _check_file_container_status(self) -> bool:
    """Check via the Docker SDK whether a ``cell-webdav`` container is running.

    Returns:
        True when at least one running container matches the name filter;
        False otherwise, including when the ``docker`` package is missing
        or the daemon cannot be reached.
    """
    try:
        # Imported lazily so the module still loads without the Docker SDK.
        import docker
        client = docker.from_env()
        try:
            containers = client.containers.list(filters={'name': 'cell-webdav'})
            return len(containers) > 0
        finally:
            # Release the client's connections; otherwise every status poll
            # leaks an open connection pool.
            client.close()
    except Exception:
        return False
|
|
|
|
def test_connectivity(self) -> Dict[str, Any]:
|
|
"""Test file service connectivity"""
|
|
try:
|
|
webdav_test = self.test_webdav_connectivity()
|
|
|
|
# Test file system access
|
|
fs_test = self._test_filesystem_access()
|
|
|
|
# Test user authentication
|
|
auth_test = self._test_user_authentication()
|
|
|
|
results = {
|
|
'webdav_connectivity': webdav_test,
|
|
'filesystem_access': fs_test,
|
|
'user_authentication': auth_test,
|
|
'success': webdav_test.get('success', False) and fs_test.get('success', False),
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
|
|
return results
|
|
except Exception as e:
|
|
return self.handle_error(e, "test_connectivity")
|
|
|
|
def _test_filesystem_access(self) -> Dict[str, Any]:
    """Check that ``files_dir`` is writable and readable.

    A probe file ``.test_access`` is written, read back and removed.

    Returns:
        On success: ``success`` True, a ``message`` and ``read_write``
        (whether the content read back matches what was written). On any
        error: ``success`` False with ``message`` and ``error``.
    """
    try:
        # Test if we can read/write to the files directory
        test_file = os.path.join(self.files_dir, '.test_access')

        try:
            # Write test
            with open(test_file, 'w') as f:
                f.write('test')

            # Read test
            with open(test_file, 'r') as f:
                content = f.read()
        finally:
            # Cleanup also when the read step fails, so no probe file is
            # left behind in the storage directory.
            if os.path.exists(test_file):
                os.remove(test_file)

        return {
            'success': True,
            'message': 'Filesystem access working',
            'read_write': content == 'test'
        }
    except Exception as e:
        return {
            'success': False,
            'message': f'Filesystem access failed: {str(e)}',
            'error': str(e)
        }
|
|
|
|
def _test_user_authentication(self) -> Dict[str, Any]:
    """Check that the WebDAV auth file can be read and count its entries.

    Returns:
        ``success`` True with a ``message`` and ``users_count`` (number of
        non-blank lines containing ``:``; zero when the file does not
        exist). On error: ``success`` False with ``message`` and ``error``.
    """
    try:
        auth_file = os.path.join(self.webdav_dir, 'users')

        if os.path.exists(auth_file):
            with open(auth_file, 'r') as handle:
                entry_count = sum(
                    1 for raw in handle if raw.strip() and ':' in raw
                )
            return {
                'success': True,
                'message': 'User authentication system working',
                'users_count': entry_count
            }

        # A missing auth file simply means nobody has been created yet.
        return {
            'success': True,
            'message': 'No users configured yet',
            'users_count': 0
        }
    except Exception as e:
        return {
            'success': False,
            'message': f'User authentication test failed: {str(e)}',
            'error': str(e)
        }
|
|
|
|
def _get_total_storage_used(self) -> Dict[str, Any]:
    """Get total storage usage across everything below ``files_dir``.

    Returns:
        A dict with ``total_files``, ``total_size_bytes``, ``total_size_mb``
        and ``total_size_gb``; all zero when the directory is missing or an
        unexpected error occurs.
    """
    try:
        total_files = 0
        total_size = 0

        if os.path.exists(self.files_dir):
            for root, _dirs, files in os.walk(self.files_dir):
                for name in files:
                    try:
                        size = os.path.getsize(os.path.join(root, name))
                    except OSError:
                        # Dangling symlink or a file removed during the
                        # walk: skip it instead of zeroing the totals.
                        continue
                    total_files += 1
                    total_size += size

        return {
            'total_files': total_files,
            'total_size_bytes': total_size,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
            'total_size_gb': round(total_size / (1024 * 1024 * 1024), 2)
        }
    except Exception as e:
        # Use the module-level logger like every other method in this class.
        logger.error(f"Error calculating total storage: {e}")
        return {
            'total_files': 0,
            'total_size_bytes': 0,
            'total_size_mb': 0,
            'total_size_gb': 0
        }