Files
pic/api/cell_link_manager.py
T
roof 639fb66e5b
Unit Tests / test (push) Successful in 9m45s
fix: complete cross-cell peer-sync push (domain SNI + source-preserving NAT)
Finishes the transport repair (L1+L2 landed in 714fb9b). The push now works
end-to-end between linked cells — verified live: offer/permission state
propagates automatically and the cell_relay derives/reverts without manual steps.

L3 — push by domain, not bare IP (cell_link_manager): the push targeted
https://<vpn-ip>, but in DDNS/ACME mode Caddy only holds a cert for the cell's
domain, so the TLS handshake failed by IP. Target https://<remote-domain> with
`curl --resolve <domain>:443:<dns_ip>` — connect to the VPN IP over the tunnel
but present the domain as SNI/Host. remote_api_url is now domain-based; legacy
http://ip:3000 and https://ip URLs migrate on load.

L4 — preserve the real source for auth (firewall_manager): the blanket
`-o eth0 MASQUERADE` rewrote the push source, so the remote's X-Forwarded-For
source-subnet auth couldn't match. apply_cell_rules adds a tightly-scoped nat
POSTROUTING RETURN (linked-subnet → caddy:443 only) above the masquerade; the
host route returns Caddy's reply through the tunnel. Reviewed by pic-security:
WireGuard per-cell AllowedIPs + Caddy last-XFF (no trusted_proxies) keep this
un-spoofable; the API stays 127.0.0.1-only.

Also:
- validate remote-invite domain/dns_ip/endpoint/subnet at ingest (they reach a
  curl --resolve argv — block leading-dash argument-injection).
- remove the host subnet route on cell unlink (remove_cell_subnet_route); the
  route was never cleaned, leaving a stale subnet that made is_local_request
  treat it as local. Mock firewall side-effects in the affected unit tests.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-17 01:02:20 -04:00

892 lines
40 KiB
Python

#!/usr/bin/env python3
"""
CellLinkManager — manages site-to-site connections between PIC cells.
Each connection is stored in data/cell_links.json and manifests as:
- A WireGuard [Peer] block (AllowedIPs = remote cell's VPN subnet)
- A CoreDNS forwarding block (remote domain → remote cell's DNS IP)
- An iptables FORWARD rule set (service-level access control)
"""
import ipaddress
import json
import logging
import os
import random
import re
import subprocess
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Optional
# Module-level logger; handlers and levels are left to the hosting application.
logger = logging.getLogger(__name__)

# Services whose sharing can be toggled per link and per direction.
VALID_SERVICES = ('calendar', 'files', 'mail', 'webdav')

# Deny-all permission matrix: one independent {service: False} map per direction.
_DEFAULT_PERMISSIONS = {
    direction: dict.fromkeys(VALID_SERVICES, False)
    for direction in ('inbound', 'outbound')
}

# Overall curl time budget (--max-time) for a single push, in seconds.
_PUSH_TIMEOUT = 5

# Retry backoff bounds in seconds: the first delay and the cap.
_BACKOFF_BASE_S = 60
_BACKOFF_MAX_S = 3600
# Strict formats for fields imported from a remote cell's invite. The domain and
# dns_ip flow into a `curl --resolve <domain>:443:<dns_ip>` argv (peer-sync push);
# anchoring them — domain must start alphanumeric, dns_ip must be an IP — prevents
# a malicious invite injecting a leading-dash value that curl reads as a flag.
# NOTE: always apply these with fullmatch(); with match(), `$` also accepts a
# trailing newline.
_INVITE_HOSTNAME_RE = re.compile(r'^[A-Za-z0-9]([A-Za-z0-9.-]{0,253}[A-Za-z0-9])?$')
_INVITE_CELL_NAME_RE = re.compile(r'^[A-Za-z0-9][A-Za-z0-9 _.-]{0,63}$')
_INVITE_ENDPOINT_RE = re.compile(r'^[A-Za-z0-9][A-Za-z0-9._-]*:\d{1,5}$')


def _validate_invite_fields(invite: Dict[str, Any]) -> None:
    """Reject a remote cell's invite whose fields aren't strictly well-formed.

    Defence-in-depth: these values come from another cell and reach iptables,
    DNS config, and a curl argv (the peer-sync push --resolves
    <domain>:443:<dns_ip>). Checks performed:

    * cell_name, domain and endpoint must match their regex in full — a
      trailing newline is rejected, not silently tolerated.
    * dns_ip must parse as an IP address and vpn_subnet as a network; IPv6
      zone identifiers ("%...") are refused because the ipaddress module
      accepts arbitrary text (including newlines) after the percent sign.
    * endpoint is optional; when present its port must be 1-65535.

    The public_key is not checked here (the original note says it is
    validated downstream by WireGuardManager.add_cell_peer).

    Raises:
        ValueError: on any malformed field.
    """
    name = invite.get('cell_name', '')
    if not isinstance(name, str) or not _INVITE_CELL_NAME_RE.fullmatch(name):
        raise ValueError(f'invalid cell_name {name!r}')

    domain = invite.get('domain', '')
    if not isinstance(domain, str) or not _INVITE_HOSTNAME_RE.fullmatch(domain):
        raise ValueError(f'invalid domain {domain!r}: must be a hostname')

    dns_ip = str(invite.get('dns_ip', ''))
    try:
        if '%' in dns_ip:
            raise ValueError('zone identifiers are not allowed')
        ipaddress.ip_address(dns_ip)
    except ValueError:
        raise ValueError(f"invalid dns_ip {invite.get('dns_ip')!r}") from None

    vpn_subnet = str(invite.get('vpn_subnet', ''))
    try:
        if '%' in vpn_subnet:
            raise ValueError('zone identifiers are not allowed')
        ipaddress.ip_network(vpn_subnet, strict=False)
    except ValueError:
        raise ValueError(f"invalid vpn_subnet {invite.get('vpn_subnet')!r}") from None

    endpoint = invite.get('endpoint')
    if endpoint:
        text = str(endpoint)
        well_formed = _INVITE_ENDPOINT_RE.fullmatch(text) is not None
        # The regex only bounds the digit count; enforce a real port range too.
        if not well_formed or not 0 < int(text.rsplit(':', 1)[1]) <= 65535:
            raise ValueError(f'invalid endpoint {endpoint!r}')
def _remote_api_url(domain: Optional[str]) -> Optional[str]:
    """Build the base URL of a linked cell's API from its domain.

    The result is always "https://<domain>"; a falsy domain (None or empty)
    yields None. Per the surrounding design notes, peer-sync goes to the
    remote's Caddy on 443 and must address it by DOMAIN so the TLS SNI matches
    the certificate Caddy holds; the actual connection to the VPN IP is
    arranged by the caller with `curl --resolve <domain>:443:<dns_ip>`
    (see _push_permissions_to_remote).
    """
    if not domain:
        return None
    return 'https://{}'.format(domain)
def _compute_next_retry(attempts: int) -> str:
    """Return the earliest next-retry time as a naive-UTC ISO-8601 string.

    The delay doubles with every attempt starting from _BACKOFF_BASE_S, is
    capped at _BACKOFF_MAX_S, and then gets up to half a base interval of
    random jitter added on top.
    """
    exponential = _BACKOFF_BASE_S * (2 ** (attempts - 1))
    capped = min(exponential, _BACKOFF_MAX_S)
    jitter = random.uniform(0, _BACKOFF_BASE_S / 2)
    retry_at = datetime.utcnow() + timedelta(seconds=capped + jitter)
    return retry_at.isoformat()
def _default_perms() -> Dict[str, Any]:
    """Return a fresh deny-all permission matrix.

    Every call builds new dicts, so callers may mutate the result freely.
    """
    return {
        direction: dict.fromkeys(VALID_SERVICES, False)
        for direction in ('inbound', 'outbound')
    }
class CellLinkManager:
def __init__(self, data_dir: str, config_dir: str, wireguard_manager, network_manager):
    """Remember the collaborators and derive the link-store path.

    Args:
        data_dir: directory that holds cell_links.json.
        config_dir: stored as-is; not used by the methods visible here.
        wireguard_manager: object used for WG keys, peers and addresses.
        network_manager: object used for DNS forward rules.
    """
    self.data_dir, self.config_dir = data_dir, config_dir
    self.wireguard_manager = wireguard_manager
    self.network_manager = network_manager
    # All link state lives in this single JSON file.
    self.links_file = os.path.join(data_dir, 'cell_links.json')
# ── Storage ───────────────────────────────────────────────────────────────
def _load(self) -> List[Dict[str, Any]]:
    """Read all link records from disk, migrating older records in place.

    Records written by earlier versions are brought up to date: missing
    permissions get the deny-all default, remote_api_url is rebuilt to the
    domain-based HTTPS form, and every tracking field added since is filled
    with its default. If anything was migrated, the file is rewritten.

    Returns:
        The list of link dicts, or [] when the file is absent or cannot be
        read/parsed. This method never raises; read failures are logged so
        a corrupt store is not mistaken for "no links".
    """
    if not os.path.exists(self.links_file):
        return []

    # (field, default) pairs in the order they were introduced.
    field_defaults = (
        # Phase 1: permission-sync tracking
        ('last_push_status', 'never'),
        ('last_push_at', None),
        ('last_push_error', None),
        # Links that predate sync are marked pending so startup replay syncs them.
        ('pending_push', True),
        ('last_remote_update_at', None),
        # Phase 2: exit-offer signaling
        ('exit_offered', False),
        ('remote_exit_offered', False),
        # Phase 3: per-peer internet routing
        ('exit_relay_active', False),
        ('remote_exit_relay_active', False),
        # Phase 4: retry/backoff state
        ('push_attempts', 0),
        ('next_retry_at', None),
    )

    try:
        with open(self.links_file, encoding='utf-8') as f:
            links = json.load(f)
        if not isinstance(links, list):
            raise ValueError(f'expected a JSON list, got {type(links).__name__}')

        changed = False
        for link in links:
            if 'permissions' not in link:
                link['permissions'] = _default_perms()
                changed = True
            # Domain-based HTTPS URL. Rebuild if missing, or if it's a legacy
            # form: http://<ip>:3000 (unreachable) or https://<ip> (no
            # matching Caddy cert by bare IP).
            want_url = _remote_api_url(link.get('domain'))
            if want_url and link.get('remote_api_url') != want_url:
                link['remote_api_url'] = want_url
                changed = True
            for field, default in field_defaults:
                if field not in link:
                    link[field] = default
                    changed = True
    except Exception as e:
        logger.error(f"Could not load cell links from {self.links_file}: {e}")
        return []

    if changed:
        # A failed write-back must not discard links that were read fine;
        # the migration simply repeats on the next load.
        try:
            self._save(links)
        except Exception as e:
            logger.warning(f"Could not persist migrated cell links: {e}")
    return links
def _save(self, links: List[Dict[str, Any]]):
    """Atomically write the full list of link records to links_file.

    The JSON is written to a sibling temp file and moved into place with
    os.replace, so a crash or a serialisation error mid-write leaves the
    previous file intact instead of a truncated one.

    Raises:
        OSError: if the file cannot be written or replaced.
        TypeError: if links contains something json cannot serialise.
    """
    tmp_path = f'{self.links_file}.{os.getpid()}.tmp'
    try:
        with open(tmp_path, 'w') as f:
            json.dump(links, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, self.links_file)
    except BaseException:
        # Do not leave a half-written temp file behind.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
# ── Sync helpers ──────────────────────────────────────────────────────────
def _local_identity(self) -> Dict[str, str]:
    """Return this cell's name and WG public key for outbound peer-sync calls.

    The name comes from the '_identity' config section, falling back to the
    CELL_NAME environment variable and finally to 'mycell'. The public key
    is '' when the WireGuard manager reports none.
    """
    # Imported lazily, as in the rest of this class.
    from app import config_manager

    identity_cfg = config_manager.configs.get('_identity', {})
    fallback_name = os.environ.get('CELL_NAME', 'mycell')
    name = identity_cfg.get('cell_name', fallback_name)
    wg_keys = self.wireguard_manager.get_keys()
    return {
        'cell_name': name,
        'public_key': wg_keys.get('public_key', ''),
    }
def _local_wg_ip(self) -> Optional[str]:
    """Return the first IPv4 address on cell-wireguard's wg0 (e.g. '10.0.0.1').

    Runs `ip addr show wg0` inside the cell-wireguard container and takes
    the address of the first `inet` line. The caller puts it into an
    X-Forwarded-For header so the remote peer-sync endpoint can match us by
    VPN subnet even if NAT rewrote the apparent source.

    Returns:
        The address without its prefix length, or None when it cannot be
        determined. Never raises; failures are logged at debug level.
    """
    try:
        r = subprocess.run(
            ['docker', 'exec', 'cell-wireguard',
             'ip', 'addr', 'show', 'wg0'],
            capture_output=True, text=True, timeout=5
        )
        if r.returncode != 0:
            logger.debug(
                f"_local_wg_ip: 'ip addr show wg0' exited {r.returncode}: "
                f"{(r.stderr or '').strip()[:200]}"
            )
        for line in r.stdout.splitlines():
            line = line.strip()
            # 'inet ' (with the space) selects IPv4 lines only, not 'inet6'.
            if line.startswith('inet '):
                return line.split()[1].split('/')[0]
    except Exception as e:
        # Best-effort: the header is optional, so just leave a trace.
        logger.debug(f"_local_wg_ip: could not determine wg0 address: {e}")
    return None
def _push_permissions_to_remote(self, link: Dict[str, Any],
                                from_cell: str,
                                from_public_key: str) -> Dict[str, Any]:
    """POST our mirrored permission state to the remote cell's peer-sync endpoint.

    The body swaps directions: our 'inbound' map is sent as their 'outbound'
    and our 'outbound' as their 'inbound'. It also carries exit_offered and
    exit_relay_active (as 'use_as_exit_relay').

    The request is issued with `docker exec cell-wireguard curl`, i.e. from
    inside the cell-wireguard container rather than from this process.

    Returns:
        {'ok': True, 'error': None} on any 2xx status, otherwise
        {'ok': False, 'error': <reason>}. Never raises for transport errors.
    """
    base_url = link.get('remote_api_url')
    if not base_url:
        return {'ok': False, 'error': 'no remote_api_url'}

    local_perms = link.get('permissions') or _default_perms()
    mirrored = {
        'outbound': dict(local_perms.get('inbound', {})),
        'inbound': dict(local_perms.get('outbound', {})),
    }
    payload = json.dumps({
        'version': 1,
        'from_cell': from_cell,
        'from_public_key': from_public_key,
        'permissions': mirrored,
        'exit_offered': bool(link.get('exit_offered', False)),
        'use_as_exit_relay': bool(link.get('exit_relay_active', False)),
        'sent_at': datetime.utcnow().isoformat() + 'Z',
    })
    target = base_url.rstrip('/') + '/api/cells/peer-sync/permissions'

    # Our WG address goes into X-Forwarded-For as a belt-and-suspenders aid
    # for the remote's source-subnet auth; it is omitted when unknown.
    wg_ip = self._local_wg_ip()
    xff_args = ['-H', f'X-Forwarded-For: {wg_ip}'] if wg_ip else []

    # Connect to the remote's VPN IP but present its DOMAIN as SNI/Host so
    # Caddy serves a matching certificate. -k still covers LAN mode, where
    # the internal-CA cert would not chain for curl.
    domain, dns_ip = link.get('domain'), link.get('dns_ip')
    resolve_args = ['--resolve', f'{domain}:443:{dns_ip}'] if domain and dns_ip else []

    cmd = [
        'docker', 'exec', 'cell-wireguard',
        'curl', '-s', '-k', '-o', '/dev/null', '-w', '%{http_code}',
        *resolve_args,
        '-X', 'POST',
        '-H', 'Content-Type: application/json',
        *xff_args,
        '-d', payload,
        '--max-time', str(_PUSH_TIMEOUT),
        '--connect-timeout', '3',
        target,
    ]
    try:
        proc = subprocess.run(
            cmd, capture_output=True, text=True, timeout=_PUSH_TIMEOUT + 5
        )
        if proc.returncode != 0:
            reason = (proc.stderr or proc.stdout or 'curl error').strip()[:200]
            return {'ok': False, 'error': reason}
        # stdout holds only the status code because of -o /dev/null -w.
        http_status = proc.stdout.strip()
        if not http_status.startswith('2'):
            return {'ok': False, 'error': f'HTTP {http_status}'}
        return {'ok': True, 'error': None}
    except subprocess.TimeoutExpired:
        return {'ok': False, 'error': 'timeout'}
    except Exception as e:
        return {'ok': False, 'error': str(e)[:200]}
def _record_push_result(self, cell_name: str, result: Dict[str, Any]) -> None:
    """Persist last_push_* and retry state after a push attempt.

    On success the link is marked clean and the retry counters are reset. On
    failure the error is stored, the link stays pending, push_attempts is
    incremented and next_retry_at is pushed out by the backoff schedule.

    Nothing is written when no link is named cell_name: saving the
    unmodified list would be pointless, and if _load() returned an empty
    list because of a read error it would overwrite the store with [].
    """
    links = self._load()
    link = next((l for l in links if l['cell_name'] == cell_name), None)
    if link is None:
        return

    if result.get('ok'):
        link['last_push_status'] = 'ok'
        link['last_push_at'] = datetime.utcnow().isoformat()
        link['last_push_error'] = None
        link['pending_push'] = False
        link['push_attempts'] = 0
        link['next_retry_at'] = None
    else:
        link['last_push_status'] = 'failed'
        link['last_push_error'] = result.get('error')
        link['pending_push'] = True
        # `or 0` also tolerates a stored null counter.
        attempts = (link.get('push_attempts') or 0) + 1
        link['push_attempts'] = attempts
        link['next_retry_at'] = _compute_next_retry(attempts)
    self._save(links)
def _try_push(self, cell_name: str, link: Dict[str, Any]) -> None:
    """Mark the link pending, push its state to the remote, record the outcome.

    Non-fatal: every failure is logged as a warning and swallowed, so callers
    can treat the push as fire-and-forget.

    Args:
        cell_name: name of the link to mark and record against.
        link: the link record whose state is sent (the caller's copy).
    """
    # Mark dirty before pushing — a crash mid-push leaves it pending for replay.
    # Only write when the link was actually found; re-saving an unmodified
    # (or, after a read error, empty) list gains nothing and could clobber
    # the store.
    links = self._load()
    stored = next((l for l in links if l['cell_name'] == cell_name), None)
    if stored is not None:
        stored['pending_push'] = True
        self._save(links)

    try:
        identity = self._local_identity()
        result = self._push_permissions_to_remote(
            link, identity['cell_name'], identity['public_key']
        )
        self._record_push_result(cell_name, result)
        if not result['ok']:
            logger.warning(
                f"Permission push to '{cell_name}' failed "
                f"(will retry on startup): {result['error']}"
            )
    except Exception as e:
        logger.warning(f"Permission push to '{cell_name}' skipped (non-fatal): {e}")
def apply_remote_permissions(self, from_public_key: str,
                             permissions: Dict[str, Any],
                             exit_offered: bool = False,
                             use_as_exit_relay: bool = False) -> Dict[str, Any]:
    """Store state pushed by a remote cell (identified by WG public key).

    Only the services in VALID_SERVICES are kept; everything else in the
    payload is ignored. The sanitised permissions and the remote's exit
    flags are persisted, then the local firewall rules for that cell are
    re-applied (a failure there is logged, not raised).

    Args:
        from_public_key: WG public key of the sending cell.
        permissions: {'inbound': {...}, 'outbound': {...}} as sent by the
            remote. Anything that is not a dict is treated as empty, i.e.
            all services off, since this is untrusted remote input.
        exit_offered: whether the remote offers to be our exit.
        use_as_exit_relay: whether the remote asks us to relay its traffic.

    Returns:
        The updated link record.

    Raises:
        ValueError: if no link has this public key.
    """
    links = self._load()
    link = next((l for l in links if l['public_key'] == from_public_key), None)
    if not link:
        raise ValueError(f"No connection found for public_key '{from_public_key[:16]}...'")

    def _as_dict(value: Any) -> Dict[str, Any]:
        # Fail closed on malformed payloads instead of raising AttributeError.
        return value if isinstance(value, dict) else {}

    payload = _as_dict(permissions)
    in_raw = _as_dict(payload.get('inbound', {}))
    out_raw = _as_dict(payload.get('outbound', {}))
    clean_inbound = {s: bool(in_raw.get(s, False)) for s in VALID_SERVICES}
    clean_outbound = {s: bool(out_raw.get(s, False)) for s in VALID_SERVICES}

    link['permissions'] = {'inbound': clean_inbound, 'outbound': clean_outbound}
    link['remote_exit_offered'] = bool(exit_offered)
    link['remote_exit_relay_active'] = bool(use_as_exit_relay)
    link['last_remote_update_at'] = datetime.utcnow().isoformat()
    self._save(links)

    # Soft loop-detection warning: if the remote is asking us to act as exit
    # AND we already have a peer routing via that cell, it's a potential cycle.
    if use_as_exit_relay:
        try:
            from peer_registry import PeerRegistry
            pr = PeerRegistry(os.environ.get('DATA_DIR', '/app/data'))
            loop_peers = [p['name'] for p in pr.list_peers()
                          if p.get('route_via') == link['cell_name']]
            if loop_peers:
                logger.warning(
                    f"apply_remote_permissions: '{link['cell_name']}' asked us to act as "
                    f"its exit relay, but we already route peers {loop_peers} via it — "
                    f"potential routing loop detected"
                )
        except Exception as e:
            # Advisory only; never let the check block the update.
            logger.debug(f"apply_remote_permissions: loop detection skipped: {e}")

    inbound_list = [s for s, v in clean_inbound.items() if v]
    try:
        import firewall_manager as _fm
        _fm.apply_cell_rules(link['cell_name'], link['vpn_subnet'], inbound_list,
                             exit_relay=use_as_exit_relay)
    except Exception as e:
        logger.warning(
            f"apply_cell_rules after remote push for '{link['cell_name']}' "
            f"failed (non-fatal): {e}"
        )
    return link
def replay_pending_pushes(self) -> Dict[str, int]:
    """Retry the push for every link that is still marked pending_push.

    Links whose next_retry_at lies in the future are counted as deferred and
    skipped; the rest are pushed and the outcome recorded.

    Returns:
        Counters with keys 'attempted', 'ok', 'failed' and 'deferred'. If the
        local identity cannot be resolved, the early result has only the
        first three keys, all zero.
    """
    summary = {'attempted': 0, 'ok': 0, 'failed': 0}
    try:
        identity = self._local_identity()
    except Exception as e:
        logger.warning(f"replay_pending_pushes: cannot resolve identity ({e})")
        return summary

    summary['deferred'] = 0
    # Stored timestamps are ISO strings produced the same way, so plain
    # string comparison is used to order them.
    now_iso = datetime.utcnow().isoformat()

    pending_links = (l for l in self._load() if l.get('pending_push'))
    for link in pending_links:
        retry_after = link.get('next_retry_at')
        if retry_after and retry_after > now_iso:
            summary['deferred'] += 1
            logger.info(
                f"replay: skipping '{link['cell_name']}' — backoff until {retry_after}"
            )
            continue

        summary['attempted'] += 1
        outcome = self._push_permissions_to_remote(
            link, identity['cell_name'], identity['public_key']
        )
        self._record_push_result(link['cell_name'], outcome)
        if not outcome.get('ok'):
            summary['failed'] += 1
            logger.warning(
                f"replay: push to '{link['cell_name']}' failed: {outcome.get('error')}"
            )
            continue
        summary['ok'] += 1
        logger.info(f"replay: synced permissions to '{link['cell_name']}'")

    if summary['attempted'] or summary.get('deferred'):
        logger.info(
            f"replay_pending_pushes: {summary['attempted']} attempted, "
            f"{summary['ok']} ok, {summary['failed']} failed, "
            f"{summary.get('deferred', 0)} deferred (backoff)"
        )
    return summary
# ── Public API ────────────────────────────────────────────────────────────
def generate_invite(self, cell_name: str, domain: str) -> Dict[str, Any]:
    """Build the invite package another cell imports to link with this one.

    The package carries our WG public key, the endpoint returned by
    wireguard_manager.get_advertised_endpoint, our VPN subnet, the address
    part of our configured WG address as dns_ip, plus the given cell_name
    and domain. 'version' is fixed at 1.
    """
    wg = self.wireguard_manager
    wg_keys = wg.get_keys()
    # Configured address is "<ip>/<prefix>"; only the IP part is advertised.
    vpn_ip = wg._get_configured_address().partition('/')[0]

    # config_manager is optional here; the endpoint lookup receives None
    # when the application module cannot be imported.
    try:
        from app import config_manager as _cm
    except Exception:
        _cm = None
    advertised = wg.get_advertised_endpoint(_cm)

    invite: Dict[str, Any] = {}
    invite['cell_name'] = cell_name
    invite['public_key'] = wg_keys['public_key']
    invite['endpoint'] = advertised
    invite['vpn_subnet'] = wg._get_configured_network()
    invite['dns_ip'] = vpn_ip
    invite['domain'] = domain
    invite['version'] = 1
    return invite
def list_connections(self) -> List[Dict[str, Any]]:
    """Return every stored link record, exactly as _load() yields them."""
    all_links = self._load()
    return all_links
def _check_invite_conflicts(self, invite: Dict[str, Any],
                            exclude_cell: str = '') -> None:
    """Raise ValueError if the invite's subnet or domain conflicts with existing state.

    Two checks are made:

    * VPN subnet — must not overlap this cell's own subnet nor the subnet of
      any other linked cell.
    * Domain — must differ from this cell's own domain and from every other
      linked cell's domain. Comparison ignores case, because DNS names are
      case-insensitive.

    Args:
        invite: the remote invite; 'vpn_subnet' and 'domain' are read.
        exclude_cell: name of a link to skip, used when re-validating an
            update to that same link.

    Raises:
        ValueError: on a conflict, or if a subnet cannot be parsed.
    """
    links = self._load()
    remote_subnet = invite.get('vpn_subnet', '')
    remote_domain = invite.get('domain', '')

    # Check VPN subnet: must not overlap our own subnet or any existing cell's subnet
    try:
        remote_net = ipaddress.ip_network(remote_subnet, strict=False)
        own_net = ipaddress.ip_network(
            self.wireguard_manager._get_configured_network(), strict=False)
        if remote_net.overlaps(own_net):
            raise ValueError(
                f"VPN subnet {remote_subnet!r} overlaps this cell's own subnet "
                f"{str(own_net)!r} — the remote cell must use a distinct IP range"
            )
        for link in links:
            if link['cell_name'] == exclude_cell:
                continue
            existing_net = ipaddress.ip_network(link.get('vpn_subnet', '0.0.0.0/32'),
                                                strict=False)
            if remote_net.overlaps(existing_net):
                # .get(): a missing key must not turn a real conflict into a
                # KeyError that the handler below reports as "check skipped".
                raise ValueError(
                    f"VPN subnet {remote_subnet!r} overlaps already-connected cell "
                    f"'{link['cell_name']}' ({link.get('vpn_subnet')!r})"
                )
    except ValueError:
        raise
    except Exception as e:
        logger.warning(f'_check_invite_conflicts subnet check skipped: {e}')

    # Check domain: must not match our own domain or any existing cell's domain
    if remote_domain:
        wanted = str(remote_domain).lower()
        try:
            from app import config_manager
            identity = config_manager.configs.get('_identity', {})
            own_domain = identity.get('domain_name') or identity.get('domain', os.environ.get('CELL_DOMAIN', ''))
            if own_domain and wanted == str(own_domain).lower():
                raise ValueError(
                    f"Domain {remote_domain!r} is the same as this cell's own domain — "
                    f"the remote cell must use a different domain name"
                )
        except ValueError:
            raise
        except Exception:
            # Best-effort: without access to our own identity config the
            # self-domain check is skipped; the per-link check still runs.
            pass
        for link in links:
            if link['cell_name'] == exclude_cell:
                continue
            if str(link.get('domain') or '').lower() == wanted:
                raise ValueError(
                    f"Domain {remote_domain!r} is already used by connected cell "
                    f"'{link['cell_name']}' — each cell must have a unique domain"
                )
def _push_invite_to_remote(self, link: Dict[str, Any]) -> Dict[str, Any]:
    """Send OUR invite to the remote cell so it can complete mutual WG pairing.

    Called right after the remote was added as our WG peer, before the WG
    tunnel is up. The request goes to https://<endpoint host>/api/cells/
    peer-sync/accept-invite, i.e. over the PUBLIC path to the remote's
    Caddy on 443 — the API's :3000 is 127.0.0.1-only and not reachable
    across cells. Non-fatal — one-sided pairing degrades gracefully; the
    admin can pair from the other side.

    Returns:
        {'ok': True, 'error': None} on a 2xx response, otherwise
        {'ok': False, 'error': <reason>}. Never raises.
    """
    endpoint = link.get('endpoint') or ''
    if not endpoint:
        return {'ok': False, 'error': 'no endpoint'}
    # Host from endpoint (e.g. "alice.pic.ngo:51821" → "alice.pic.ngo").
    try:
        host = endpoint.rsplit(':', 1)[0].strip('[]')
    except Exception:
        return {'ok': False, 'error': f'cannot parse endpoint {endpoint!r}'}
    if not host:
        return {'ok': False, 'error': f'cannot parse endpoint {endpoint!r}'}
    if ':' in host:
        # An IPv6 literal must be bracketed inside a URL.
        host = f'[{host}]'

    try:
        identity = self._local_identity()
        from app import config_manager
        id_cfg = config_manager.configs.get('_identity', {})
        own_domain = id_cfg.get('domain_name') or id_cfg.get('domain', os.environ.get('CELL_DOMAIN', 'cell'))
        own_invite = self.generate_invite(identity['cell_name'], own_domain)
    except Exception as e:
        return {'ok': False, 'error': f'could not build own invite: {e}'}

    url = f'https://{host}/api/cells/peer-sync/accept-invite'
    payload = json.dumps({'invite': own_invite})
    cmd = [
        'docker', 'exec', 'cell-wireguard',
        # -k: endpoint may be a bare IP (LAN/fallback) whose cert won't match;
        # accept-invite carries only public keys and the WG handshake is the
        # real authentication.
        'curl', '-s', '-k', '-o', '/dev/null', '-w', '%{http_code}',
        '-X', 'POST',
        '-H', 'Content-Type: application/json',
        '-d', payload,
        '--max-time', str(_PUSH_TIMEOUT),
        '--connect-timeout', '3',
        url,
    ]
    try:
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=_PUSH_TIMEOUT + 5
        )
        if result.returncode != 0:
            err = (result.stderr or result.stdout or 'curl error').strip()[:200]
            return {'ok': False, 'error': err}
        status = result.stdout.strip()
        if status.startswith('2'):
            return {'ok': True, 'error': None}
        return {'ok': False, 'error': f'HTTP {status}'}
    except subprocess.TimeoutExpired:
        # Same wording as _push_permissions_to_remote for the same condition.
        return {'ok': False, 'error': 'timeout'}
    except Exception as e:
        return {'ok': False, 'error': str(e)[:200]}
def accept_invite(self, invite: Dict[str, Any]) -> Dict[str, Any]:
    """Accept a remote cell's invite and complete mutual WG pairing.

    Called by the /api/cells/peer-sync/accept-invite endpoint when the remote
    cell pushes its own invite after we connected to it.

    * Unknown cell_name: the peer, DNS forward, link record and (deny-all)
      firewall rules are created.
    * Known cell_name with the SAME public key: idempotent; if dns_ip,
      vpn_subnet, endpoint or domain changed, the stored link, WG peer, DNS
      forward and firewall rules are updated ("healed").
    * Known cell_name with a DIFFERENT public key: rejected. Without this,
      anyone able to reach the endpoint could rewrite an existing link's
      addressing just by knowing the cell's name.

    Returns:
        The new or existing link record.

    Raises:
        ValueError: missing/malformed fields, a key mismatch for an existing
            cell, or a subnet/domain conflict.
        RuntimeError: if the WireGuard peer could not be added.
    """
    for field in ('cell_name', 'public_key', 'vpn_subnet', 'dns_ip', 'domain'):
        if field not in invite:
            raise ValueError(f"Invite missing field: {field!r}")
    _validate_invite_fields(invite)

    links = self._load()
    name = invite['cell_name']

    # Already connected — check whether the remote's endpoint, subnet, or domain changed
    # (e.g. the remote cell changed its WireGuard address or domain) and heal if so.
    existing = next((l for l in links if l['cell_name'] == name), None)
    if existing:
        # Bind updates to the key we originally paired with.
        if existing.get('public_key') != invite['public_key']:
            raise ValueError(
                f"Cell '{name}' is already connected with a different public key"
            )
        dns_changed = existing.get('dns_ip') != invite['dns_ip']
        subnet_changed = existing.get('vpn_subnet') != invite['vpn_subnet']
        endpoint_changed = (invite.get('endpoint') and
                            invite['endpoint'] != existing.get('endpoint'))
        domain_changed = (invite.get('domain') and
                          invite['domain'] != existing.get('domain'))
        if dns_changed or subnet_changed or endpoint_changed or domain_changed:
            # Before healing, verify the updated invite doesn't conflict with
            # other connected cells (exclude this cell by name so it's not
            # self-blocking when only endpoint/dns_ip changed).
            self._check_invite_conflicts(invite, exclude_cell=name)
            logger.info(
                f"accept_invite: updating existing cell '{name}' "
                f"(dns_ip: {existing.get('dns_ip')} -> {invite['dns_ip']}, "
                f"vpn_subnet: {existing.get('vpn_subnet')} -> {invite['vpn_subnet']}, "
                f"domain: {existing.get('domain')} -> {invite.get('domain')})"
            )
            old_subnet = existing.get('vpn_subnet', '')
            old_domain = existing.get('domain', '')
            existing['dns_ip'] = invite['dns_ip']
            existing['vpn_subnet'] = invite['vpn_subnet']
            existing['remote_api_url'] = _remote_api_url(invite['domain'])
            if invite.get('endpoint'):
                existing['endpoint'] = invite['endpoint']
            if domain_changed:
                existing['domain'] = invite['domain']
            self._save(links)

            # Update WG peer AllowedIPs to the new subnet
            if subnet_changed and old_subnet:
                self.wireguard_manager.update_peer_ip(
                    existing['public_key'], invite['vpn_subnet'])

            # Update DNS forward rule — triggers on dns_ip OR domain change
            if dns_changed or domain_changed:
                try:
                    self.network_manager.remove_cell_dns_forward(old_domain)
                except Exception as e:
                    # The old rule may already be gone; adding the new one
                    # is what matters.
                    logger.debug(f"accept_invite: removing old DNS forward failed: {e}")
                self.network_manager.add_cell_dns_forward(
                    domain=existing['domain'], dns_ip=invite['dns_ip'])

            # Reapply firewall rules with new subnet
            if subnet_changed:
                try:
                    import firewall_manager as _fm
                    inbound_list = [s for s, v in
                                    existing.get('permissions', {}).get('inbound', {}).items() if v]
                    _fm.apply_cell_rules(name, invite['vpn_subnet'], inbound_list)
                except Exception as e:
                    logger.warning(f"apply_cell_rules after subnet update failed: {e}")
        return existing

    # First contact: nothing to exclude from the conflict check.
    self._check_invite_conflicts(invite)

    ok = self.wireguard_manager.add_cell_peer(
        name=name,
        public_key=invite['public_key'],
        endpoint=invite.get('endpoint', ''),
        vpn_subnet=invite['vpn_subnet'],
    )
    if not ok:
        raise RuntimeError(f"Failed to add WireGuard peer for cell '{name}'")

    dns_result = self.network_manager.add_cell_dns_forward(
        domain=invite['domain'],
        dns_ip=invite['dns_ip'],
    )
    if dns_result.get('warnings'):
        logger.warning('DNS forward warnings for %s (accept_invite): %s',
                       name, dns_result['warnings'])

    link = {
        'cell_name': name,
        'public_key': invite['public_key'],
        'endpoint': invite.get('endpoint'),
        'vpn_subnet': invite['vpn_subnet'],
        'dns_ip': invite['dns_ip'],
        'domain': invite['domain'],
        'connected_at': datetime.utcnow().isoformat(),
        'permissions': _default_perms(),
        'remote_api_url': _remote_api_url(invite['domain']),
        'last_push_status': 'never',
        'last_push_at': None,
        'last_push_error': None,
        'pending_push': True,
        'last_remote_update_at': None,
    }
    links.append(link)
    self._save(links)

    # New links start with no inbound services allowed.
    try:
        import firewall_manager as _fm
        _fm.apply_cell_rules(name, invite['vpn_subnet'], [])
    except Exception as e:
        logger.warning(f"apply_cell_rules for {name} (accept_invite) failed: {e}")

    logger.info(f"accept_invite: mutual pairing completed for cell '{name}'")
    return link
def add_connection(self, invite: Dict[str, Any],
                   inbound_services: Optional[List[str]] = None) -> Dict[str, Any]:
    """Import a remote cell's invite and establish the connection.

    Steps, in order: validate the invite, refuse duplicates and
    subnet/domain conflicts, add the WG peer and the DNS forward, persist
    the link, apply firewall rules, push our own invite to the remote, and
    finally attempt the first permission push. Everything after the link is
    persisted is best-effort and only logged on failure.

    Args:
        invite: the remote cell's invite package.
        inbound_services: services to share with the remote right away;
            names outside VALID_SERVICES are ignored.

    Returns:
        The newly created link record.

    Raises:
        ValueError: missing/malformed fields, already connected, or a
            subnet/domain conflict.
        RuntimeError: if the WireGuard peer could not be added.
    """
    # Same required-field check as accept_invite, so a missing key is
    # reported as a ValueError rather than surfacing later as a KeyError.
    for field in ('cell_name', 'public_key', 'vpn_subnet', 'dns_ip', 'domain'):
        if field not in invite:
            raise ValueError(f"Invite missing field: {field!r}")
    _validate_invite_fields(invite)

    links = self._load()
    name = invite['cell_name']
    if any(l['cell_name'] == name for l in links):
        raise ValueError(f"Cell '{name}' is already connected")

    # Check for VPN subnet and domain conflicts before touching WG/DNS
    self._check_invite_conflicts(invite)

    ok = self.wireguard_manager.add_cell_peer(
        name=name,
        public_key=invite['public_key'],
        endpoint=invite.get('endpoint', ''),
        vpn_subnet=invite['vpn_subnet'],
    )
    if not ok:
        raise RuntimeError(f"Failed to add WireGuard peer for cell '{name}'")

    dns_result = self.network_manager.add_cell_dns_forward(
        domain=invite['domain'],
        dns_ip=invite['dns_ip'],
    )
    if dns_result.get('warnings'):
        logger.warning('DNS forward warnings for %s: %s', name, dns_result['warnings'])

    inbound = [s for s in (inbound_services or []) if s in VALID_SERVICES]
    perms = _default_perms()
    for s in inbound:
        perms['inbound'][s] = True

    link = {
        'cell_name': name,
        'public_key': invite['public_key'],
        'endpoint': invite.get('endpoint'),
        'vpn_subnet': invite['vpn_subnet'],
        'dns_ip': invite['dns_ip'],
        'domain': invite['domain'],
        'connected_at': datetime.utcnow().isoformat(),
        'permissions': perms,
        'remote_api_url': _remote_api_url(invite['domain']),
        'last_push_status': 'never',
        'last_push_at': None,
        'last_push_error': None,
        'pending_push': True,
        'last_remote_update_at': None,
    }
    links.append(link)
    self._save(links)

    try:
        import firewall_manager as _fm
        _fm.apply_cell_rules(name, invite['vpn_subnet'], inbound)
    except Exception as e:
        logger.warning(f"apply_cell_rules for {name} failed (non-fatal): {e}")

    # Push OUR invite to the remote so it can complete mutual WG pairing
    # without manual action on its side. This happens before the WG tunnel
    # is up, so it goes to the host part of the remote's advertised endpoint.
    try:
        inv_result = self._push_invite_to_remote(link)
        if inv_result.get('ok'):
            logger.info(f"Mutual pairing invite accepted by '{name}'")
        else:
            logger.warning(
                f"Invite push to '{name}' failed (manual pairing from remote required): "
                f"{inv_result.get('error')}"
            )
    except Exception as e:
        logger.warning(f"Invite push to '{name}' skipped (non-fatal): {e}")

    # Initial permission push (uses WG tunnel — may fail if tunnel not yet up)
    try:
        identity = self._local_identity()
        result = self._push_permissions_to_remote(
            link, identity['cell_name'], identity['public_key']
        )
        self._record_push_result(name, result)
        if not result['ok']:
            logger.warning(
                f"Initial permission push to '{name}' failed "
                f"(will retry on startup): {result['error']}"
            )
    except Exception as e:
        logger.warning(f"Initial permission push to '{name}' skipped (non-fatal): {e}")
    return link
def remove_connection(self, cell_name: str):
    """Tear down the link to cell_name.

    Firewall rules and the host subnet route are removed first (failures are
    logged and ignored), then the WG peer and the DNS forward, and finally
    the record is dropped from the store.

    Raises:
        ValueError: if no link has that name.
    """
    links = self._load()
    target = None
    for candidate in links:
        if candidate['cell_name'] == cell_name:
            target = candidate
            break
    if target is None:
        raise ValueError(f"Cell '{cell_name}' not found")

    try:
        import firewall_manager as _fm
        _fm.clear_cell_rules(cell_name)
        _fm.remove_cell_subnet_route(target.get('vpn_subnet', ''))
    except Exception as e:
        logger.warning(f"firewall teardown for {cell_name} failed (non-fatal): {e}")

    self.wireguard_manager.remove_peer(target['public_key'])
    self.network_manager.remove_cell_dns_forward(target['domain'])

    remaining = [l for l in links if l['cell_name'] != cell_name]
    self._save(remaining)
def update_permissions(self, cell_name: str,
                       inbound: Dict[str, bool],
                       outbound: Dict[str, bool]) -> Dict[str, Any]:
    """Replace the sharing permissions of one link.

    Both maps are reduced to the services in VALID_SERVICES (absent ones
    become False), stored, applied to the firewall (failure is logged only)
    and then pushed to the remote cell via _try_push.

    Returns:
        The updated link record.

    Raises:
        ValueError: if no link has that name.
    """
    links = self._load()
    for link in links:
        if link['cell_name'] == cell_name:
            break
    else:
        raise ValueError(f"Cell '{cell_name}' not found")

    sanitised = {
        direction: {s: bool(requested.get(s, False)) for s in VALID_SERVICES}
        for direction, requested in (('inbound', inbound), ('outbound', outbound))
    }
    link['permissions'] = sanitised
    self._save(links)

    enabled_inbound = [s for s, allowed in sanitised['inbound'].items() if allowed]
    try:
        import firewall_manager as _fm
        _fm.apply_cell_rules(cell_name, link['vpn_subnet'], enabled_inbound)
    except Exception as e:
        logger.warning(f"apply_cell_rules for {cell_name} failed (non-fatal): {e}")

    # Mirror the new state to the remote cell; _try_push never raises.
    self._try_push(cell_name, link)
    return link
def get_permissions(self, cell_name: str) -> Dict[str, Any]:
    """Return the permissions dict stored for cell_name.

    A link without a 'permissions' entry yields the deny-all default.

    Raises:
        ValueError: if no link has that name.
    """
    for link in self._load():
        if link['cell_name'] == cell_name:
            return link.get('permissions', _default_perms())
    raise ValueError(f"Cell '{cell_name}' not found")
def set_exit_offered(self, cell_name: str, offered: bool) -> Dict[str, Any]:
    """Set whether THIS cell offers to route internet traffic for cell_name.

    The flag is coerced to bool, saved, and then pushed to the remote cell
    through _try_push so it learns about the change.

    Returns:
        The updated link record.

    Raises:
        ValueError: if no link has that name.
    """
    links = self._load()
    for link in links:
        if link['cell_name'] == cell_name:
            break
    else:
        raise ValueError(f"Cell '{cell_name}' not found")

    link['exit_offered'] = bool(offered)
    self._save(links)
    self._try_push(cell_name, link)
    return link
def set_exit_relay_active(self, cell_name: str, active: bool) -> Dict[str, Any]:
    """Record whether THIS cell routes a peer's internet traffic via cell_name.

    The flag is coerced to bool, saved, and pushed to the remote cell through
    _try_push (it is sent as 'use_as_exit_relay') so the remote can adjust
    its side accordingly.

    Returns:
        The updated link record.

    Raises:
        ValueError: if no link has that name.
    """
    links = self._load()
    for link in links:
        if link['cell_name'] == cell_name:
            break
    else:
        raise ValueError(f"Cell '{cell_name}' not found")

    link['exit_relay_active'] = bool(active)
    self._save(links)
    self._try_push(cell_name, link)
    return link
def get_connection_status(self, cell_name: str) -> Dict[str, Any]:
    """Return the link record plus live WireGuard status for cell_name.

    The result is a copy of the stored record with 'online' and
    'last_handshake' added from wireguard_manager.get_peer_status. If that
    lookup fails for any reason the link is reported offline with no
    handshake.

    Raises:
        ValueError: if no link has that name.
    """
    for link in self._load():
        if link['cell_name'] == cell_name:
            break
    else:
        raise ValueError(f"Cell '{cell_name}' not found")

    try:
        peer = self.wireguard_manager.get_peer_status(link['public_key'])
        live = {
            'online': peer.get('online', False),
            'last_handshake': peer.get('last_handshake'),
        }
    except Exception:
        live = {'online': False, 'last_handshake': None}
    return {**link, **live}