#!/usr/bin/env python3
"""
PIC setup script — run once on a fresh host to initialise a new cell.

Env vars (all optional, have defaults):
  CELL_NAME       cell identity name       (default: mycell)
  CELL_DOMAIN     DNS TLD for this cell    (default: cell)
  VPN_ADDRESS     WireGuard server address (default: 10.0.0.1/24)
  WG_PORT         WireGuard listen port    (default: 51820)
  CELL_IP_RANGE   subnet handed to the .env / Caddyfile generators and
                  recorded in cell_config.json (default: ip_range from an
                  existing cell_config.json, else 172.20.0.0/16)
  WG_PRIVATE_KEY  pre-generated WireGuard server keypair; only used when
  WG_PUBLIC_KEY   both are set and the key files do not exist yet
"""

import json
import os
import subprocess
import sys
from typing import Optional, Tuple

# ── directories ────────────────────────────────────────────────────────────────

REQUIRED_DIRS = [
    'config/caddy/certs',
    'config/dns',
    'config/dhcp',
    'config/ntp',
    'config/mail/config',
    'config/mail/ssl',
    'config/radicale',
    'config/webdav',
    'config/wireguard',
    'config/api',
    'data/caddy',
    'data/dns',
    'data/dhcp',
    'data/maildata',
    'data/mailstate',
    'data/maillogs',
    'data/radicale',
    'data/files',
    'data/api',
    'data/vault/certs',
    'data/vault/keys',
    'data/vault/trust',
    'data/vault/ca',
    'data/logs',
    'data/wireguard/keys/peers',
    'data/wireguard/wg_confs',
]

REQUIRED_FILES = [
    'config/dns/Corefile',
    'config/dhcp/dnsmasq.conf',
    'config/ntp/chrony.conf',
    'config/mail/mailserver.env',
    'config/webdav/users.passwd',
]

# Project root: the parent of the directory that contains this script.
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Subnet used when neither CELL_IP_RANGE nor an existing config provides one.
DEFAULT_IP_RANGE = '172.20.0.0/16'


def _read_stripped(path: str) -> str:
    """Return the contents of *path* with surrounding whitespace removed."""
    with open(path) as f:
        return f.read().strip()


def _write_private_file(path: str, text: str) -> None:
    """Write *text* to *path* so that it is never readable by group/other.

    The file is created with mode 0600 up front instead of being written
    with default permissions and chmod-ed afterwards, which would leave a
    short window in which the secret is exposed.
    """
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        f.write(text)
    # The mode passed to os.open() only applies to newly created files;
    # tighten a file that already existed with looser permissions.
    os.chmod(path, 0o600)


def _import_ip_utils():
    """Import and return the project's ``api/ip_utils`` module.

    ``ROOT/api`` is put on ``sys.path`` only once, however many times this
    helper is called.
    """
    api_dir = os.path.join(ROOT, 'api')
    if api_dir not in sys.path:
        sys.path.insert(0, api_dir)
    import ip_utils
    return ip_utils


def ensure_dir(rel):
    """Create directory *rel* (relative to ROOT) if it does not exist.

    A newly created directory also gets an empty ``.gitkeep`` file.
    """
    path = os.path.join(ROOT, rel)
    if os.path.exists(path):
        print(f'[EXISTS] {rel}')
        return
    os.makedirs(path, exist_ok=True)
    print(f'[CREATED] {rel}')
    with open(os.path.join(path, '.gitkeep'), 'w'):
        pass


def ensure_file(rel):
    """Create an empty file *rel* (relative to ROOT) if it does not exist."""
    path = os.path.join(ROOT, rel)
    if os.path.exists(path):
        print(f'[EXISTS] {rel}')
        return
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'w'):
        pass
    print(f'[CREATED] {rel}')


def ensure_caddy_ca_cert():
    """Generate a self-signed CA key/cert pair for Caddy unless both exist.

    Shells out to ``openssl``. A missing binary or a failing invocation is
    reported on stdout and does not abort the setup run.
    """
    cert_dir = os.path.join(ROOT, 'config', 'caddy', 'certs')
    ca_key = os.path.join(cert_dir, 'ca.key')
    ca_crt = os.path.join(cert_dir, 'ca.crt')
    if os.path.exists(ca_key) and os.path.exists(ca_crt):
        print('[EXISTS] Caddy CA cert')
        return
    print('[INFO] Generating Caddy CA certificate...')
    os.makedirs(cert_dir, exist_ok=True)
    try:
        subprocess.run([
            'openssl', 'req', '-x509', '-newkey', 'rsa:4096',
            '-keyout', ca_key, '-out', ca_crt,
            '-days', '365', '-nodes',
            '-subj', '/C=US/ST=State/L=City/O=PersonalInternetCell/CN=CellCA'
        ], check=True, capture_output=True)
        print('[CREATED] Caddy CA cert')
    except FileNotFoundError:
        print('[WARN] openssl not found — skipping CA cert generation')
    except subprocess.CalledProcessError as e:
        print(f'[ERROR] openssl failed: {e}')
        # Output was captured, so surface stderr or the failure is opaque.
        details = (e.stderr or b'').decode(errors='replace').strip()
        if details:
            print(details)


def _gen_keys_python():
    """Generate WireGuard keypair using the cryptography library (no wg binary needed).

    Returns:
        ``(private_key, public_key)`` as base64-encoded strings.
    """
    import base64
    from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
    from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption
    private_key = X25519PrivateKey.generate()
    try:
        # private_bytes_raw() / public_bytes_raw() only exist in newer
        # cryptography releases; older ones need the explicit Raw encoding.
        private_bytes = private_key.private_bytes_raw()
        public_bytes = private_key.public_key().public_bytes_raw()
    except AttributeError:
        private_bytes = private_key.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
        public_bytes = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
    return base64.b64encode(private_bytes).decode(), base64.b64encode(public_bytes).decode()


def generate_wg_keys() -> Tuple[str, str]:
    """Return the WireGuard server keypair, creating it on first run.

    Key sources, in order of preference:
      1. existing key files under ``data/wireguard/keys``;
      2. ``WG_PRIVATE_KEY`` / ``WG_PUBLIC_KEY`` from the environment
         (both must be set);
      3. the ``wg`` binary;
      4. the Python ``cryptography`` library.

    Returns:
        ``(private_key, public_key)`` as strings.
    """
    keys_dir = os.path.join(ROOT, 'data', 'wireguard', 'keys')
    priv_path = os.path.join(keys_dir, 'server_private.key')
    pub_path = os.path.join(keys_dir, 'server_public.key')
    if os.path.exists(priv_path) and os.path.exists(pub_path):
        print('[EXISTS] WireGuard server keys')
        return _read_stripped(priv_path), _read_stripped(pub_path)
    print('[INFO] Generating WireGuard server keys...')
    os.makedirs(keys_dir, exist_ok=True)

    # Allow caller to inject pre-generated keys (useful when wg and cryptography are absent)
    env_priv = os.environ.get('WG_PRIVATE_KEY', '').strip()
    env_pub = os.environ.get('WG_PUBLIC_KEY', '').strip()
    if env_priv and env_pub:
        print('[INFO] Using WG_PRIVATE_KEY / WG_PUBLIC_KEY from environment')
        priv, pub = env_priv, env_pub
    else:
        # Try wg binary, then Python cryptography library
        try:
            priv = subprocess.check_output(['wg', 'genkey']).decode().strip()
            pub = subprocess.check_output(['wg', 'pubkey'], input=priv.encode()).decode().strip()
        except FileNotFoundError:
            print('[INFO] wg not found — using Python cryptography library')
            priv, pub = _gen_keys_python()
        except subprocess.CalledProcessError as e:
            # A present-but-failing wg should not abort setup either.
            print(f'[WARN] wg failed ({e}) — using Python cryptography library')
            priv, pub = _gen_keys_python()

    _write_private_file(priv_path, priv + '\n')
    with open(pub_path, 'w') as f:
        f.write(pub + '\n')
    print(f'[CREATED] WireGuard server keys pub={pub[:12]}...')
    return priv, pub


def write_wg0_conf(private_key: str, address: str, port: int):
    """Write ``config/wireguard/wg0.conf`` unless it already exists.

    Args:
        private_key: server private key, written verbatim as ``PrivateKey``.
        address: interface address in CIDR form, written as ``Address``.
        port: written as ``ListenPort``.

    The file embeds the private key, so it is created with mode 0600.
    """
    wg_conf = os.path.join(ROOT, 'config', 'wireguard', 'wg0.conf')
    if os.path.exists(wg_conf):
        print('[EXISTS] config/wireguard/wg0.conf')
        return
    content = (
        f'[Interface]\n'
        f'PrivateKey = {private_key}\n'
        f'Address = {address}\n'
        f'ListenPort = {port}\n'
        f'PostUp = iptables -A FORWARD -i %i -j ACCEPT; '
        f'iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE; '
        f'sysctl -q net.ipv4.conf.all.rp_filter=0\n'
        f'PostDown = iptables -D FORWARD -i %i -j ACCEPT; '
        f'iptables -t nat -D POSTROUTING -o eth0 -j MASQUERADE; '
        f'sysctl -q net.ipv4.conf.all.rp_filter=1\n'
    )
    os.makedirs(os.path.dirname(wg_conf), exist_ok=True)
    _write_private_file(wg_conf, content)
    print(f'[CREATED] config/wireguard/wg0.conf address={address} port={port}')


def write_cell_config(cell_name: str, domain: str, port: int,
                      ip_range: str = DEFAULT_IP_RANGE):
    """Write ``config/api/cell_config.json`` unless a non-empty one exists.

    Args:
        cell_name: stored as ``_identity.cell_name``.
        domain: stored as ``_identity.domain``.
        port: stored as ``_identity.wireguard_port``.
        ip_range: stored as ``_identity.ip_range``. Defaults to
            172.20.0.0/16, the value that used to be hard-coded here, so
            existing callers keep their behaviour.

    An existing file that is empty, unreadable or not valid JSON is
    replaced; any other existing file is left untouched.
    """
    cfg_path = os.path.join(ROOT, 'config', 'api', 'cell_config.json')
    if os.path.exists(cfg_path):
        try:
            with open(cfg_path) as f:
                existing = json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            print(f'[WARN] config/api/cell_config.json unreadable ({e}) — regenerating')
        else:
            if existing:
                print('[EXISTS] config/api/cell_config.json')
                return
    config = {
        '_identity': {
            'cell_name': cell_name,
            'domain': domain,
            'ip_range': ip_range,
            'wireguard_port': port,
        }
    }
    os.makedirs(os.path.dirname(cfg_path), exist_ok=True)
    with open(cfg_path, 'w') as f:
        json.dump(config, f, indent=2)
    print(f'[CREATED] config/api/cell_config.json name={cell_name} domain={domain}')


def write_compose_env(ip_range: str):
    """Generate .env at project root so docker-compose picks up correct IPs and ports."""
    ip_utils = _import_ip_utils()
    env_path = os.path.join(ROOT, '.env')
    # Pass no custom ports — ip_utils will use PORT_DEFAULTS for all port vars
    if ip_utils.write_env_file(ip_range, env_path):
        print(f'[CREATED] .env (ip_range={ip_range})')
    else:
        print('[WARN] Could not write .env — containers will use built-in default IPs/ports')


def write_caddy_config(ip_range: str, cell_name: str, domain: str):
    """Generate Caddyfile with correct VIPs and hostnames for this cell."""
    ip_utils = _import_ip_utils()
    caddyfile = os.path.join(ROOT, 'config', 'caddy', 'Caddyfile')
    if ip_utils.write_caddyfile(ip_range, cell_name, domain, caddyfile):
        print(f'[CREATED] config/caddy/Caddyfile (subnet={ip_range} domain={domain})')
    else:
        print('[WARN] Could not write Caddyfile')


def _read_existing_ip_range() -> Optional[str]:
    """Read ip_range from existing cell_config.json if present, else return None."""
    cfg_path = os.path.join(ROOT, 'config', 'api', 'cell_config.json')
    try:
        with open(cfg_path) as f:
            existing = json.load(f)
        return existing.get('_identity', {}).get('ip_range') or None
    except (OSError, ValueError, AttributeError, TypeError):
        # Missing file, invalid JSON, or JSON that is not shaped like
        # {"_identity": {...}} all mean "no usable ip_range".
        return None


def main():
    """Run every setup step in order; each step skips work already done."""
    cell_name = os.environ.get('CELL_NAME', 'mycell')
    domain = os.environ.get('CELL_DOMAIN', 'cell')
    vpn_address = os.environ.get('VPN_ADDRESS', '10.0.0.1/24')
    raw_port = os.environ.get('WG_PORT', '51820')
    try:
        wg_port = int(raw_port)
    except ValueError:
        sys.exit(f'[ERROR] WG_PORT must be an integer, got {raw_port!r}')
    # Precedence: an explicit CELL_IP_RANGE wins; otherwise reuse the
    # ip_range from an existing cell_config.json so that re-running
    # `make setup` does not fall back to the default; otherwise the default.
    ip_range = os.environ.get('CELL_IP_RANGE') or _read_existing_ip_range() or DEFAULT_IP_RANGE

    print('--- Personal Internet Cell: Setup ---')
    print(f' cell={cell_name} domain={domain} vpn={vpn_address} port={wg_port}')
    print()

    for d in REQUIRED_DIRS:
        ensure_dir(d)

    for f in REQUIRED_FILES:
        ensure_file(f)

    ensure_caddy_ca_cert()

    priv, _pub = generate_wg_keys()
    write_wg0_conf(priv, vpn_address, wg_port)
    # Record the same ip_range that the .env and Caddyfile are generated from.
    write_cell_config(cell_name, domain, wg_port, ip_range)
    write_compose_env(ip_range)
    write_caddy_config(ip_range, cell_name, domain)

    print()
    print('--- Setup complete! Run: make start ---')


if __name__ == '__main__':
    main()