Files
pic/api/file_manager.py
T
roof a43f9fbf0d fix: full security audit remediation — P0/P1/P2/P3 fixes + 1020 passing tests
P0 — Broken functionality:
- Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing)
- Fix email_manager.delete_email_user() missing domain arg
- Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now
  accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file)
- Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR)
- Fix _next_peer_ip exhaustion returning 500 instead of 409
- Fix ConfigManager Caddyfile path (/app/config-caddy/)
- Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx
- Remove hardcoded credentials from Dashboard.jsx

P1 — Security:
- CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern)
- enforce_auth: 503 only when users file readable but empty; never bypass on IOError
- WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes
- DNS add_cell_dns_forward: validate IP and domain; reject injection chars
- DNS zone write: realpath containment + record content validation
- iptables comment /32 suffix prevents substring match deleting wrong peer rules
- is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge)
- POST /api/containers: volume allow-list prevents arbitrary host mounts
- file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user
- email/calendar: stop persisting plaintext passwords in user records
- routing_manager: validate IPs, networks, and interface names
- peer_registry: write peers.json at mode 0o600
- vault_manager: Fernet key file at mode 0o600
- CORS: lock down to explicit origin list
- domain/cell_name validation: reject newline, brace, semicolon injection chars

P2 — Architecture:
- Peer add: rollback registry entry if firewall rules fail post-add
- restart_service(): base class now calls _restart_container(); email and calendar
  managers call cell-mail / cell-radicale respectively
- email/calendar managers sync user list (no passwords) to cell_config.json
- Pending-restart flag cleared only after helper subprocess exits with code 0
- docker-compose.yml: add config-caddy volume to API container

P3 — Tests (854 → 1020):
- Fill test_email_endpoints.py, test_calendar_endpoints.py,
  test_network_endpoints.py, test_routing_endpoints.py
- New: test_peer_management_update.py, test_peer_management_edge_cases.py,
  test_input_validation.py, test_enforce_auth_configured.py,
  test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py,
  test_is_local_request_per_endpoint.py, test_caddy_routing.py
- E2E conftest: skip WireGuard suite when wg-quick absent
- Update existing tests to match fixed signatures and comment formats

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:30:21 -04:00

691 lines
27 KiB
Python

#!/usr/bin/env python3
"""
File Manager for Personal Internet Cell
Handles WebDAV file storage services
"""
import os
import re
import json
import subprocess
import logging
import requests
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
import shutil
import hashlib
import bcrypt
from base_service_manager import BaseServiceManager
logger = logging.getLogger(__name__)
class FileManager(BaseServiceManager):
"""Manages file storage services (WebDAV)"""
def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
super().__init__('files', data_dir, config_dir)
self.files_dir = os.path.join(data_dir, 'files')
self.webdav_dir = os.path.join(config_dir, 'webdav')
self.safe_makedirs(self.files_dir)
self.safe_makedirs(self.webdav_dir)
# WebDAV service URL
self.webdav_url = 'http://cell-webdav:80'
# Initialize WebDAV configuration
self._ensure_config_exists()
def _ensure_config_exists(self):
"""Ensure WebDAV configuration exists"""
try:
config_file = os.path.join(self.webdav_dir, 'webdav.conf')
if not os.path.exists(config_file):
self._generate_webdav_config()
except (PermissionError, OSError):
pass
def _safe_path(self, username: str, *parts: str) -> str:
"""Resolve a safe path under files_dir/username.
Whitelists username, joins extra parts, resolves to a real path, and
asserts the result is contained within the user's directory. Raises
ValueError on any sign of path traversal or invalid input.
"""
if not isinstance(username, str) or not re.match(r'^[A-Za-z0-9_.-]{1,64}$', username):
raise ValueError(f"Invalid username: {username!r}")
safe_parts = []
for p in parts:
if p is None:
continue
if not isinstance(p, str):
raise ValueError(f"Invalid path component: {p!r}")
safe_parts.append(p)
user_root = os.path.realpath(os.path.join(self.files_dir, username))
candidate = os.path.realpath(os.path.join(self.files_dir, username, *safe_parts))
if candidate != user_root and not candidate.startswith(user_root + os.sep):
raise ValueError(f"Path traversal detected for user {username!r}: {parts!r}")
return candidate
def _generate_webdav_config(self):
"""Generate WebDAV configuration"""
config = """# WebDAV configuration for Personal Internet Cell
[global]
# WebDAV server settings
port = 8080
host = 0.0.0.0
root = /var/lib/webdav
# Authentication
auth_type = basic
auth_file = /etc/webdav/users
# SSL/TLS settings
ssl = no
ssl_cert = /etc/ssl/certs/webdav.crt
ssl_key = /etc/ssl/private/webdav.key
# Logging
log_level = info
log_file = /var/log/webdav.log
# File permissions
umask = 022
"""
config_file = os.path.join(self.webdav_dir, 'webdav.conf')
with open(config_file, 'w') as f:
f.write(config)
logger.info("WebDAV configuration generated")
def create_user(self, username: str, password: str) -> bool:
"""Create a new WebDAV user"""
if not username or not password:
logger.error("Username and password must not be empty")
return False
# Validate username — prevents path traversal in user_dir join below and
# injection of newlines / colons into the htpasswd-format auth file.
if not isinstance(username, str) or not re.match(r'^[A-Za-z0-9._-]{1,64}$', username):
logger.error(f"create_user: invalid username {username!r}")
return False
try:
# Create user directory (containment check)
user_dir = os.path.realpath(os.path.join(self.files_dir, username))
files_root = os.path.realpath(self.files_dir)
if not user_dir.startswith(files_root + os.sep):
logger.error(f"create_user: path traversal for username {username!r}")
return False
os.makedirs(user_dir, exist_ok=True)
# Create default folders
for folder in ['Documents', 'Pictures', 'Music', 'Videos', 'Downloads']:
os.makedirs(os.path.join(user_dir, folder), exist_ok=True)
# Add user to auth file
auth_file = os.path.join(self.webdav_dir, 'users')
# Generate bcrypt hash; convert $2b$ -> $2y$ for Apache htpasswd compatibility
# (bytemark/webdav is Apache-based; htpasswd-bcrypt uses $2y$ prefix).
bcrypt_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
if bcrypt_hash.startswith('$2b$'):
bcrypt_hash = '$2y$' + bcrypt_hash[4:]
password_hash = bcrypt_hash
with open(auth_file, 'a') as f:
f.write(f"{username}:{password_hash}\n")
logger.info(f"Created WebDAV user {username}")
return True
except Exception as e:
logger.error(f"Failed to create WebDAV user {username}: {e}")
return False
def delete_user(self, username: str) -> bool:
"""Delete a WebDAV user"""
if not username:
logger.error("Username must not be empty")
return False
# Validate username before any auth-file rewrite or filesystem ops
if not isinstance(username, str) or not re.match(r'^[A-Za-z0-9._-]{1,64}$', username):
logger.error(f"delete_user: invalid username {username!r}")
return False
try:
# Remove from auth file
auth_file = os.path.join(self.webdav_dir, 'users')
if os.path.exists(auth_file):
with open(auth_file, 'r') as f:
lines = f.readlines()
with open(auth_file, 'w') as f:
for line in lines:
if not line.startswith(f"{username}:"):
f.write(line)
# Remove user directory — containment check prevents
# username='..' or 'foo/../../etc' from escaping files_dir.
user_dir = os.path.realpath(os.path.join(self.files_dir, username))
files_root = os.path.realpath(self.files_dir)
if not user_dir.startswith(files_root + os.sep):
logger.error(f"delete_user: path traversal for username {username!r}")
return False
if os.path.exists(user_dir):
shutil.rmtree(user_dir)
logger.info(f"Deleted WebDAV user {username}")
return True
except Exception as e:
logger.error(f"Failed to delete WebDAV user {username}: {e}")
return False
def list_users(self) -> List[Dict]:
"""List all WebDAV users"""
users = []
try:
auth_file = os.path.join(self.webdav_dir, 'users')
if os.path.exists(auth_file):
with open(auth_file, 'r') as f:
for line in f:
line = line.strip()
if line and ':' in line:
username = line.split(':')[0]
users.append({
'username': username,
'storage_info': self._get_user_storage_info(username)
})
except Exception as e:
logger.error(f"Failed to list WebDAV users: {e}")
return users
def get_users(self):
"""Return a list of file storage users (WebDAV users)."""
users_file = os.path.join(self.config_dir, 'webdav', 'users.json')
if os.path.exists(users_file):
with open(users_file, 'r') as f:
return json.load(f)
return []
def _get_user_storage_info(self, username: str) -> Dict:
"""Get storage information for a user"""
try:
user_dir = os.path.join(self.files_dir, username)
if not os.path.exists(user_dir):
return {'total_files': 0, 'total_size_bytes': 0, 'total_size_mb': 0}
total_files = 0
total_size = 0
for root, dirs, files in os.walk(user_dir):
for file in files:
file_path = os.path.join(root, file)
total_files += 1
total_size += os.path.getsize(file_path)
return {
'total_files': total_files,
'total_size_bytes': total_size,
'total_size_mb': round(total_size / (1024 * 1024), 2),
'folders': self._list_user_folders(username)
}
except Exception as e:
logger.error(f"Failed to get storage info for {username}: {e}")
return {'total_files': 0, 'total_size_bytes': 0, 'total_size_mb': 0}
def _list_user_folders(self, username: str) -> List[Dict]:
"""List folders for a user"""
folders = []
try:
user_dir = os.path.join(self.files_dir, username)
if os.path.exists(user_dir):
for item in os.listdir(user_dir):
item_path = os.path.join(user_dir, item)
if os.path.isdir(item_path):
folder_size = 0
file_count = 0
for root, dirs, files in os.walk(item_path):
for file in files:
file_path = os.path.join(root, file)
folder_size += os.path.getsize(file_path)
file_count += 1
folders.append({
'name': item,
'file_count': file_count,
'size_bytes': folder_size,
'size_mb': round(folder_size / (1024 * 1024), 2)
})
except Exception as e:
logger.error(f"Failed to list folders for {username}: {e}")
return folders
def create_folder(self, username: str, folder_path: str) -> bool:
"""Create a new folder for a user"""
if not username or not folder_path:
logger.error("Username and folder_path must not be empty")
return False
try:
full_path = self._safe_path(username, folder_path)
os.makedirs(full_path, exist_ok=True)
logger.info(f"Created folder {folder_path} for {username}")
return True
except Exception as e:
logger.error(f"Failed to create folder {folder_path} for {username}: {e}")
return False
def delete_folder(self, username: str, folder_path: str) -> bool:
"""Delete a folder for a user"""
if not username or not folder_path:
logger.error("Username and folder_path must not be empty")
return False
try:
full_path = self._safe_path(username, folder_path)
if os.path.exists(full_path):
shutil.rmtree(full_path)
logger.info(f"Deleted folder {folder_path} for {username}")
return True
else:
logger.warning(f"Folder {folder_path} not found for {username}")
return False
except Exception as e:
logger.error(f"Failed to delete folder {folder_path} for {username}: {e}")
return False
def upload_file(self, username: str, file_path: str, file_data: bytes) -> bool:
"""Upload a file for a user"""
try:
full_path = self._safe_path(username, file_path)
# Ensure directory exists
os.makedirs(os.path.dirname(full_path), exist_ok=True)
# Write file
with open(full_path, 'wb') as f:
f.write(file_data)
logger.info(f"Uploaded file {file_path} for {username}")
return True
except Exception as e:
logger.error(f"Failed to upload file {file_path} for {username}: {e}")
return False
def download_file(self, username: str, file_path: str) -> Optional[bytes]:
"""Download a file for a user"""
try:
full_path = self._safe_path(username, file_path)
if os.path.exists(full_path):
with open(full_path, 'rb') as f:
return f.read()
else:
logger.warning(f"File {file_path} not found for {username}")
return None
except Exception as e:
logger.error(f"Failed to download file {file_path} for {username}: {e}")
return None
def delete_file(self, username: str, file_path: str) -> bool:
"""Delete a file for a user"""
try:
full_path = self._safe_path(username, file_path)
if os.path.exists(full_path):
os.remove(full_path)
logger.info(f"Deleted file {file_path} for {username}")
return True
else:
logger.warning(f"File {file_path} not found for {username}")
return False
except Exception as e:
logger.error(f"Failed to delete file {file_path} for {username}: {e}")
return False
def list_files(self, username: str, folder_path: str = '') -> List[Dict]:
"""List files in a folder for a user"""
files = []
try:
full_path = self._safe_path(username, folder_path)
if os.path.exists(full_path):
for item in os.listdir(full_path):
item_path = os.path.join(full_path, item)
stat = os.stat(item_path)
files.append({
'name': item,
'type': 'directory' if os.path.isdir(item_path) else 'file',
'size_bytes': stat.st_size,
'size_mb': round(stat.st_size / (1024 * 1024), 2),
'modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
'path': os.path.join(folder_path, item) if folder_path else item
})
except Exception as e:
logger.error(f"Failed to list files in {folder_path} for {username}: {e}")
return files
def get_webdav_status(self) -> Dict:
"""Get WebDAV service status"""
try:
# Check if service is running
result = subprocess.run(['docker', 'ps', '--filter', 'name=cell-webdav', '--format', '{{.Names}}'],
capture_output=True, text=True)
webdav_running = len(result.stdout.strip()) > 0
# Get user statistics
users = self.list_users()
total_users = len(users)
# Calculate total storage
total_files = 0
total_size = 0
for user in users:
storage_info = user['storage_info']
total_files += storage_info['total_files']
total_size += storage_info['total_size_bytes']
return {
'webdav_running': webdav_running,
'total_users': total_users,
'total_files': total_files,
'total_size_bytes': total_size,
'total_size_mb': round(total_size / (1024 * 1024), 2),
'users': users
}
except Exception as e:
logger.error(f"Failed to get WebDAV status: {e}")
return {
'webdav_running': False,
'total_users': 0,
'total_files': 0,
'total_size_bytes': 0,
'total_size_mb': 0,
'users': []
}
def test_webdav_connectivity(self) -> Dict:
"""Test WebDAV service connectivity"""
try:
results = {}
# Test HTTP connectivity
try:
response = requests.get(f'{self.webdav_url}', timeout=5)
results['http'] = {
'success': response.status_code in [200, 401, 403],
'status_code': response.status_code,
'message': 'WebDAV HTTP server responding'
}
except Exception as e:
results['http'] = {
'success': False,
'message': str(e)
}
# Test WebDAV OPTIONS
try:
response = requests.options(f'{self.webdav_url}', timeout=5)
results['webdav'] = {
'success': response.status_code in [200, 401, 403],
'status_code': response.status_code,
'message': 'WebDAV protocol responding'
}
except Exception as e:
results['webdav'] = {
'success': False,
'message': str(e)
}
results['success'] = results.get('http', {}).get('success', False)
return results
except Exception as e:
return {
'success': False,
'http': {'success': False, 'message': str(e)},
'webdav': {'success': False, 'message': str(e)}
}
def get_webdav_logs(self, lines: int = 50) -> str:
"""Get WebDAV service logs"""
try:
result = subprocess.run(['docker', 'logs', '--tail', str(lines), 'cell-webdav'],
capture_output=True, text=True, timeout=10)
return result.stdout
except Exception as e:
logger.error(f"Failed to get WebDAV logs: {e}")
return f"Error getting WebDAV logs: {e}"
def backup_user_files(self, username: str, backup_path: str) -> bool:
"""Backup all files for a user"""
if not username or not backup_path:
logger.error("Username and backup_path must not be empty")
return False
if not isinstance(username, str) or not re.match(r'^[A-Za-z0-9._-]{1,64}$', username):
logger.error(f"backup_user_files: invalid username {username!r}")
return False
try:
user_dir = os.path.realpath(os.path.join(self.files_dir, username))
files_root = os.path.realpath(self.files_dir)
if not user_dir.startswith(files_root + os.sep):
logger.error(f"backup_user_files: path traversal for username {username!r}")
return False
if os.path.exists(user_dir):
shutil.make_archive(backup_path, 'zip', user_dir)
logger.info(f"Backed up files for {username} to {backup_path}.zip")
return True
else:
logger.warning(f"No files found for {username}")
return False
except Exception as e:
logger.error(f"Failed to backup files for {username}: {e}")
return False
def restore_user_files(self, username: str, backup_path: str) -> bool:
"""Restore files for a user from backup"""
if not username or not backup_path:
logger.error("Username and backup_path must not be empty")
return False
if not isinstance(username, str) or not re.match(r'^[A-Za-z0-9._-]{1,64}$', username):
logger.error(f"restore_user_files: invalid username {username!r}")
return False
try:
user_dir = os.path.realpath(os.path.join(self.files_dir, username))
files_root = os.path.realpath(self.files_dir)
if not user_dir.startswith(files_root + os.sep):
logger.error(f"restore_user_files: path traversal for username {username!r}")
return False
# Remove existing user directory
if os.path.exists(user_dir):
shutil.rmtree(user_dir)
# Extract backup
shutil.unpack_archive(f"{backup_path}.zip", user_dir, 'zip')
logger.info(f"Restored files for {username} from {backup_path}.zip")
return True
except Exception as e:
logger.error(f"Failed to restore files for {username}: {e}")
return False
def get_status(self) -> Dict[str, Any]:
"""Get file service status"""
try:
# Check if we're running in Docker environment
import os
is_docker = os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER') == 'true'
svc_cfg = self.get_config()
configured_port = svc_cfg.get('port', 80) if isinstance(svc_cfg, dict) and 'error' not in svc_cfg else 80
if is_docker:
# Check if file container is actually running
container_running = self._check_file_container_status()
status = {
'running': container_running,
'status': 'online' if container_running else 'offline',
'webdav_status': {'running': container_running, 'port': configured_port},
'users_count': 0,
'total_storage_used': {'bytes': 0, 'human_readable': '0 B'},
'timestamp': datetime.utcnow().isoformat()
}
else:
# Check actual service status in production
webdav_status = self.get_webdav_status()
users = self.list_users()
status = {
'running': webdav_status.get('running', False),
'status': 'online' if webdav_status.get('running', False) else 'offline',
'webdav_status': webdav_status,
'users_count': len(users),
'total_storage_used': self._get_total_storage_used(),
'timestamp': datetime.utcnow().isoformat()
}
return status
except Exception as e:
return self.handle_error(e, "get_status")
def _check_file_container_status(self) -> bool:
"""Check if file Docker container is running"""
try:
import docker
client = docker.from_env()
containers = client.containers.list(filters={'name': 'cell-webdav'})
return len(containers) > 0
except Exception:
return False
def test_connectivity(self) -> Dict[str, Any]:
"""Test file service connectivity"""
try:
webdav_test = self.test_webdav_connectivity()
# Test file system access
fs_test = self._test_filesystem_access()
# Test user authentication
auth_test = self._test_user_authentication()
results = {
'webdav_connectivity': webdav_test,
'filesystem_access': fs_test,
'user_authentication': auth_test,
'success': webdav_test.get('success', False) and fs_test.get('success', False),
'timestamp': datetime.utcnow().isoformat()
}
return results
except Exception as e:
return self.handle_error(e, "test_connectivity")
def _test_filesystem_access(self) -> Dict[str, Any]:
"""Test filesystem access"""
try:
# Test if we can read/write to the files directory
test_file = os.path.join(self.files_dir, '.test_access')
# Write test
with open(test_file, 'w') as f:
f.write('test')
# Read test
with open(test_file, 'r') as f:
content = f.read()
# Cleanup
os.remove(test_file)
return {
'success': True,
'message': 'Filesystem access working',
'read_write': content == 'test'
}
except Exception as e:
return {
'success': False,
'message': f'Filesystem access failed: {str(e)}',
'error': str(e)
}
def _test_user_authentication(self) -> Dict[str, Any]:
"""Test user authentication system"""
try:
auth_file = os.path.join(self.webdav_dir, 'users')
if not os.path.exists(auth_file):
return {
'success': True,
'message': 'No users configured yet',
'users_count': 0
}
with open(auth_file, 'r') as f:
users = [line.strip() for line in f if line.strip() and ':' in line]
return {
'success': True,
'message': 'User authentication system working',
'users_count': len(users)
}
except Exception as e:
return {
'success': False,
'message': f'User authentication test failed: {str(e)}',
'error': str(e)
}
def _get_total_storage_used(self) -> Dict[str, Any]:
"""Get total storage usage across all users"""
try:
total_files = 0
total_size = 0
if os.path.exists(self.files_dir):
for root, dirs, files in os.walk(self.files_dir):
for file in files:
file_path = os.path.join(root, file)
total_files += 1
total_size += os.path.getsize(file_path)
return {
'total_files': total_files,
'total_size_bytes': total_size,
'total_size_mb': round(total_size / (1024 * 1024), 2),
'total_size_gb': round(total_size / (1024 * 1024 * 1024), 2)
}
except Exception as e:
self.logger.error(f"Error calculating total storage: {e}")
return {
'total_files': 0,
'total_size_bytes': 0,
'total_size_mb': 0,
'total_size_gb': 0
}