ffe1dbeed6
Unit Tests / test (push) Failing after 8m57s
setup_cell.py: register_with_ddns() called at end of setup — detects public IP via api.ipify.org, generates TOTP code from DDNS_TOTP_SECRET, POSTs to DDNS /register, saves token to data/api/.ddns_token (mode 600). Idempotent: skips if token file already exists. Fails gracefully if DDNS_TOTP_SECRET is unset or network is unreachable. scripts/ddns_update.py: standalone script for periodic IP updates. Reads token from data/api/.ddns_token, fetches current public IP, compares to cached last IP (data/api/.ddns_last_ip) and calls /update only when the IP has actually changed. Makefile: add ddns-update (run update script) and ddns-register (force re-registration by removing old token then calling register_with_ddns). Usage: DDNS_TOTP_SECRET=<secret> make ddns-register Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
382 lines
14 KiB
Python
382 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
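PIC setup script — run once on a fresh host to initialise a new cell.

Env vars (all optional):
  CELL_NAME         cell identity name (default: mycell)
  CELL_DOMAIN       DNS TLD for this cell (default: cell)
  VPN_ADDRESS       WireGuard server address (default: 10.0.0.1/24)
  WG_PORT           WireGuard listen port (default: 51820)
  CELL_IP_RANGE     cell IP range / subnet (default: the ip_range stored in
                    config/api/cell_config.json, else 172.20.0.0/16)
  WG_PRIVATE_KEY    pre-generated WireGuard server private key; only used
                    together with WG_PUBLIC_KEY (default: keys are generated)
  WG_PUBLIC_KEY     pre-generated WireGuard server public key
  ADMIN_PASSWORD    initial admin password (default: randomly generated)
  DDNS_URL          DDNS API base URL (default: https://ddns.pic.ngo/api/v1)
  DDNS_TOTP_SECRET  TOTP secret for DDNS registration (default: unset, which
                    skips DDNS registration)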
|
|
"""
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
# ── directories ────────────────────────────────────────────────────────────────
|
|
# Directories (relative to the project root) that ensure_dir() creates when
# missing.  Configuration trees are listed first, runtime data trees second.
REQUIRED_DIRS = [
    f'config/{leaf}'
    for leaf in (
        'caddy/certs',
        'dns',
        'dhcp',
        'ntp',
        'mail/config',
        'mail/ssl',
        'radicale',
        'webdav',
        'wireguard',
        'api',
    )
] + [
    f'data/{leaf}'
    for leaf in (
        'caddy',
        'dns',
        'dhcp',
        'maildata',
        'mailstate',
        'maillogs',
        'radicale',
        'files',
        'api',
        'vault/certs',
        'vault/keys',
        'vault/trust',
        'vault/ca',
        'logs',
        'wireguard/keys/peers',
        'wireguard/wg_confs',
    )
]
|
|
|
|
# Files (relative to the project root) that ensure_file() creates empty when
# they are missing.
REQUIRED_FILES = [
    f'config/{name}'
    for name in (
        'dns/Corefile',
        'dhcp/dnsmasq.conf',
        'ntp/chrony.conf',
        'mail/mailserver.env',
        'webdav/users.passwd',
    )
]
|
|
|
|
# Project root: the parent of the directory that contains this script.
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
|
|
|
|
|
|
def ensure_dir(rel):
    """Create the directory ``rel`` under ROOT unless the path already exists.

    A newly created directory also receives an empty ``.gitkeep`` file.
    Prints a ``[CREATED]`` or ``[EXISTS]`` status line for ``rel``.
    """
    target = os.path.join(ROOT, rel)
    if os.path.exists(target):
        print(f'[EXISTS] {rel}')
        return
    os.makedirs(target, exist_ok=True)
    print(f'[CREATED] {rel}')
    # Drop an empty marker file into the directory that was just created.
    marker = os.path.join(target, '.gitkeep')
    with open(marker, 'w'):
        pass
|
|
|
|
|
|
def ensure_file(rel):
    """Create an empty file at ``rel`` under ROOT unless the path already exists.

    Missing parent directories are created first.  Prints a ``[CREATED]`` or
    ``[EXISTS]`` status line for ``rel``.
    """
    target = os.path.join(ROOT, rel)
    if os.path.exists(target):
        print(f'[EXISTS] {rel}')
        return
    parent = os.path.dirname(target)
    os.makedirs(parent, exist_ok=True)
    with open(target, 'w'):
        pass
    print(f'[CREATED] {rel}')
|
|
|
|
|
|
def ensure_caddy_ca_cert():
    """Generate the self-signed Caddy CA key and certificate if either is missing.

    Runs ``openssl req -x509`` to write ``ca.key`` and ``ca.crt`` into
    ``config/caddy/certs``.  Nothing is done when both files already exist.
    A missing ``openssl`` binary or a failing openssl run is reported on
    stdout and does not abort the setup.
    """
    cert_dir = os.path.join(ROOT, 'config', 'caddy', 'certs')
    ca_key = os.path.join(cert_dir, 'ca.key')
    ca_crt = os.path.join(cert_dir, 'ca.crt')
    if os.path.exists(ca_key) and os.path.exists(ca_crt):
        print('[EXISTS] Caddy CA cert')
        return

    print('[INFO] Generating Caddy CA certificate...')
    # openssl will not create the output directory on its own.
    os.makedirs(cert_dir, exist_ok=True)
    try:
        subprocess.run([
            'openssl', 'req', '-x509', '-newkey', 'rsa:4096',
            '-keyout', ca_key, '-out', ca_crt, '-days', '365', '-nodes',
            '-subj', '/C=US/ST=State/L=City/O=PersonalInternetCell/CN=CellCA'
        ], check=True, capture_output=True)
    except FileNotFoundError:
        print('[WARN] openssl not found — skipping CA cert generation')
    except subprocess.CalledProcessError as e:
        # The output is captured, so surface openssl's own message; the
        # exception text alone only carries the exit status.
        stderr = (e.stderr or b'').decode(errors='replace').strip()
        detail = f': {stderr}' if stderr else ''
        print(f'[ERROR] openssl failed: {e}{detail}')
    else:
        print('[CREATED] Caddy CA cert')
|
|
|
|
|
|
def _gen_keys_python():
    """Create a WireGuard (X25519) key pair with the cryptography library.

    Used when the ``wg`` binary is not available.  Returns a
    ``(private_key, public_key)`` tuple of base64-encoded strings.
    """
    import base64
    from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
    from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption

    key = X25519PrivateKey.generate()
    try:
        # Shortcut accessors; not every cryptography release provides them.
        raw_private = key.private_bytes_raw()
        raw_public = key.public_key().public_bytes_raw()
    except AttributeError:
        # Fall back to the explicit raw-encoding API.
        raw_private = key.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
        raw_public = key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
    return tuple(base64.b64encode(raw).decode() for raw in (raw_private, raw_public))
|
|
|
|
|
|
def generate_wg_keys():
    """Return the WireGuard server key pair, creating it on first run.

    Key sources, in order:
      1. existing ``server_private.key`` / ``server_public.key`` files under
         ``data/wireguard/keys`` (returned as-is, nothing is written);
      2. the ``WG_PRIVATE_KEY`` and ``WG_PUBLIC_KEY`` environment variables,
         used only when both are non-empty;
      3. the ``wg`` binary (``wg genkey`` / ``wg pubkey``);
      4. ``_gen_keys_python()`` when ``wg`` is not installed.

    Newly obtained keys are written to the two key files; the private key
    file is only readable by its owner.

    Returns:
        ``(private_key, public_key)`` as stripped strings.
    """
    keys_dir = os.path.join(ROOT, 'data', 'wireguard', 'keys')
    priv_path = os.path.join(keys_dir, 'server_private.key')
    pub_path = os.path.join(keys_dir, 'server_public.key')
    if os.path.exists(priv_path) and os.path.exists(pub_path):
        print('[EXISTS] WireGuard server keys')
        with open(priv_path) as f:
            priv = f.read().strip()
        with open(pub_path) as f:
            pub = f.read().strip()
        return priv, pub

    print('[INFO] Generating WireGuard server keys...')
    os.makedirs(keys_dir, exist_ok=True)

    # Allow caller to inject pre-generated keys (useful when wg and cryptography are absent)
    env_priv = os.environ.get('WG_PRIVATE_KEY', '').strip()
    env_pub = os.environ.get('WG_PUBLIC_KEY', '').strip()
    if env_priv and env_pub:
        print('[INFO] Using WG_PRIVATE_KEY / WG_PUBLIC_KEY from environment')
        priv, pub = env_priv, env_pub
    else:
        # Try wg binary, then Python cryptography library
        try:
            priv = subprocess.check_output(['wg', 'genkey']).decode().strip()
            pub = subprocess.check_output(['wg', 'pubkey'], input=priv.encode()).decode().strip()
        except FileNotFoundError:
            print('[INFO] wg not found — using Python cryptography library')
            priv, pub = _gen_keys_python()

    # Create the private key file with owner-only permissions from the start
    # so the key is never readable by others, not even briefly.
    fd = os.open(priv_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        f.write(priv + '\n')
    # The mode passed to os.open() is ignored for a file that already existed.
    os.chmod(priv_path, 0o600)
    with open(pub_path, 'w') as f:
        f.write(pub + '\n')
    print(f'[CREATED] WireGuard server keys pub={pub[:12]}...')
    return priv, pub
|
|
|
|
|
|
def write_wg0_conf(private_key: str, address: str, port: int):
    """Write ``config/wireguard/wg0.conf`` unless it already exists.

    Args:
        private_key: WireGuard server private key placed in ``PrivateKey``.
        address: interface address written to ``Address`` (e.g. ``10.0.0.1/24``).
        port: UDP port written to ``ListenPort``.

    An existing file is never modified.  The file contains the private key,
    so it is created readable by its owner only.
    """
    wg_conf = os.path.join(ROOT, 'config', 'wireguard', 'wg0.conf')
    if os.path.exists(wg_conf):
        print('[EXISTS] config/wireguard/wg0.conf')
        return

    # "%i" is left literal in the file; it is not a Python format field.
    content = (
        '[Interface]\n'
        f'PrivateKey = {private_key}\n'
        f'Address = {address}\n'
        f'ListenPort = {port}\n'
        'PostUp = iptables -A FORWARD -i %i -j ACCEPT; '
        'iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE; '
        'sysctl -q net.ipv4.conf.all.rp_filter=0\n'
        'PostDown = iptables -D FORWARD -i %i -j ACCEPT; '
        'iptables -t nat -D POSTROUTING -o eth0 -j MASQUERADE; '
        'sysctl -q net.ipv4.conf.all.rp_filter=1\n'
    )

    os.makedirs(os.path.dirname(wg_conf), exist_ok=True)
    # Open with mode 0600 up front so the private key is never exposed with
    # default (umask) permissions between creation and chmod.
    fd = os.open(wg_conf, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        f.write(content)
    os.chmod(wg_conf, 0o600)
    print(f'[CREATED] config/wireguard/wg0.conf address={address} port={port}')
|
|
|
|
|
|
def write_cell_config(cell_name: str, domain: str, port: int, ip_range: 'str | None' = None):
    """Write the initial ``config/api/cell_config.json`` identity block.

    An existing file with non-empty JSON content is left untouched.  A file
    that is missing, empty (``{}``), unreadable or not valid JSON is
    (re)written.

    Args:
        cell_name: stored as ``_identity.cell_name``.
        domain: stored as ``_identity.domain``.
        port: stored as ``_identity.wireguard_port``.
        ip_range: stored as ``_identity.ip_range``.  When omitted, the
            ``CELL_IP_RANGE`` environment variable is used and, failing
            that, ``172.20.0.0/16``.  main() resolves the range for ``.env``
            and the Caddyfile from the same variable, and later runs read the
            range back from this file, so the stored value has to match.
    """
    cfg_path = os.path.join(ROOT, 'config', 'api', 'cell_config.json')
    if os.path.exists(cfg_path):
        try:
            with open(cfg_path) as f:
                existing = json.load(f)
        except (OSError, ValueError):
            # Unreadable or corrupt config: fall through and rewrite it.
            existing = None
        if existing:
            print('[EXISTS] config/api/cell_config.json')
            return

    if ip_range is None:
        ip_range = os.environ.get('CELL_IP_RANGE') or '172.20.0.0/16'

    config = {
        '_identity': {
            'cell_name': cell_name,
            'domain': domain,
            'ip_range': ip_range,
            'wireguard_port': port,
        }
    }
    os.makedirs(os.path.dirname(cfg_path), exist_ok=True)
    with open(cfg_path, 'w') as f:
        json.dump(config, f, indent=2)
    print(f'[CREATED] config/api/cell_config.json name={cell_name} domain={domain}')
|
|
|
|
|
|
def write_compose_env(ip_range: str):
    """Have ``ip_utils`` write the project-root ``.env`` file for docker-compose.

    ``ip_utils`` is imported from the project's ``api`` directory, which is
    put at the front of ``sys.path`` first.  A falsy result from
    ``ip_utils.write_env_file`` is reported as a warning.
    """
    api_dir = os.path.join(ROOT, 'api')
    sys.path.insert(0, api_dir)
    import ip_utils

    env_file = os.path.join(ROOT, '.env')
    # No custom ports are passed; ip_utils is expected to fall back to its
    # PORT_DEFAULTS for every port variable.
    written = ip_utils.write_env_file(ip_range, env_file)
    if not written:
        print('[WARN] Could not write .env — containers will use built-in default IPs/ports')
        return
    print(f'[CREATED] .env (ip_range={ip_range})')
|
|
|
|
|
|
def write_caddy_config(ip_range: str, cell_name: str, domain: str):
    """Have ``ip_utils`` render ``config/caddy/Caddyfile`` for this cell.

    ``ip_utils`` is imported from the project's ``api`` directory, which is
    put at the front of ``sys.path`` first.  A falsy result from
    ``ip_utils.write_caddyfile`` is reported as a warning.
    """
    api_dir = os.path.join(ROOT, 'api')
    sys.path.insert(0, api_dir)
    import ip_utils

    target = os.path.join(ROOT, 'config', 'caddy', 'Caddyfile')
    written = ip_utils.write_caddyfile(ip_range, cell_name, domain, target)
    if not written:
        print('[WARN] Could not write Caddyfile')
        return
    print(f'[CREATED] config/caddy/Caddyfile (subnet={ip_range} domain={domain})')
|
|
|
|
|
|
def _read_existing_ip_range() -> 'str | None':
    """Return ``_identity.ip_range`` from ``config/api/cell_config.json``.

    Returns ``None`` when the file is missing, unreadable, not valid JSON,
    not shaped as ``{"_identity": {...}}``, or when the stored value is
    empty.
    """
    cfg_path = os.path.join(ROOT, 'config', 'api', 'cell_config.json')
    try:
        with open(cfg_path) as f:
            existing = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or invalid JSON (JSONDecodeError is a ValueError).
        return None

    identity = existing.get('_identity') if isinstance(existing, dict) else None
    if not isinstance(identity, dict):
        return None
    return identity.get('ip_range') or None
|
|
|
|
|
|
def ensure_session_secret():
    """Create ``data/api/.session_secret`` with 64 random bytes if it is missing.

    An existing file is never replaced.  The file is created readable by its
    owner only.
    """
    path = os.path.join(ROOT, 'data', 'api', '.session_secret')
    if os.path.exists(path):
        print('[EXISTS] data/api/.session_secret')
        return

    secret = os.urandom(64)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    # Open with mode 0600 up front so the secret is never exposed with
    # default (umask) permissions between creation and chmod.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'wb') as f:
        f.write(secret)
    os.chmod(path, 0o600)
    print('[CREATED] data/api/.session_secret')
|
|
|
|
|
|
# DDNS settings, read from the environment once when the module is loaded.
# Base URL of the DDNS API; register_with_ddns() appends "/register" to it.
DDNS_URL = os.getenv('DDNS_URL', 'https://ddns.pic.ngo/api/v1')
# TOTP secret used for DDNS registration; an empty value disables registration.
DDNS_TOTP_SECRET = os.getenv('DDNS_TOTP_SECRET', '')
|
|
|
|
|
|
def _totp_code(secret: str, for_time: 'float | None' = None, digits: int = 6, interval: int = 30) -> str:
    """Return the RFC 6238 TOTP code (HMAC-SHA1) for a base32-encoded secret.

    Standard-library fallback for ``pyotp.TOTP(secret).now()``, using the
    common TOTP parameters of 6 digits and a 30-second time step.

    Args:
        secret: base32 secret; case-insensitive, missing ``=`` padding is added.
        for_time: Unix timestamp to compute the code for (default: now).
        digits: length of the returned code.
        interval: time step in seconds.

    Raises:
        ValueError: if ``secret`` is not valid base32 (``binascii.Error``).
    """
    import base64
    import hashlib
    import hmac
    import struct
    import time

    padded = secret + '=' * (-len(secret) % 8)
    key = base64.b32decode(padded, casefold=True)
    if for_time is None:
        for_time = time.time()
    counter = int(for_time) // interval
    digest = hmac.new(key, struct.pack('>Q', counter), hashlib.sha1).digest()
    # Dynamic truncation (RFC 4226): the low nibble of the last byte selects
    # the 4-byte window; the top bit is masked off.
    offset = digest[-1] & 0x0F
    truncated = struct.unpack('>I', digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(truncated % 10 ** digits).zfill(digits)


def register_with_ddns(cell_name: str) -> None:
    """Register ``cell_name`` with the DDNS server using TOTP authentication.

    Steps: detect the public IP via api.ipify.org, generate a TOTP code from
    ``DDNS_TOTP_SECRET``, POST ``{"name", "ip"}`` to ``DDNS_URL/register``
    with the code in the ``X-Register-OTP`` header, and save the returned
    token to ``data/api/.ddns_token`` (owner-readable only).

    Idempotent: if the token file already exists the registration is skipped.
    Skipped with a ``[SKIP]`` message if ``DDNS_TOTP_SECRET`` is not set.
    Every failure (network, bad secret, HTTP error, unexpected response) is
    reported as a ``[WARN]`` line and never raised, so setup continues.
    """
    token_path = os.path.join(ROOT, 'data', 'api', '.ddns_token')
    if os.path.exists(token_path):
        print('[EXISTS] DDNS registration — token already present')
        return

    if not DDNS_TOTP_SECRET:
        print('[SKIP] DDNS_TOTP_SECRET not set — skipping DDNS registration')
        return

    import urllib.error
    import urllib.request

    # Detect public IP
    try:
        with urllib.request.urlopen('https://api.ipify.org', timeout=5) as resp:
            public_ip = resp.read().decode().strip()
    except Exception as e:
        print(f'[WARN] Could not detect public IP: {e} — skipping DDNS registration')
        return

    # Generate the TOTP code: pyotp when installed, otherwise the stdlib
    # implementation.  The secret is never handed to a child process, where
    # it would be visible in the process list.
    try:
        try:
            import pyotp
        except ImportError:
            otp = _totp_code(DDNS_TOTP_SECRET)
        else:
            otp = pyotp.TOTP(DDNS_TOTP_SECRET).now()
    except Exception as e:
        # Typically a secret that is not valid base32.
        print(f'[WARN] Could not generate TOTP code: {e} — skipping DDNS')
        return

    data = json.dumps({'name': cell_name, 'ip': public_ip}).encode()
    req = urllib.request.Request(
        f'{DDNS_URL}/register',
        data=data,
        headers={'Content-Type': 'application/json', 'X-Register-OTP': otp},
        method='POST',
    )
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            result = json.loads(resp.read())
    except urllib.error.HTTPError as e:
        body = e.read().decode(errors='replace')
        print(f'[WARN] DDNS registration failed ({e.code}): {body}')
        return
    except Exception as e:
        print(f'[WARN] DDNS registration failed: {e}')
        return

    token = result.get('token') if isinstance(result, dict) else None
    if not token or not isinstance(token, str):
        print('[WARN] DDNS registration failed: response did not contain a token')
        return

    try:
        os.makedirs(os.path.dirname(token_path), exist_ok=True)
        # Open with mode 0600 up front so the token is never exposed with
        # default (umask) permissions between creation and chmod.
        fd = os.open(token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            f.write(token)
        os.chmod(token_path, 0o600)
    except OSError as e:
        print(f'[WARN] DDNS registration succeeded but the token could not be saved: {e}')
        return

    # A response without "subdomain" must not turn a saved token into a failure.
    subdomain = result.get('subdomain', cell_name)
    print(f'[CREATED] DDNS registration: {subdomain} ip={public_ip}')
|
|
|
|
|
|
def bootstrap_admin_password():
    """Generate the initial admin password and store it for first start-up.

    Does nothing when ``data/api/auth_users.json`` already lists a user whose
    ``role`` is ``admin``.  Otherwise:

    * ``auth_users.json`` is created as an empty list if it is missing;
    * the password is taken from ``ADMIN_PASSWORD`` or generated randomly;
    * the password is written to ``data/api/.admin_initial_password`` and
      printed once.

    Both files are created readable by their owner only.
    """
    import secrets as _secrets

    users_file = os.path.join(ROOT, 'data', 'api', 'auth_users.json')
    init_pw_file = os.path.join(ROOT, 'data', 'api', '.admin_initial_password')

    # Idempotent: don't overwrite if admin already exists.
    if os.path.exists(users_file):
        try:
            with open(users_file) as f:
                users = json.loads(f.read() or '[]')
        except (OSError, ValueError):
            # Unreadable or corrupt users file: treat as "no admin yet".
            users = []
        # Skip malformed entries instead of letting one of them hide an
        # existing admin.
        if isinstance(users, list) and any(
            isinstance(u, dict) and u.get('role') == 'admin' for u in users
        ):
            print('[EXISTS] admin user — skipping password generation')
            return

    os.makedirs(os.path.dirname(users_file), exist_ok=True)
    if not os.path.exists(users_file):
        fd = os.open(users_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            f.write('[]')
        os.chmod(users_file, 0o600)

    password = os.environ.get('ADMIN_PASSWORD') or _secrets.token_urlsafe(18)

    # Open with mode 0600 up front so the password is never exposed with
    # default (umask) permissions between creation and chmod.
    fd = os.open(init_pw_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        f.write(password)
    # The mode passed to os.open() is ignored for a file that already existed.
    os.chmod(init_pw_file, 0o600)

    print()
    print('=' * 62)
    print(' ADMIN PASSWORD (shown once - save it before starting PIC):')
    print(' username : admin')
    print(f' password : {password}')
    print('=' * 62)
    print(' Also saved to: data/api/.admin_initial_password')
    print(' (Delete that file after noting the password.)')
    print('=' * 62)
    print()
|
|
|
|
|
|
def main():
    """Run every setup step in order, configured from environment variables.

    Reads ``CELL_NAME``, ``CELL_DOMAIN``, ``VPN_ADDRESS``, ``WG_PORT`` and
    ``CELL_IP_RANGE``, then creates the directory/file skeleton, the Caddy CA
    certificate, the WireGuard keys and config, the cell config, ``.env``,
    the Caddyfile, the session secret and the admin password, and finally
    attempts DDNS registration.

    Exits with an error message if ``WG_PORT`` is not an integer.
    """
    cell_name = os.environ.get('CELL_NAME', 'mycell')
    domain = os.environ.get('CELL_DOMAIN', 'cell')
    vpn_address = os.environ.get('VPN_ADDRESS', '10.0.0.1/24')
    raw_port = os.environ.get('WG_PORT', '51820')
    try:
        wg_port = int(raw_port)
    except ValueError:
        # Fail with a readable message rather than a traceback.
        sys.exit(f'[ERROR] WG_PORT must be an integer, got {raw_port!r}')
    # An explicit CELL_IP_RANGE wins; otherwise reuse the ip_range already
    # stored in cell_config.json so `make setup` is safe to re-run; a fresh
    # host falls back to the default range.
    ip_range = os.environ.get('CELL_IP_RANGE') or _read_existing_ip_range() or '172.20.0.0/16'

    print('--- Personal Internet Cell: Setup ---')
    print(f' cell={cell_name} domain={domain} vpn={vpn_address} port={wg_port}')
    print()

    for d in REQUIRED_DIRS:
        ensure_dir(d)
    for f in REQUIRED_FILES:
        ensure_file(f)

    ensure_caddy_ca_cert()
    priv, _pub = generate_wg_keys()
    write_wg0_conf(priv, vpn_address, wg_port)
    write_cell_config(cell_name, domain, wg_port)
    write_compose_env(ip_range)
    write_caddy_config(ip_range, cell_name, domain)
    ensure_session_secret()
    bootstrap_admin_password()
    register_with_ddns(cell_name)

    print()
    print('--- Setup complete! Run: make start ---')


# Run the setup only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|