Files
pic/api/calendar_manager.py
T
roof a43f9fbf0d fix: full security audit remediation — P0/P1/P2/P3 fixes + 1020 passing tests
P0 — Broken functionality:
- Fix 12+ endpoints with wrong manager method signatures (email/calendar/file/routing)
- Fix email_manager.delete_email_user() missing domain arg
- Fix cell-link DNS forwarding wiped on every peer change (generate_corefile now
  accepts cell_links param; add/remove_cell_dns_forward no longer clobber the file)
- Fix Flask SECRET_KEY regenerating on every restart (persisted to DATA_DIR)
- Fix _next_peer_ip exhaustion returning 500 instead of 409
- Fix ConfigManager Caddyfile path (/app/config-caddy/)
- Fix UI double-add and wrong-key peer bugs in Peers.jsx / WireGuard.jsx
- Remove hardcoded credentials from Dashboard.jsx

P1 — Security:
- CSRF token validation on all POST/PUT/DELETE/PATCH to /api/* (double-submit pattern)
- enforce_auth: 503 only when users file readable but empty; never bypass on IOError
- WireGuard add_cell_peer: validate pubkey, name, endpoint against strict regexes
- DNS add_cell_dns_forward: validate IP and domain; reject injection chars
- DNS zone write: realpath containment + record content validation
- iptables comment /32 suffix prevents substring match deleting wrong peer rules
- is_local_request() trusts only loopback + 172.16.0.0/12 (Docker bridge)
- POST /api/containers: volume allow-list prevents arbitrary host mounts
- file_manager: bcrypt ($2b→$2y) for WebDAV; realpath containment in delete_user
- email/calendar: stop persisting plaintext passwords in user records
- routing_manager: validate IPs, networks, and interface names
- peer_registry: write peers.json at mode 0o600
- vault_manager: Fernet key file at mode 0o600
- CORS: lock down to explicit origin list
- domain/cell_name validation: reject newline, brace, semicolon injection chars

P2 — Architecture:
- Peer add: rollback registry entry if firewall rules fail post-add
- restart_service(): base class now calls _restart_container(); email and calendar
  managers call cell-mail / cell-radicale respectively
- email/calendar managers sync user list (no passwords) to cell_config.json
- Pending-restart flag cleared only after helper subprocess exits with code 0
- docker-compose.yml: add config-caddy volume to API container

P3 — Tests (854 → 1020):
- Fill test_email_endpoints.py, test_calendar_endpoints.py,
  test_network_endpoints.py, test_routing_endpoints.py
- New: test_peer_management_update.py, test_peer_management_edge_cases.py,
  test_input_validation.py, test_enforce_auth_configured.py,
  test_cell_link_dns.py, test_logs_endpoints.py, test_cells_endpoints.py,
  test_is_local_request_per_endpoint.py, test_caddy_routing.py
- E2E conftest: skip WireGuard suite when wg-quick absent
- Update existing tests to match fixed signatures and comment formats

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 11:30:21 -04:00

586 lines
23 KiB
Python

#!/usr/bin/env python3
"""
Calendar Manager for Personal Internet Cell
Handles calendar service configuration and user management
"""
import os
import json
import subprocess
import logging
from datetime import datetime
from typing import Dict, List, Optional, Any
from base_service_manager import BaseServiceManager
logger = logging.getLogger(__name__)
class CalendarManager(BaseServiceManager):
"""Manages calendar service configuration and users"""
def __init__(self, data_dir: str = '/app/data', config_dir: str = '/app/config'):
    """Register the 'calendar' service and prepare its storage locations.

    Args:
        data_dir: Root data directory; calendar state is kept in
            ``<data_dir>/calendar``.
        config_dir: Root config directory; Radicale files are kept in
            ``<config_dir>/radicale``.
    """
    super().__init__('calendar', data_dir, config_dir)

    calendar_root = os.path.join(data_dir, 'calendar')
    self.calendar_data_dir = calendar_root
    self.calendar_dir = calendar_root  # alias used by tests
    self.radicale_dir = os.path.join(config_dir, 'radicale')

    # One JSON store per record type, all under the calendar data directory.
    self.users_file = os.path.join(calendar_root, 'users.json')
    self.calendars_file = os.path.join(calendar_root, 'calendars.json')
    self.events_file = os.path.join(calendar_root, 'events.json')

    for directory in (self.calendar_data_dir, self.radicale_dir):
        self.safe_makedirs(directory)
def _get_configured_port(self) -> int:
    """Return the port stored in the service config, defaulting to 5232.

    The default is used when ``get_config()`` does not return a dict, when
    the dict carries an ``'error'`` key, or when it has no ``'port'`` entry.
    """
    default_port = 5232
    settings = self.get_config()
    usable = isinstance(settings, dict) and 'error' not in settings
    return settings.get('port', default_port) if usable else default_port
def get_status(self) -> Dict[str, Any]:
    """Report whether the calendar service is running, with basic counters.

    When a Docker environment is detected (``/.dockerenv`` exists or
    ``DOCKER_CONTAINER=true``) the running flag comes from the container
    check and the user/calendar/event counters are reported as 0. Otherwise
    the flag comes from the local port check and the counters are the sizes
    of the JSON stores. Any exception is passed to ``handle_error``.
    """
    try:
        port = self._get_configured_port()
        in_docker = (
            os.path.exists('/.dockerenv')
            or os.environ.get('DOCKER_CONTAINER') == 'true'
        )
        if in_docker:
            # Container state only; the stores are not consulted in this mode.
            running = self._check_calendar_container_status()
            counts = (0, 0, 0)
        else:
            running = self._check_calendar_status()
            counts = (
                len(self._load_users()),
                len(self._load_calendars()),
                len(self._load_events()),
            )
        users_count, calendars_count, events_count = counts
        return {
            'running': running,
            'status': 'online' if running else 'offline',
            'port': port,
            'users_count': users_count,
            'calendars_count': calendars_count,
            'events_count': events_count,
            'timestamp': datetime.utcnow().isoformat()
        }
    except Exception as e:
        return self.handle_error(e, "get_status")
def test_connectivity(self) -> Dict[str, Any]:
    """Run the service, storage and web-interface probes and combine them.

    Returns a dict with each probe's result, an overall ``'success'`` that
    requires all three probes to succeed, and a UTC timestamp. Any exception
    is passed to ``handle_error``.
    """
    try:
        # Probes run in a fixed order: TCP reachability, data directory, HTTP.
        service = self._test_service_connectivity()
        database = self._test_database_connectivity()
        web = self._test_web_interface()
        overall = service['success'] and database['success'] and web['success']
        return {
            'service_connectivity': service,
            'database_connectivity': database,
            'web_interface': web,
            'success': overall,
            'timestamp': datetime.utcnow().isoformat()
        }
    except Exception as e:
        return self.handle_error(e, "test_connectivity")
def _check_calendar_status(self) -> bool:
    """Return True when ``netstat -tuln`` lists a listener on port 5232 (Radicale).

    Any failure to run or read ``netstat`` is reported as False.
    """
    try:
        probe = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True)
        listening = ':5232 ' in probe.stdout
    except Exception:
        listening = False
    return listening
def _check_calendar_container_status(self) -> bool:
    """Check if the calendar Docker container is running.

    The Docker SDK is imported lazily inside the call. Any failure (SDK not
    importable, daemon unreachable, listing error) is reported as False.

    Returns:
        True when the container listing filtered by name ``cell-radicale``
        is non-empty, False otherwise.
    """
    try:
        import docker
        client = docker.from_env()
        try:
            matches = client.containers.list(filters={'name': 'cell-radicale'})
            return len(matches) > 0
        finally:
            # A new client is created on every call; release its connections
            # so repeated status polls do not accumulate open sockets.
            try:
                client.close()
            except Exception:
                # A failed close must not turn a successful listing into False.
                pass
    except Exception:
        return False
def _test_service_connectivity(self) -> Dict[str, Any]:
    """Probe the calendar service with a TCP connection to cell-radicale:5232.

    Returns a ``{'success', 'message'}`` dict; the connection is closed again
    immediately and a 5-second timeout applies.
    """
    import socket
    try:
        with socket.create_connection(('cell-radicale', 5232), timeout=5):
            reachable = True
    except Exception as e:
        return {'success': False, 'message': f'Calendar service not accessible: {str(e)}'}
    return {'success': reachable, 'message': 'Calendar service accessible'}
def _test_database_connectivity(self) -> Dict[str, Any]:
    """Check that the directory holding ``users_file`` is readable and writable.

    The directory is created if missing; the JSON files themselves are not
    required to exist. Returns a ``{'success', 'message'}`` dict.
    """
    try:
        store_dir = os.path.dirname(self.users_file)
        os.makedirs(store_dir, exist_ok=True)
        if os.access(store_dir, os.R_OK | os.W_OK):
            return {'success': True, 'message': 'Database directory accessible'}
        return {'success': False, 'message': 'Database directory not accessible'}
    except Exception as e:
        return {'success': False, 'message': f'Database test error: {str(e)}'}
def _test_web_interface(self) -> Dict[str, Any]:
    """Test the Radicale web interface via HTTP to the cell-radicale container.

    Any HTTP response with a status below 500 counts as accessible. urlopen
    raises ``HTTPError`` for 4xx/5xx replies, so that exception is unpacked
    and judged by its status code rather than being reported as a failure
    outright (a 401 or 404 still proves the web server is answering).

    Returns:
        A ``{'success', 'message'}`` dict; connection-level failures yield
        ``success=False`` with the error text in the message.
    """
    import urllib.error
    import urllib.request
    try:
        try:
            with urllib.request.urlopen('http://cell-radicale:5232', timeout=5) as r:
                status_code = r.status
        except urllib.error.HTTPError as http_err:
            # The server answered, just not with a success status.
            status_code = http_err.code
            http_err.close()
        success = status_code < 500
        return {'success': success, 'message': 'Web interface accessible' if success else 'Web interface not accessible'}
    except Exception as e:
        return {'success': False, 'message': f'Web interface not accessible: {str(e)}'}
def _load_users(self) -> List[Dict[str, Any]]:
    """Read the calendar user records from ``users_file``.

    Returns an empty list when the file does not exist, or when reading or
    parsing fails (the failure is logged).
    """
    try:
        if not os.path.exists(self.users_file):
            return []
        with open(self.users_file, 'r') as handle:
            return json.load(handle)
    except Exception as e:
        logger.error(f"Error loading calendar users: {e}")
        return []
def _save_users(self, users: List[Dict[str, Any]]):
    """Write the user records to ``users_file`` as indented JSON.

    Failures are logged and not re-raised.
    """
    try:
        with open(self.users_file, 'w') as handle:
            json.dump(users, handle, indent=2)
    except Exception as e:
        logger.error(f"Error saving calendar users: {e}")
def _load_calendars(self) -> List[Dict[str, Any]]:
    """Read the calendar records from ``calendars_file``.

    Returns an empty list when the file does not exist, or when reading or
    parsing fails (the failure is logged).
    """
    try:
        if not os.path.exists(self.calendars_file):
            return []
        with open(self.calendars_file, 'r') as handle:
            return json.load(handle)
    except Exception as e:
        logger.error(f"Error loading calendars: {e}")
        return []
def _save_calendars(self, calendars: List[Dict[str, Any]]):
    """Write the calendar records to ``calendars_file`` as indented JSON.

    Failures are logged and not re-raised.
    """
    try:
        with open(self.calendars_file, 'w') as handle:
            json.dump(calendars, handle, indent=2)
    except Exception as e:
        logger.error(f"Error saving calendars: {e}")
def _load_events(self) -> List[Dict[str, Any]]:
    """Read the event records from ``events_file``.

    Returns an empty list when the file does not exist, or when reading or
    parsing fails (the failure is logged).
    """
    try:
        if not os.path.exists(self.events_file):
            return []
        with open(self.events_file, 'r') as handle:
            return json.load(handle)
    except Exception as e:
        logger.error(f"Error loading events: {e}")
        return []
def _save_events(self, events: List[Dict[str, Any]]):
    """Write the event records to ``events_file`` as indented JSON.

    Failures are logged and not re-raised.
    """
    try:
        with open(self.events_file, 'w') as handle:
            json.dump(events, handle, indent=2)
    except Exception as e:
        logger.error(f"Error saving events: {e}")
def get_calendar_status(self) -> Dict[str, Any]:
    """Return ``get_status()`` extended with a ``'users'`` summary list.

    Each summary carries a fixed set of fields copied from the stored user
    record, with a default substituted for any field the record lacks. Any
    exception is passed to ``handle_error``.
    """
    # Field name -> value used when the stored record has no such key.
    summary_fields = (
        ('username', ''),
        ('calendars_count', 0),
        ('events_count', 0),
        ('created_at', ''),
        ('last_login', ''),
        ('active', True),
    )
    try:
        status = self.get_status()
        status['users'] = [
            {field: record.get(field, fallback) for field, fallback in summary_fields}
            for record in self._load_users()
        ]
        return status
    except Exception as e:
        return self.handle_error(e, "get_calendar_status")
def get_calendar_users(self) -> List[Dict[str, Any]]:
    """Return the stored calendar user records, or ``[]`` if loading raises."""
    try:
        users = self._load_users()
    except Exception as e:
        logger.error(f"Error getting calendar users: {e}")
        users = []
    return users
def create_calendar_user(self, username: str, password: str) -> bool:
    """Register a calendar user and create the user's data directory.

    ``password`` is accepted but never written to the user record.

    Returns:
        True on success; False when the username is already registered or
        when any step raises.
    """
    try:
        users = self._load_users()
        if any(existing.get('username') == username for existing in users):
            logger.warning(f"Calendar user {username} already exists")
            return False

        # SECURITY: Do NOT persist the plaintext password here. The calendar
        # password is the same as the user's VPN auth password and storing
        # it in plain JSON would leak every user credential if this file is
        # read. Auth verification goes through auth_manager; the actual
        # CalDAV/CardDAV auth is handled by the cell-radicale container
        # (htpasswd file). This JSON is metadata only.
        users.append({
            'username': username,
            'calendars_count': 0,
            'events_count': 0,
            'created_at': datetime.utcnow().isoformat(),
            'last_login': None,
            'active': True
        })
        self._save_users(users)

        # Sync user list to cell_config.json (best-effort, non-fatal)
        self._sync_users_to_cell_config()

        # Per-user directory under <calendar_data_dir>/users
        self.safe_makedirs(os.path.join(self.calendar_data_dir, 'users', username))
        logger.info(f"Created calendar user: {username}")
        return True
    except Exception as e:
        logger.error(f"Failed to create calendar user {username}: {e}")
        return False
def delete_calendar_user(self, username: str) -> bool:
    """Delete a calendar user record and the user's data directory.

    The first record whose ``'username'`` matches is removed and the stores
    are saved/synced. The user's directory is then removed only if it
    resolves to a location strictly inside ``<calendar_data_dir>/users``;
    names such as ``''``, ``'.'`` or ``'../..'`` would otherwise make
    ``rmtree`` wipe the whole users tree or directories outside it.

    Returns:
        True when a matching record was removed, False when no record
        matches or when any step raises.
    """
    try:
        users = self._load_users()
        index = next(
            (i for i, user in enumerate(users) if user.get('username') == username),
            None,
        )
        if index is None:
            logger.warning(f"Calendar user {username} not found")
            return False

        del users[index]
        self._save_users(users)
        # Sync user list to cell_config.json (best-effort, non-fatal)
        self._sync_users_to_cell_config()

        # Resolve symlinks and '..' before deciding what may be deleted.
        users_root = os.path.realpath(os.path.join(self.calendar_data_dir, 'users'))
        user_dir = os.path.realpath(os.path.join(users_root, username))
        contained = (
            user_dir != users_root
            and os.path.commonpath([users_root, user_dir]) == users_root
        )
        if not contained:
            logger.warning(
                f"Refusing to remove directory outside calendar users root for user {username}"
            )
        elif os.path.exists(user_dir):
            import shutil
            shutil.rmtree(user_dir)

        logger.info(f"Deleted calendar user: {username}")
        return True
    except Exception as e:
        logger.error(f"Failed to delete calendar user {username}: {e}")
        return False
def create_calendar(self, username: str, calendar_name: str,
                    description: str = '', color: str = '#4285f4') -> bool:
    """Create a calendar record for a user and its directory.

    Also bumps ``calendars_count`` on the matching user record, if one exists.

    Returns:
        True on success; False when either name is empty, when the user
        already has a calendar with that name, or when any step raises.
    """
    try:
        if not (username and calendar_name):
            return False

        calendars = self._load_calendars()
        already_there = any(
            entry.get('username') == username and entry.get('name') == calendar_name
            for entry in calendars
        )
        if already_there:
            logger.warning(f"Calendar {calendar_name} already exists for user {username}")
            return False

        calendars.append({
            'username': username,
            'name': calendar_name,
            'description': description,
            'color': color,
            'created_at': datetime.utcnow().isoformat(),
            'events_count': 0,
            'active': True
        })
        self._save_calendars(calendars)

        # Keep the owner's per-user calendar counter in step.
        users = self._load_users()
        owner = next((u for u in users if u.get('username') == username), None)
        if owner is not None:
            owner['calendars_count'] = owner.get('calendars_count', 0) + 1
        self._save_users(users)

        # Calendar directory lives under the owner's user directory.
        self.safe_makedirs(
            os.path.join(self.calendar_data_dir, 'users', username, calendar_name)
        )
        logger.info(f"Created calendar {calendar_name} for user {username}")
        return True
    except Exception as e:
        logger.error(f"Failed to create calendar {calendar_name} for user {username}: {e}")
        return False
def get_calendar_events(self, username: str, calendar_name: str,
                        start_date: Optional[str] = None,
                        end_date: Optional[str] = None) -> List[Dict[str, Any]]:
    """Get the events of one user's calendar, optionally bounded by start date.

    Events are matched on ``'username'`` and on the calendar name, which is
    read from ``'calendar_name'`` (written by ``create_calendar_event``) or,
    when that key is absent, from ``'calendar'`` (written by ``add_event``).

    Args:
        username: Owner of the calendar.
        calendar_name: Calendar to read.
        start_date: Inclusive lower bound compared as a string against the
            event's ``'start'`` value; ignored when empty/None.
        end_date: Inclusive upper bound, same comparison; ignored when
            empty/None. Each bound applies independently of the other.

    Returns:
        The matching event dicts in stored order, or ``[]`` on any error.
    """
    try:
        selected = []
        for event in self._load_events():
            if event.get('username') != username:
                continue
            # Both writers in this class are honoured; 'calendar_name' wins.
            if 'calendar_name' in event:
                stored_calendar = event['calendar_name']
            else:
                stored_calendar = event.get('calendar')
            if stored_calendar != calendar_name:
                continue

            event_start = event.get('start', '')
            if start_date and event_start < start_date:
                continue
            if end_date and event_start > end_date:
                continue
            selected.append(event)
        return selected
    except Exception as e:
        logger.error(f"Error getting calendar events: {e}")
        return []
def create_calendar_event(self, username: str, calendar_name: str,
                          title: str, start: str, end: str,
                          description: str = '', location: str = '') -> bool:
    """Create a calendar event and bump the calendar and user event counters.

    The event id is ``event_<UTC timestamp>_<username>``. The timestamp has
    microsecond resolution, and a numeric suffix is appended if the id is
    already present in the store, so two events created for the same user in
    quick succession never share an id. ``created_at`` and ``updated_at`` are
    taken from the same clock reading.

    Returns:
        True on success, False when any step raises.
    """
    try:
        events = self._load_events()

        now = datetime.utcnow()
        stamp = now.isoformat()
        base_id = f"event_{now.strftime('%Y%m%d_%H%M%S%f')}_{username}"
        taken = [existing.get('id') for existing in events if isinstance(existing, dict)]
        event_id, attempt = base_id, 1
        while event_id in taken:
            event_id = f"{base_id}_{attempt}"
            attempt += 1

        events.append({
            'id': event_id,
            'username': username,
            'calendar_name': calendar_name,
            'title': title,
            'start': start,
            'end': end,
            'description': description,
            'location': location,
            'created_at': stamp,
            'updated_at': stamp
        })
        self._save_events(events)

        # Update calendar's event count
        calendars = self._load_calendars()
        for calendar in calendars:
            if calendar.get('username') == username and calendar.get('name') == calendar_name:
                calendar['events_count'] = calendar.get('events_count', 0) + 1
                break
        self._save_calendars(calendars)

        # Update user's event count
        users = self._load_users()
        for user in users:
            if user.get('username') == username:
                user['events_count'] = user.get('events_count', 0) + 1
                break
        self._save_users(users)

        logger.info(f"Created calendar event {title} for user {username}")
        return True
    except Exception as e:
        logger.error(f"Failed to create calendar event: {e}")
        return False
def get_metrics(self) -> Dict[str, Any]:
    """Collect calendar metrics from the JSON stores.

    Reports store sizes, the sums of the per-user ``events_count`` and
    ``calendars_count`` fields, per-user averages (0 when there are no
    users) and an online/offline flag from the local port check. Any
    exception is passed to ``handle_error``.
    """
    try:
        users = self._load_users()
        calendars = self._load_calendars()
        events = self._load_events()

        events_by_user = sum(record.get('events_count', 0) for record in users)
        calendars_by_user = sum(record.get('calendars_count', 0) for record in users)

        def per_user(total):
            # Guard against division by zero on an empty user list.
            return total / len(users) if users else 0

        return {
            'service': 'calendar',
            'timestamp': datetime.utcnow().isoformat(),
            'status': 'online' if self._check_calendar_status() else 'offline',
            'users_count': len(users),
            'calendars_count': len(calendars),
            'events_count': len(events),
            'total_user_events': events_by_user,
            'total_user_calendars': calendars_by_user,
            'average_events_per_user': per_user(events_by_user),
            'average_calendars_per_user': per_user(calendars_by_user)
        }
    except Exception as e:
        return self.handle_error(e, "get_metrics")
def _sync_users_to_cell_config(self):
    """Best-effort copy of the calendar user list into the 'calendar' service config.

    Password-like keys are stripped from every record before it is handed to
    ConfigManager. Any failure is logged as a warning and swallowed, so the
    operation that triggered the sync is never blocked by it.
    """
    try:
        from config_manager import ConfigManager
        manager = ConfigManager()
        hidden_keys = {'password', 'hashed_password', 'password_hash'}
        sanitized = []
        for record in self._load_users():
            sanitized.append(
                {key: value for key, value in record.items() if key not in hidden_keys}
            )
        calendar_config = manager.get_service_config('calendar')
        calendar_config['users'] = sanitized
        manager.update_service_config('calendar', calendar_config)
    except Exception as e:
        self.logger.warning(f"Failed to sync calendar users to cell_config.json: {e}")
def restart_service(self) -> bool:
    """Restart the calendar service by restarting the cell-radicale container.

    Returns whatever ``_restart_container`` returns, or False if it raises.
    """
    try:
        logger.info('Calendar service restart requested')
        outcome = self._restart_container('cell-radicale')
    except Exception as e:
        logger.error(f'Failed to restart calendar service: {e}')
        outcome = False
    return outcome
def _ensure_config_exists(self):
    """Create the radicale config file if it doesn't exist.

    An existing ``<radicale_dir>/config`` is left untouched, so settings
    written into it later (for example the ``hosts`` line rewritten by
    ``apply_config``) are not reset to the defaults.
    """
    config_file = os.path.join(self.radicale_dir, 'config')
    if not os.path.exists(config_file):
        self._generate_radicale_config()
def _generate_radicale_config(self):
    """Write the default radicale config to ``<radicale_dir>/config``.

    The file is overwritten unconditionally. It binds the server to
    0.0.0.0:5232, selects htpasswd authentication and sets the storage folder.
    """
    default_lines = [
        '[server]',
        'hosts = 0.0.0.0:5232',
        '',
        '[auth]',
        'type = htpasswd',
        'htpasswd_filename = /etc/radicale/users',
        'htpasswd_encryption = md5',
        '',
        '[storage]',
        'filesystem_folder = /data/collections',
    ]
    target = os.path.join(self.radicale_dir, 'config')
    with open(target, 'w') as handle:
        handle.write('\n'.join(default_lines) + '\n')
def apply_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
    """Update the ``hosts`` line of the radicale config when the port changes.

    The port is validated before it is written: it must be an integer (or a
    string holding one) in the range 1..65535. Anything else is rejected with
    a warning and the file is left untouched, so a value containing newlines
    cannot inject extra settings or sections into the config file.

    No restart is triggered here; per the original design the docker port
    binding must be updated first, which happens via the pending-restart flow.

    Args:
        config: New service settings; only the ``'port'`` key is used.

    Returns:
        ``{'restarted': [], 'warnings': [...]}`` where warnings describe a
        rejected port or a failed file update.
    """
    restarted = []
    warnings = []
    result = {'restarted': restarted, 'warnings': warnings}
    if 'port' not in config:
        return result

    raw_port = config['port']
    port = None
    # bool is an int subclass, but True/False are never meaningful ports.
    if isinstance(raw_port, int) and not isinstance(raw_port, bool):
        port = raw_port
    elif isinstance(raw_port, str):
        try:
            port = int(raw_port.strip(), 10)
        except ValueError:
            port = None
    if port is None or not 1 <= port <= 65535:
        warnings.append(f"radicale config update skipped: invalid port {raw_port!r}")
        return result

    try:
        radicale_conf = os.path.join(self.radicale_dir, 'config')
        if os.path.exists(radicale_conf):
            with open(radicale_conf) as f:
                lines = f.readlines()
            lines = [
                f"hosts = 0.0.0.0:{port}\n" if line.strip().startswith('hosts =') else line
                for line in lines
            ]
            with open(radicale_conf, 'w') as f:
                f.writelines(lines)
        # No immediate restart — docker port binding must be updated first.
        # The pending restart banner will run docker compose up with updated .env.
    except Exception as e:
        warnings.append(f"radicale config update failed: {e}")
    return result
def remove_calendar(self, username: str, calendar_name: str) -> bool:
    """Remove a user's calendar record and keep the user's counter in step.

    ``create_calendar`` increments ``calendars_count`` on the owner's user
    record; this method decrements it again (never below 0) by the number of
    calendar records actually removed, so the per-user totals do not drift.

    Returns:
        False when either name is empty or any step raises; True otherwise,
        including when no matching calendar existed.
    """
    try:
        if not username or not calendar_name:
            return False
        calendars = self._load_calendars()
        new_cals = [
            c for c in calendars
            if not (c.get('username') == username and c.get('name') == calendar_name)
        ]
        self._save_calendars(new_cals)

        removed = len(calendars) - len(new_cals)
        if removed:
            users = self._load_users()
            for user in users:
                if user.get('username') == username:
                    current = user.get('calendars_count', 0)
                    user['calendars_count'] = max(0, current - removed)
                    break
            self._save_users(users)
        return True
    except Exception as e:
        logger.error(f'remove_calendar failed: {e}')
        return False
def add_event(self, username: str, calendar_name: str,
              event_data: dict) -> bool:
    """Append an event to the events store.

    A copy of ``event_data`` is stored with ``'username'`` and ``'calendar'``
    set from the arguments; ``'uid'`` is kept when present and otherwise set
    to the current UTC time in ISO format.

    Returns:
        False when a name is empty, ``event_data`` is None, or any step
        raises; True otherwise.
    """
    try:
        if not username or not calendar_name or event_data is None:
            return False
        events = self._load_events()
        # Work on a copy so the caller's mapping is never modified.
        record = dict(event_data)
        record['username'] = username
        record['calendar'] = calendar_name
        record.setdefault('uid', datetime.utcnow().isoformat())
        events.append(record)
        self._save_events(events)
        return True
    except Exception as e:
        logger.error(f'add_event failed: {e}')
        return False
def remove_event(self, username: str, calendar_name: str, uid: str) -> bool:
    """Drop every stored event matching the user, calendar and UID.

    Matching uses the ``'username'``, ``'calendar'`` and ``'uid'`` keys of
    the stored event. The store is rewritten even when nothing matched.

    Returns:
        False when any argument is empty or any step raises; True otherwise.
    """
    try:
        if not (username and calendar_name and uid):
            return False

        def is_target(event):
            return (
                event.get('username') == username
                and event.get('calendar') == calendar_name
                and event.get('uid') == uid
            )

        remaining = [event for event in self._load_events() if not is_target(event)]
        self._save_events(remaining)
        return True
    except Exception as e:
        logger.error(f'remove_event failed: {e}')
        return False