Files
pic/api/file_manager.py
T
2025-09-13 14:23:31 +03:00

624 lines
23 KiB
Python

#!/usr/bin/env python3
"""
File Manager for Personal Internet Cell
Handles WebDAV file storage services
"""
import os
import json
import subprocess
import logging
import requests
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
import shutil
import hashlib
from base_service_manager import BaseServiceManager
logger = logging.getLogger(__name__)
class FileManager(BaseServiceManager):
"""Manages file storage services (WebDAV)"""
def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
    """Prepare the storage and configuration locations of the file service.

    Passes the service name 'files' plus both directories to the base
    class initializer, makes sure ``<data_dir>/files`` and
    ``<config_dir>/webdav`` exist, and generates a default WebDAV
    configuration when none is present yet.
    """
    super().__init__('files', data_dir, config_dir)

    self.files_dir = os.path.join(data_dir, 'files')
    self.webdav_dir = os.path.join(config_dir, 'webdav')
    for required_dir in (self.files_dir, self.webdav_dir):
        os.makedirs(required_dir, exist_ok=True)

    # Base URL used by the connectivity probes.
    self.webdav_url = 'http://localhost:8080'

    self._ensure_config_exists()
def _ensure_config_exists(self):
    """Generate ``webdav.conf`` in the WebDAV config directory if absent."""
    expected = os.path.join(self.webdav_dir, 'webdav.conf')
    if os.path.exists(expected):
        return
    self._generate_webdav_config()
def _generate_webdav_config(self):
    """Write the default ``webdav.conf`` into the WebDAV config directory.

    Any existing file of that name is overwritten.
    """
    target = os.path.join(self.webdav_dir, 'webdav.conf')
    default_text = """# WebDAV configuration for Personal Internet Cell
[global]
# WebDAV server settings
port = 8080
host = 0.0.0.0
root = /var/lib/webdav
# Authentication
auth_type = basic
auth_file = /etc/webdav/users
# SSL/TLS settings
ssl = no
ssl_cert = /etc/ssl/certs/webdav.crt
ssl_key = /etc/ssl/private/webdav.key
# Logging
log_level = info
log_file = /var/log/webdav.log
# File permissions
umask = 022
"""
    with open(target, 'w') as handle:
        handle.write(default_text)
    logger.info("WebDAV configuration generated")
def create_user(self, username: str, password: str) -> bool:
    """Create a WebDAV user: home directory, default folders and auth entry.

    Args:
        username: Account name. It must be a single path component and
            must not contain ':' or line breaks, because it is used both
            as a directory name under ``files_dir`` and as the key of a
            ``name:hash`` line in the auth file.
        password: Plain-text password; stored as its SHA-256 hex digest.

    Returns:
        True when the user now exists (an already present auth entry for
        the same name is replaced rather than duplicated), False on
        invalid input or any failure (logged).
    """
    if not username or not password:
        logger.error("Username and password must not be empty")
        return False
    try:
        root = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(root, username))
        # The name must map to exactly <files_dir>/<username>; this rejects
        # '..', nested or absolute paths that would escape the storage root.
        name_is_plain = (os.path.dirname(user_dir) == root
                         and os.path.basename(user_dir) == username)
        if not name_is_plain or any(ch in username for ch in ':\r\n'):
            logger.error(f"Invalid WebDAV username {username!r}")
            return False

        # Create user directory with the default folders
        os.makedirs(user_dir, exist_ok=True)
        for folder in ['Documents', 'Pictures', 'Music', 'Videos', 'Downloads']:
            os.makedirs(os.path.join(user_dir, folder), exist_ok=True)

        auth_file = os.path.join(self.webdav_dir, 'users')
        password_hash = hashlib.sha256(password.encode()).hexdigest()

        # Keep every other user's line and drop a stale entry for this
        # name, so creating the same user twice does not list it twice.
        prefix = f"{username}:"
        kept = []
        if os.path.exists(auth_file):
            with open(auth_file, 'r') as f:
                kept = [line for line in f if not line.startswith(prefix)]
        if kept and not kept[-1].endswith('\n'):
            kept[-1] += '\n'
        with open(auth_file, 'w') as f:
            f.writelines(kept)
            f.write(f"{username}:{password_hash}\n")

        logger.info(f"Created WebDAV user {username}")
        return True
    except Exception as e:
        logger.error(f"Failed to create WebDAV user {username}: {e}")
        return False
def delete_user(self, username: str) -> bool:
    """Delete a WebDAV user: remove its auth entry and its home directory.

    Args:
        username: Account name; must be a single path component so the
            recursive delete can only ever hit ``<files_dir>/<username>``.

    Returns:
        True when the user is gone afterwards (also when it did not
        exist), False on invalid input or any failure (logged).
    """
    if not username:
        logger.error("Username must not be empty")
        return False
    try:
        root = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(root, username))
        # Without this check a username such as '..' would make rmtree
        # wipe the parent of the storage root.
        if os.path.dirname(user_dir) != root or os.path.basename(user_dir) != username:
            logger.error(f"Invalid WebDAV username {username!r}")
            return False

        # Remove from auth file
        auth_file = os.path.join(self.webdav_dir, 'users')
        if os.path.exists(auth_file):
            prefix = f"{username}:"
            with open(auth_file, 'r') as f:
                kept = [line for line in f if not line.startswith(prefix)]
            with open(auth_file, 'w') as f:
                f.writelines(kept)

        # Remove user directory
        if os.path.exists(user_dir):
            shutil.rmtree(user_dir)

        logger.info(f"Deleted WebDAV user {username}")
        return True
    except Exception as e:
        logger.error(f"Failed to delete WebDAV user {username}: {e}")
        return False
def list_users(self) -> List[Dict]:
    """Return one record per entry of the WebDAV auth file.

    Each record holds the ``username`` (text before the first ':') and
    the ``storage_info`` computed for that user. Errors are logged and
    whatever was collected so far is returned.
    """
    found = []
    try:
        auth_path = os.path.join(self.webdav_dir, 'users')
        if os.path.exists(auth_path):
            with open(auth_path, 'r') as handle:
                for raw in handle:
                    entry = raw.strip()
                    if not entry or ':' not in entry:
                        continue
                    name = entry.partition(':')[0]
                    found.append({
                        'username': name,
                        'storage_info': self._get_user_storage_info(name)
                    })
    except Exception as e:
        logger.error(f"Failed to list WebDAV users: {e}")
    return found
def get_users(self):
    """Return the parsed content of ``<config_dir>/webdav/users.json``.

    Yields an empty list when that file does not exist.
    """
    users_json = os.path.join(self.config_dir, 'webdav', 'users.json')
    if not os.path.exists(users_json):
        return []
    with open(users_json, 'r') as handle:
        return json.load(handle)
def _get_user_storage_info(self, username: str) -> Dict:
    """Summarize how much storage a user occupies.

    Args:
        username: Name of the user directory below ``files_dir``.

    Returns:
        A dict with ``total_files``, ``total_size_bytes``,
        ``total_size_mb`` (rounded to 2 decimals) and ``folders``. All
        four keys are always present; a missing user directory or a
        failure yields zeros and an empty folder list.
    """
    # Same shape as the success result so callers can rely on every key.
    empty = {'total_files': 0, 'total_size_bytes': 0, 'total_size_mb': 0, 'folders': []}
    try:
        user_dir = os.path.join(self.files_dir, username)
        if not os.path.exists(user_dir):
            return empty
        total_files = 0
        total_size = 0
        for root, _dirs, files in os.walk(user_dir):
            for file in files:
                try:
                    size = os.path.getsize(os.path.join(root, file))
                except OSError:
                    # Dangling symlink or file removed while walking: skip
                    # it instead of discarding the whole report.
                    continue
                total_files += 1
                total_size += size
        return {
            'total_files': total_files,
            'total_size_bytes': total_size,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
            'folders': self._list_user_folders(username)
        }
    except Exception as e:
        logger.error(f"Failed to get storage info for {username}: {e}")
        return empty
def _list_user_folders(self, username: str) -> List[Dict]:
    """Describe each top-level folder of a user.

    For every directory directly inside the user's storage, reports its
    ``name``, the recursive ``file_count``, ``size_bytes`` and
    ``size_mb`` (rounded to 2 decimals). Errors are logged and the
    folders gathered so far are returned.
    """
    summary = []
    try:
        home = os.path.join(self.files_dir, username)
        if os.path.exists(home):
            for entry in os.listdir(home):
                entry_path = os.path.join(home, entry)
                if not os.path.isdir(entry_path):
                    continue
                # One size per file found anywhere below this folder.
                sizes = [
                    os.path.getsize(os.path.join(parent, name))
                    for parent, _subdirs, names in os.walk(entry_path)
                    for name in names
                ]
                used = sum(sizes)
                summary.append({
                    'name': entry,
                    'file_count': len(sizes),
                    'size_bytes': used,
                    'size_mb': round(used / (1024 * 1024), 2)
                })
    except Exception as e:
        logger.error(f"Failed to list folders for {username}: {e}")
    return summary
def create_folder(self, username: str, folder_path: str) -> bool:
    """Create a folder (and any missing parents) inside a user's storage.

    Args:
        username: Owner of the storage area below ``files_dir``.
        folder_path: Path of the new folder, relative to the user's root.

    Returns:
        True if the folder exists after the call; False on empty input,
        on a path that would leave the user's storage area, or on any
        filesystem error (logged).
    """
    if not username or not folder_path:
        logger.error("Username and folder_path must not be empty")
        return False
    try:
        root = os.path.abspath(self.files_dir)
        user_root = os.path.abspath(os.path.join(root, username))
        full_path = os.path.abspath(os.path.join(user_root, folder_path))
        # os.path.join drops the prefix for absolute components and keeps
        # '..' segments, so confine the target lexically. Symlinks are
        # deliberately not resolved, so linked storage keeps working.
        if (user_root == root
                or os.path.commonpath([root, user_root]) != root
                or os.path.commonpath([user_root, full_path]) != user_root):
            logger.error(f"Rejected folder path {folder_path!r} for {username!r}: outside user storage")
            return False
        os.makedirs(full_path, exist_ok=True)
        logger.info(f"Created folder {folder_path} for {username}")
        return True
    except Exception as e:
        logger.error(f"Failed to create folder {folder_path} for {username}: {e}")
        return False
def delete_folder(self, username: str, folder_path: str) -> bool:
    """Recursively delete a folder inside a user's storage.

    Args:
        username: Owner of the storage area below ``files_dir``.
        folder_path: Folder to remove, relative to the user's root.

    Returns:
        True if the folder was removed; False on empty input, when the
        folder does not exist, when the path would leave the user's
        storage area, or on any filesystem error (logged).
    """
    if not username or not folder_path:
        logger.error("Username and folder_path must not be empty")
        return False
    try:
        root = os.path.abspath(self.files_dir)
        user_root = os.path.abspath(os.path.join(root, username))
        full_path = os.path.abspath(os.path.join(user_root, folder_path))
        # Confine the recursive delete: '..' or an absolute folder_path
        # would otherwise let rmtree remove arbitrary directories.
        if (user_root == root
                or os.path.commonpath([root, user_root]) != root
                or os.path.commonpath([user_root, full_path]) != user_root):
            logger.error(f"Rejected folder path {folder_path!r} for {username!r}: outside user storage")
            return False
        if not os.path.exists(full_path):
            logger.warning(f"Folder {folder_path} not found for {username}")
            return False
        shutil.rmtree(full_path)
        logger.info(f"Deleted folder {folder_path} for {username}")
        return True
    except Exception as e:
        logger.error(f"Failed to delete folder {folder_path} for {username}: {e}")
        return False
def upload_file(self, username: str, file_path: str, file_data: bytes) -> bool:
    """Store a file in a user's storage, creating parent folders as needed.

    Args:
        username: Owner of the storage area below ``files_dir``.
        file_path: Destination path, relative to the user's root.
        file_data: Raw content to write; an existing file is overwritten.

    Returns:
        True when the file was written; False on empty username or
        path, on a path that would leave the user's storage area, or on
        any filesystem error (logged).
    """
    if not username or not file_path:
        logger.error("Username and file_path must not be empty")
        return False
    try:
        root = os.path.abspath(self.files_dir)
        user_root = os.path.abspath(os.path.join(root, username))
        full_path = os.path.abspath(os.path.join(user_root, file_path))
        # Confine the write: '..' or an absolute file_path would otherwise
        # allow overwriting files outside the user's storage.
        if (user_root == root
                or os.path.commonpath([root, user_root]) != root
                or os.path.commonpath([user_root, full_path]) != user_root):
            logger.error(f"Rejected file path {file_path!r} for {username!r}: outside user storage")
            return False
        # Ensure directory exists
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        with open(full_path, 'wb') as f:
            f.write(file_data)
        logger.info(f"Uploaded file {file_path} for {username}")
        return True
    except Exception as e:
        logger.error(f"Failed to upload file {file_path} for {username}: {e}")
        return False
def download_file(self, username: str, file_path: str) -> Optional[bytes]:
    """Read a file from a user's storage.

    Args:
        username: Owner of the storage area below ``files_dir``.
        file_path: File to read, relative to the user's root.

    Returns:
        The file content as bytes, or None when the input is empty, the
        file does not exist, the path would leave the user's storage
        area, or reading fails (logged).
    """
    if not username or not file_path:
        logger.error("Username and file_path must not be empty")
        return None
    try:
        root = os.path.abspath(self.files_dir)
        user_root = os.path.abspath(os.path.join(root, username))
        full_path = os.path.abspath(os.path.join(user_root, file_path))
        # Confine the read: '..' or an absolute file_path would otherwise
        # expose any file the process can read.
        if (user_root == root
                or os.path.commonpath([root, user_root]) != root
                or os.path.commonpath([user_root, full_path]) != user_root):
            logger.error(f"Rejected file path {file_path!r} for {username!r}: outside user storage")
            return None
        if not os.path.exists(full_path):
            logger.warning(f"File {file_path} not found for {username}")
            return None
        with open(full_path, 'rb') as f:
            return f.read()
    except Exception as e:
        logger.error(f"Failed to download file {file_path} for {username}: {e}")
        return None
def delete_file(self, username: str, file_path: str) -> bool:
    """Delete a single file from a user's storage.

    Args:
        username: Owner of the storage area below ``files_dir``.
        file_path: File to remove, relative to the user's root.

    Returns:
        True if the file was removed; False on empty input, when the
        file does not exist, when the path would leave the user's
        storage area, or on any filesystem error (logged).
    """
    if not username or not file_path:
        logger.error("Username and file_path must not be empty")
        return False
    try:
        root = os.path.abspath(self.files_dir)
        user_root = os.path.abspath(os.path.join(root, username))
        full_path = os.path.abspath(os.path.join(user_root, file_path))
        # Confine the delete: '..' or an absolute file_path would otherwise
        # allow removing files outside the user's storage.
        if (user_root == root
                or os.path.commonpath([root, user_root]) != root
                or os.path.commonpath([user_root, full_path]) != user_root):
            logger.error(f"Rejected file path {file_path!r} for {username!r}: outside user storage")
            return False
        if not os.path.exists(full_path):
            logger.warning(f"File {file_path} not found for {username}")
            return False
        os.remove(full_path)
        logger.info(f"Deleted file {file_path} for {username}")
        return True
    except Exception as e:
        logger.error(f"Failed to delete file {file_path} for {username}: {e}")
        return False
def list_files(self, username: str, folder_path: str = '') -> List[Dict]:
    """List the direct entries of a folder in a user's storage.

    Args:
        username: Owner of the storage area below ``files_dir``.
        folder_path: Folder to list, relative to the user's root; the
            default '' lists the root itself.

    Returns:
        One dict per entry with ``name``, ``type`` ('directory' or
        'file'), ``size_bytes``, ``size_mb``, ``modified`` (ISO
        timestamp of the mtime) and ``path`` (relative to the user's
        root). Empty when the folder is missing, the path would leave
        the storage area, or listing fails (logged).
    """
    files = []
    try:
        root = os.path.abspath(self.files_dir)
        user_root = os.path.abspath(os.path.join(root, username))
        full_path = os.path.abspath(os.path.join(user_root, folder_path))
        # Confine the listing: '..' or an absolute folder_path would
        # otherwise reveal directory contents outside the storage area.
        if (os.path.commonpath([root, user_root]) != root
                or os.path.commonpath([user_root, full_path]) != user_root):
            logger.error(f"Rejected folder path {folder_path!r} for {username!r}: outside user storage")
            return files
        if os.path.exists(full_path):
            for item in os.listdir(full_path):
                item_path = os.path.join(full_path, item)
                try:
                    stat = os.stat(item_path)
                except OSError as e:
                    # e.g. dangling symlink: skip this entry rather than
                    # dropping everything that follows it.
                    logger.warning(f"Skipping unreadable entry {item} for {username}: {e}")
                    continue
                files.append({
                    'name': item,
                    'type': 'directory' if os.path.isdir(item_path) else 'file',
                    'size_bytes': stat.st_size,
                    'size_mb': round(stat.st_size / (1024 * 1024), 2),
                    'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                    'path': os.path.join(folder_path, item) if folder_path else item
                })
    except Exception as e:
        logger.error(f"Failed to list files in {folder_path} for {username}: {e}")
    return files
def get_webdav_status(self) -> Dict:
    """Report whether the WebDAV container runs plus aggregate storage use.

    Returns:
        A dict with ``webdav_running`` (True when ``docker ps`` lists a
        container matching the name filter ``cell-webdav``),
        ``total_users``, ``total_files``, ``total_size_bytes``,
        ``total_size_mb`` and the ``users`` list. If Docker cannot be
        queried, ``webdav_running`` is False but the storage figures
        are still reported; any other failure yields all-zero values.
    """
    try:
        # Check if service is running. The timeout keeps a hung Docker
        # daemon from blocking the status call indefinitely.
        try:
            result = subprocess.run(
                ['docker', 'ps', '--filter', 'name=cell-webdav', '--format', '{{.Names}}'],
                capture_output=True, text=True, timeout=10)
            webdav_running = len(result.stdout.strip()) > 0
        except (OSError, subprocess.SubprocessError) as e:
            # Docker missing or unresponsive: treat the service as down
            # without throwing away the user statistics below.
            logger.warning(f"Could not query Docker for WebDAV container: {e}")
            webdav_running = False

        users = self.list_users()
        total_files = sum(user['storage_info']['total_files'] for user in users)
        total_size = sum(user['storage_info']['total_size_bytes'] for user in users)
        return {
            'webdav_running': webdav_running,
            'total_users': len(users),
            'total_files': total_files,
            'total_size_bytes': total_size,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
            'users': users
        }
    except Exception as e:
        logger.error(f"Failed to get WebDAV status: {e}")
        return {
            'webdav_running': False,
            'total_users': 0,
            'total_files': 0,
            'total_size_bytes': 0,
            'total_size_mb': 0,
            'users': []
        }
def test_webdav_connectivity(self) -> Dict:
    """Probe the WebDAV endpoint with an HTTP GET and an OPTIONS request.

    Returns a dict with the keys ``http`` and ``webdav``. Each probe
    counts as successful when the server answers with status 200, 401
    or 403; a failed request is reported with the exception text.
    """
    probes = (
        ('http', 'get', 'WebDAV HTTP server responding'),
        ('webdav', 'options', 'WebDAV protocol responding'),
    )
    try:
        outcome = {}
        for key, verb, ok_message in probes:
            try:
                reply = getattr(requests, verb)(f'{self.webdav_url}', timeout=5)
                outcome[key] = {
                    'success': reply.status_code in [200, 401, 403],
                    'status_code': reply.status_code,
                    'message': ok_message
                }
            except Exception as e:
                outcome[key] = {
                    'success': False,
                    'message': str(e)
                }
        return outcome
    except Exception as e:
        return {
            'http': {'success': False, 'message': str(e)},
            'webdav': {'success': False, 'message': str(e)}
        }
def get_webdav_logs(self, lines: int = 50) -> str:
    """Return the standard output of ``docker logs`` for the WebDAV container.

    Args:
        lines: Number of trailing log lines requested via ``--tail``.

    Returns:
        The captured stdout, or an error description when the command
        cannot be run or exceeds the 10 second timeout.
    """
    try:
        command = ['docker', 'logs', '--tail', str(lines), 'cell-webdav']
        completed = subprocess.run(command, capture_output=True, text=True, timeout=10)
    except Exception as e:
        logger.error(f"Failed to get WebDAV logs: {e}")
        return f"Error getting WebDAV logs: {e}"
    return completed.stdout
def backup_user_files(self, username: str, backup_path: str) -> bool:
    """Archive a user's whole storage directory as ``<backup_path>.zip``.

    Returns True when the archive was written, False when an argument
    is empty, the user has no directory, or archiving fails (logged).
    """
    if not (username and backup_path):
        logger.error("Username and backup_path must not be empty")
        return False
    try:
        source_dir = os.path.join(self.files_dir, username)
        if not os.path.exists(source_dir):
            logger.warning(f"No files found for {username}")
            return False
        shutil.make_archive(backup_path, 'zip', source_dir)
        logger.info(f"Backed up files for {username} to {backup_path}.zip")
        return True
    except Exception as e:
        logger.error(f"Failed to backup files for {username}: {e}")
        return False
def restore_user_files(self, username: str, backup_path: str) -> bool:
    """Replace a user's storage with the content of ``<backup_path>.zip``.

    The archive is checked before anything is deleted, so a missing or
    corrupt backup leaves the user's current files untouched.

    Args:
        username: Account name; must be a single path component so the
            recursive delete can only hit ``<files_dir>/<username>``.
        backup_path: Archive path without the ``.zip`` suffix.

    Returns:
        True when the files were restored; False on empty or invalid
        input, an unusable archive, or any filesystem error (logged).
    """
    import zipfile  # local import: only this method needs it

    if not username or not backup_path:
        logger.error("Username and backup_path must not be empty")
        return False
    try:
        archive = f"{backup_path}.zip"
        if not os.path.isfile(archive) or not zipfile.is_zipfile(archive):
            logger.error(f"Backup archive {archive} is missing or not a valid zip file")
            return False

        root = os.path.abspath(self.files_dir)
        user_dir = os.path.abspath(os.path.join(root, username))
        # A username such as '..' must never reach rmtree below.
        if os.path.dirname(user_dir) != root or os.path.basename(user_dir) != username:
            logger.error(f"Invalid WebDAV username {username!r}")
            return False

        # Remove existing user directory, then extract the backup
        if os.path.exists(user_dir):
            shutil.rmtree(user_dir)
        shutil.unpack_archive(archive, user_dir, 'zip')
        logger.info(f"Restored files for {username} from {backup_path}.zip")
        return True
    except Exception as e:
        logger.error(f"Failed to restore files for {username}: {e}")
        return False
def get_status(self) -> Dict[str, Any]:
    """Get file service status.

    Inside a Docker environment (``/.dockerenv`` exists or
    ``DOCKER_CONTAINER`` is 'true') only the container state is checked
    and user/storage figures are reported as zero. Otherwise the WebDAV
    status, the user count and the total storage use are collected.

    Returns:
        A dict with ``running``, ``status`` ('online'/'offline'),
        ``webdav_status``, ``users_count``, ``total_storage_used`` and
        a UTC ``timestamp``; on failure, whatever ``handle_error``
        returns.
    """
    try:
        is_docker = os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER') == 'true'
        if is_docker:
            # Check if file container is actually running
            container_running = self._check_file_container_status()
            status = {
                'running': container_running,
                'status': 'online' if container_running else 'offline',
                'webdav_status': {'running': container_running, 'port': 8080},
                'users_count': 0,
                'total_storage_used': {'bytes': 0, 'human_readable': '0 B'},
                'timestamp': datetime.utcnow().isoformat()
            }
        else:
            webdav_status = self.get_webdav_status()
            # get_webdav_status reports the flag as 'webdav_running';
            # 'running' is still honoured first for compatibility.
            running = bool(webdav_status.get(
                'running', webdav_status.get('webdav_running', False)))
            users = self.list_users()
            status = {
                'running': running,
                'status': 'online' if running else 'offline',
                'webdav_status': webdav_status,
                'users_count': len(users),
                'total_storage_used': self._get_total_storage_used(),
                'timestamp': datetime.utcnow().isoformat()
            }
        return status
    except Exception as e:
        return self.handle_error(e, "get_status")
def _check_file_container_status(self) -> bool:
    """Tell whether the Docker SDK lists a running ``cell-webdav`` container.

    Any problem (SDK not importable, daemon unreachable, ...) is
    reported as False.
    """
    try:
        import docker
        matching = docker.from_env().containers.list(filters={'name': 'cell-webdav'})
    except Exception:
        return False
    return len(matching) > 0
def test_connectivity(self) -> Dict[str, Any]:
    """Test file service connectivity.

    Runs the WebDAV probes, the filesystem read/write check and the
    auth-file check.

    Returns:
        A dict with the three individual results
        (``webdav_connectivity``, ``filesystem_access``,
        ``user_authentication``), an overall ``success`` flag that is
        True only when the WebDAV probes and the filesystem check
        passed, and a UTC ``timestamp``; on failure, whatever
        ``handle_error`` returns.
    """
    try:
        webdav_test = self.test_webdav_connectivity()
        fs_test = self._test_filesystem_access()
        auth_test = self._test_user_authentication()

        # The WebDAV result is keyed per probe ('http', 'webdav') and has
        # no top-level 'success', so derive it from the probes; a
        # top-level flag is still honoured if one is ever provided.
        if 'success' in webdav_test:
            webdav_ok = bool(webdav_test['success'])
        else:
            probes = [p for p in webdav_test.values() if isinstance(p, dict)]
            webdav_ok = bool(probes) and all(p.get('success', False) for p in probes)

        results = {
            'webdav_connectivity': webdav_test,
            'filesystem_access': fs_test,
            'user_authentication': auth_test,
            'success': webdav_ok and bool(fs_test.get('success', False)),
            'timestamp': datetime.utcnow().isoformat()
        }
        return results
    except Exception as e:
        return self.handle_error(e, "test_connectivity")
def _test_filesystem_access(self) -> Dict[str, Any]:
    """Verify the files directory is usable with a write/read/delete cycle.

    A probe file ``.test_access`` is written, read back and removed.
    ``read_write`` tells whether the content read matches what was
    written; any exception produces a ``success: False`` result that
    carries the error text.
    """
    try:
        probe = os.path.join(self.files_dir, '.test_access')
        with open(probe, 'w') as sink:
            sink.write('test')
        with open(probe, 'r') as source:
            echoed = source.read()
        os.remove(probe)
    except Exception as e:
        return {
            'success': False,
            'message': f'Filesystem access failed: {str(e)}',
            'error': str(e)
        }
    return {
        'success': True,
        'message': 'Filesystem access working',
        'read_write': echoed == 'test'
    }
def _test_user_authentication(self) -> Dict[str, Any]:
    """Check that the WebDAV auth file can be read and count its entries.

    A missing auth file is not an error (no users configured yet).
    Only non-blank lines containing ':' are counted.
    """
    try:
        auth_path = os.path.join(self.webdav_dir, 'users')
        if os.path.exists(auth_path):
            with open(auth_path, 'r') as handle:
                entry_count = sum(1 for raw in handle if raw.strip() and ':' in raw)
            return {
                'success': True,
                'message': 'User authentication system working',
                'users_count': entry_count
            }
        return {
            'success': True,
            'message': 'No users configured yet',
            'users_count': 0
        }
    except Exception as e:
        return {
            'success': False,
            'message': f'User authentication test failed: {str(e)}',
            'error': str(e)
        }
def _get_total_storage_used(self) -> Dict[str, Any]:
    """Add up the number and size of all files below the files directory.

    Returns ``total_files``, ``total_size_bytes``, ``total_size_mb``
    and ``total_size_gb`` (both rounded to 2 decimals); all zero when
    the scan fails, in which case the error is logged.
    """
    try:
        sizes = []
        if os.path.exists(self.files_dir):
            for parent, _subdirs, names in os.walk(self.files_dir):
                for name in names:
                    sizes.append(os.path.getsize(os.path.join(parent, name)))
        used = sum(sizes)
        return {
            'total_files': len(sizes),
            'total_size_bytes': used,
            'total_size_mb': round(used / (1024 * 1024), 2),
            'total_size_gb': round(used / (1024 * 1024 * 1024), 2)
        }
    except Exception as e:
        self.logger.error(f"Error calculating total storage: {e}")
        return {
            'total_files': 0,
            'total_size_bytes': 0,
            'total_size_mb': 0,
            'total_size_gb': 0
        }