#!/usr/bin/env python3 """ File Manager for Personal Internet Cell Handles WebDAV file storage services """ import os import json import subprocess import logging import requests from datetime import datetime from typing import Dict, List, Optional, Tuple, Any import shutil import hashlib from base_service_manager import BaseServiceManager logger = logging.getLogger(__name__) class FileManager(BaseServiceManager): """Manages file storage services (WebDAV)""" def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'): super().__init__('files', data_dir, config_dir) self.files_dir = os.path.join(data_dir, 'files') self.webdav_dir = os.path.join(config_dir, 'webdav') self.safe_makedirs(self.files_dir) self.safe_makedirs(self.webdav_dir) # WebDAV service URL self.webdav_url = 'http://cell-webdav:80' # Initialize WebDAV configuration self._ensure_config_exists() def _ensure_config_exists(self): """Ensure WebDAV configuration exists""" try: config_file = os.path.join(self.webdav_dir, 'webdav.conf') if not os.path.exists(config_file): self._generate_webdav_config() except (PermissionError, OSError): pass def _generate_webdav_config(self): """Generate WebDAV configuration""" config = """# WebDAV configuration for Personal Internet Cell [global] # WebDAV server settings port = 8080 host = 0.0.0.0 root = /var/lib/webdav # Authentication auth_type = basic auth_file = /etc/webdav/users # SSL/TLS settings ssl = no ssl_cert = /etc/ssl/certs/webdav.crt ssl_key = /etc/ssl/private/webdav.key # Logging log_level = info log_file = /var/log/webdav.log # File permissions umask = 022 """ config_file = os.path.join(self.webdav_dir, 'webdav.conf') with open(config_file, 'w') as f: f.write(config) logger.info("WebDAV configuration generated") def create_user(self, username: str, password: str) -> bool: """Create a new WebDAV user""" if not username or not password: logger.error("Username and password must not be empty") return False try: # Create user directory user_dir = os.path.join(self.files_dir, username) os.makedirs(user_dir, exist_ok=True) # Create default folders for folder in ['Documents', 'Pictures', 'Music', 'Videos', 'Downloads']: os.makedirs(os.path.join(user_dir, folder), exist_ok=True) # Add user to auth file auth_file = os.path.join(self.webdav_dir, 'users') # Generate password hash password_hash = hashlib.sha256(password.encode()).hexdigest() with open(auth_file, 'a') as f: f.write(f"{username}:{password_hash}\n") logger.info(f"Created WebDAV user {username}") return True except Exception as e: logger.error(f"Failed to create WebDAV user {username}: {e}") return False def delete_user(self, username: str) -> bool: """Delete a WebDAV user""" if not username: logger.error("Username must not be empty") return False try: # Remove from auth file auth_file = os.path.join(self.webdav_dir, 'users') if os.path.exists(auth_file): with open(auth_file, 'r') as f: lines = f.readlines() with open(auth_file, 'w') as f: for line in lines: if not line.startswith(f"{username}:"): f.write(line) # Remove user directory user_dir = os.path.join(self.files_dir, username) if os.path.exists(user_dir): shutil.rmtree(user_dir) logger.info(f"Deleted WebDAV user {username}") return True except Exception as e: logger.error(f"Failed to delete WebDAV user {username}: {e}") return False def list_users(self) -> List[Dict]: """List all WebDAV users""" users = [] try: auth_file = os.path.join(self.webdav_dir, 'users') if os.path.exists(auth_file): with open(auth_file, 'r') as f: for line in f: line = line.strip() if line and ':' in line: username = line.split(':')[0] users.append({ 'username': username, 'storage_info': self._get_user_storage_info(username) }) except Exception as e: logger.error(f"Failed to list WebDAV users: {e}") return users def get_users(self): """Return a list of file storage users (WebDAV users).""" users_file = os.path.join(self.config_dir, 'webdav', 'users.json') if os.path.exists(users_file): with open(users_file, 'r') as f: return json.load(f) return [] def _get_user_storage_info(self, username: str) -> Dict: """Get storage information for a user""" try: user_dir = os.path.join(self.files_dir, username) if not os.path.exists(user_dir): return {'total_files': 0, 'total_size_bytes': 0, 'total_size_mb': 0} total_files = 0 total_size = 0 for root, dirs, files in os.walk(user_dir): for file in files: file_path = os.path.join(root, file) total_files += 1 total_size += os.path.getsize(file_path) return { 'total_files': total_files, 'total_size_bytes': total_size, 'total_size_mb': round(total_size / (1024 * 1024), 2), 'folders': self._list_user_folders(username) } except Exception as e: logger.error(f"Failed to get storage info for {username}: {e}") return {'total_files': 0, 'total_size_bytes': 0, 'total_size_mb': 0} def _list_user_folders(self, username: str) -> List[Dict]: """List folders for a user""" folders = [] try: user_dir = os.path.join(self.files_dir, username) if os.path.exists(user_dir): for item in os.listdir(user_dir): item_path = os.path.join(user_dir, item) if os.path.isdir(item_path): folder_size = 0 file_count = 0 for root, dirs, files in os.walk(item_path): for file in files: file_path = os.path.join(root, file) folder_size += os.path.getsize(file_path) file_count += 1 folders.append({ 'name': item, 'file_count': file_count, 'size_bytes': folder_size, 'size_mb': round(folder_size / (1024 * 1024), 2) }) except Exception as e: logger.error(f"Failed to list folders for {username}: {e}") return folders def create_folder(self, username: str, folder_path: str) -> bool: """Create a new folder for a user""" if not username or not folder_path: logger.error("Username and folder_path must not be empty") return False try: full_path = os.path.join(self.files_dir, username, folder_path) os.makedirs(full_path, exist_ok=True) logger.info(f"Created folder {folder_path} for {username}") return True except Exception as e: logger.error(f"Failed to create folder {folder_path} for {username}: {e}") return False def delete_folder(self, username: str, folder_path: str) -> bool: """Delete a folder for a user""" if not username or not folder_path: logger.error("Username and folder_path must not be empty") return False try: full_path = os.path.join(self.files_dir, username, folder_path) if os.path.exists(full_path): shutil.rmtree(full_path) logger.info(f"Deleted folder {folder_path} for {username}") return True else: logger.warning(f"Folder {folder_path} not found for {username}") return False except Exception as e: logger.error(f"Failed to delete folder {folder_path} for {username}: {e}") return False def upload_file(self, username: str, file_path: str, file_data: bytes) -> bool: """Upload a file for a user""" try: full_path = os.path.join(self.files_dir, username, file_path) # Ensure directory exists os.makedirs(os.path.dirname(full_path), exist_ok=True) # Write file with open(full_path, 'wb') as f: f.write(file_data) logger.info(f"Uploaded file {file_path} for {username}") return True except Exception as e: logger.error(f"Failed to upload file {file_path} for {username}: {e}") return False def download_file(self, username: str, file_path: str) -> Optional[bytes]: """Download a file for a user""" try: full_path = os.path.join(self.files_dir, username, file_path) if os.path.exists(full_path): with open(full_path, 'rb') as f: return f.read() else: logger.warning(f"File {file_path} not found for {username}") return None except Exception as e: logger.error(f"Failed to download file {file_path} for {username}: {e}") return None def delete_file(self, username: str, file_path: str) -> bool: """Delete a file for a user""" try: full_path = os.path.join(self.files_dir, username, file_path) if os.path.exists(full_path): os.remove(full_path) logger.info(f"Deleted file {file_path} for {username}") return True else: logger.warning(f"File {file_path} not found for {username}") return False except Exception as e: logger.error(f"Failed to delete file {file_path} for {username}: {e}") return False def list_files(self, username: str, folder_path: str = '') -> List[Dict]: """List files in a folder for a user""" files = [] try: full_path = os.path.join(self.files_dir, username, folder_path) if os.path.exists(full_path): for item in os.listdir(full_path): item_path = os.path.join(full_path, item) stat = os.stat(item_path) files.append({ 'name': item, 'type': 'directory' if os.path.isdir(item_path) else 'file', 'size_bytes': stat.st_size, 'size_mb': round(stat.st_size / (1024 * 1024), 2), 'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(), 'path': os.path.join(folder_path, item) if folder_path else item }) except Exception as e: logger.error(f"Failed to list files in {folder_path} for {username}: {e}") return files def get_webdav_status(self) -> Dict: """Get WebDAV service status""" try: # Check if service is running result = subprocess.run(['docker', 'ps', '--filter', 'name=cell-webdav', '--format', '{{.Names}}'], capture_output=True, text=True) webdav_running = len(result.stdout.strip()) > 0 # Get user statistics users = self.list_users() total_users = len(users) # Calculate total storage total_files = 0 total_size = 0 for user in users: storage_info = user['storage_info'] total_files += storage_info['total_files'] total_size += storage_info['total_size_bytes'] return { 'webdav_running': webdav_running, 'total_users': total_users, 'total_files': total_files, 'total_size_bytes': total_size, 'total_size_mb': round(total_size / (1024 * 1024), 2), 'users': users } except Exception as e: logger.error(f"Failed to get WebDAV status: {e}") return { 'webdav_running': False, 'total_users': 0, 'total_files': 0, 'total_size_bytes': 0, 'total_size_mb': 0, 'users': [] } def test_webdav_connectivity(self) -> Dict: """Test WebDAV service connectivity""" try: results = {} # Test HTTP connectivity try: response = requests.get(f'{self.webdav_url}', timeout=5) results['http'] = { 'success': response.status_code in [200, 401, 403], 'status_code': response.status_code, 'message': 'WebDAV HTTP server responding' } except Exception as e: results['http'] = { 'success': False, 'message': str(e) } # Test WebDAV OPTIONS try: response = requests.options(f'{self.webdav_url}', timeout=5) results['webdav'] = { 'success': response.status_code in [200, 401, 403], 'status_code': response.status_code, 'message': 'WebDAV protocol responding' } except Exception as e: results['webdav'] = { 'success': False, 'message': str(e) } results['success'] = results.get('http', {}).get('success', False) return results except Exception as e: return { 'success': False, 'http': {'success': False, 'message': str(e)}, 'webdav': {'success': False, 'message': str(e)} } def get_webdav_logs(self, lines: int = 50) -> str: """Get WebDAV service logs""" try: result = subprocess.run(['docker', 'logs', '--tail', str(lines), 'cell-webdav'], capture_output=True, text=True, timeout=10) return result.stdout except Exception as e: logger.error(f"Failed to get WebDAV logs: {e}") return f"Error getting WebDAV logs: {e}" def backup_user_files(self, username: str, backup_path: str) -> bool: """Backup all files for a user""" if not username or not backup_path: logger.error("Username and backup_path must not be empty") return False try: user_dir = os.path.join(self.files_dir, username) if os.path.exists(user_dir): shutil.make_archive(backup_path, 'zip', user_dir) logger.info(f"Backed up files for {username} to {backup_path}.zip") return True else: logger.warning(f"No files found for {username}") return False except Exception as e: logger.error(f"Failed to backup files for {username}: {e}") return False def restore_user_files(self, username: str, backup_path: str) -> bool: """Restore files for a user from backup""" if not username or not backup_path: logger.error("Username and backup_path must not be empty") return False try: user_dir = os.path.join(self.files_dir, username) # Remove existing user directory if os.path.exists(user_dir): shutil.rmtree(user_dir) # Extract backup shutil.unpack_archive(f"{backup_path}.zip", user_dir, 'zip') logger.info(f"Restored files for {username} from {backup_path}.zip") return True except Exception as e: logger.error(f"Failed to restore files for {username}: {e}") return False def get_status(self) -> Dict[str, Any]: """Get file service status""" try: # Check if we're running in Docker environment import os is_docker = os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER') == 'true' if is_docker: # Check if file container is actually running container_running = self._check_file_container_status() status = { 'running': container_running, 'status': 'online' if container_running else 'offline', 'webdav_status': {'running': container_running, 'port': 8080}, 'users_count': 0, 'total_storage_used': {'bytes': 0, 'human_readable': '0 B'}, 'timestamp': datetime.utcnow().isoformat() } else: # Check actual service status in production webdav_status = self.get_webdav_status() users = self.list_users() status = { 'running': webdav_status.get('running', False), 'status': 'online' if webdav_status.get('running', False) else 'offline', 'webdav_status': webdav_status, 'users_count': len(users), 'total_storage_used': self._get_total_storage_used(), 'timestamp': datetime.utcnow().isoformat() } return status except Exception as e: return self.handle_error(e, "get_status") def _check_file_container_status(self) -> bool: """Check if file Docker container is running""" try: import docker client = docker.from_env() containers = client.containers.list(filters={'name': 'cell-webdav'}) return len(containers) > 0 except Exception: return False def test_connectivity(self) -> Dict[str, Any]: """Test file service connectivity""" try: webdav_test = self.test_webdav_connectivity() # Test file system access fs_test = self._test_filesystem_access() # Test user authentication auth_test = self._test_user_authentication() results = { 'webdav_connectivity': webdav_test, 'filesystem_access': fs_test, 'user_authentication': auth_test, 'success': webdav_test.get('success', False) and fs_test.get('success', False), 'timestamp': datetime.utcnow().isoformat() } return results except Exception as e: return self.handle_error(e, "test_connectivity") def _test_filesystem_access(self) -> Dict[str, Any]: """Test filesystem access""" try: # Test if we can read/write to the files directory test_file = os.path.join(self.files_dir, '.test_access') # Write test with open(test_file, 'w') as f: f.write('test') # Read test with open(test_file, 'r') as f: content = f.read() # Cleanup os.remove(test_file) return { 'success': True, 'message': 'Filesystem access working', 'read_write': content == 'test' } except Exception as e: return { 'success': False, 'message': f'Filesystem access failed: {str(e)}', 'error': str(e) } def _test_user_authentication(self) -> Dict[str, Any]: """Test user authentication system""" try: auth_file = os.path.join(self.webdav_dir, 'users') if not os.path.exists(auth_file): return { 'success': True, 'message': 'No users configured yet', 'users_count': 0 } with open(auth_file, 'r') as f: users = [line.strip() for line in f if line.strip() and ':' in line] return { 'success': True, 'message': 'User authentication system working', 'users_count': len(users) } except Exception as e: return { 'success': False, 'message': f'User authentication test failed: {str(e)}', 'error': str(e) } def _get_total_storage_used(self) -> Dict[str, Any]: """Get total storage usage across all users""" try: total_files = 0 total_size = 0 if os.path.exists(self.files_dir): for root, dirs, files in os.walk(self.files_dir): for file in files: file_path = os.path.join(root, file) total_files += 1 total_size += os.path.getsize(file_path) return { 'total_files': total_files, 'total_size_bytes': total_size, 'total_size_mb': round(total_size / (1024 * 1024), 2), 'total_size_gb': round(total_size / (1024 * 1024 * 1024), 2) } except Exception as e: self.logger.error(f"Error calculating total storage: {e}") return { 'total_files': 0, 'total_size_bytes': 0, 'total_size_mb': 0, 'total_size_gb': 0 }