From f77d7fabcd989bd60496b88ba5d554b49ca71a29 Mon Sep 17 00:00:00 2001 From: Dmitrii Iurco Date: Sat, 9 May 2026 09:42:00 -0400 Subject: [PATCH] =?UTF-8?q?Phase=203:=20ddns=5Fmanager=20=E2=80=94=20DDNS?= =?UTF-8?q?=20client,=20provider=20adapters,=20IP=20heartbeat?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- api/app.py | 4 + api/ddns_manager.py | 486 +++++++++++++++++++++++++++++++++++ api/managers.py | 3 + api/routes/setup.py | 7 + tests/test_ddns_manager.py | 510 +++++++++++++++++++++++++++++++++++++ 5 files changed, 1010 insertions(+) create mode 100644 api/ddns_manager.py create mode 100644 tests/test_ddns_manager.py diff --git a/api/app.py b/api/app.py index af3aee0..2c15978 100644 --- a/api/app.py +++ b/api/app.py @@ -42,6 +42,7 @@ from managers import ( routing_manager, vault_manager, container_manager, cell_link_manager, auth_manager, setup_manager, caddy_manager, + ddns_manager, firewall_manager, EventType, ) # Re-exports: tests do `from app import CellManager` and `from app import _resolve_peer_dns` @@ -578,6 +579,9 @@ def health_monitor_loop(): health_monitor_thread = threading.Thread(target=health_monitor_loop, daemon=True) health_monitor_thread.start() +# Start DDNS heartbeat thread (updates public IP every 5 minutes when a provider is configured) +ddns_manager.start_heartbeat() + def _local_subnets(): """Return all subnets the container is directly connected to (from routing table).""" import ipaddress as _ipa, socket as _sock, struct as _struct diff --git a/api/ddns_manager.py b/api/ddns_manager.py new file mode 100644 index 0000000..873c616 --- /dev/null +++ b/api/ddns_manager.py @@ -0,0 +1,486 @@ +#!/usr/bin/env python3 +""" +DDNS Manager for Personal Internet Cell. + +Provides a provider-agnostic adapter for Dynamic DNS services used to keep the +cell's public IP registered under its chosen domain. + +Supported providers: + pic_ngo — pic.ngo DDNS service (primary / Phase 3 wiring) + cloudflare — Cloudflare API v4 (stub; full impl in Phase 3b) + duckdns — DuckDNS (stub; no DNS-01 support) + noip — No-IP (stub) + freedns — FreeDNS (stub) + +The manager runs a background heartbeat thread that re-publishes the public IP +every 5 minutes, skipping the call when the IP has not changed. +""" + +import logging +import threading +import time +from typing import Any, Dict, Optional + +import requests + +from base_service_manager import BaseServiceManager + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Custom exception +# --------------------------------------------------------------------------- + +class DDNSError(Exception): + """Raised when a DDNS provider returns an error response.""" + + +# --------------------------------------------------------------------------- +# Provider base class +# --------------------------------------------------------------------------- + +class DDNSProvider: + """Base class — all providers implement these methods.""" + + def register(self, name: str, ip: str) -> dict: + """Register subdomain. Returns {'token': str, 'subdomain': str}.""" + raise NotImplementedError + + def update(self, token: str, ip: str) -> bool: + """Update A record. Returns True on success.""" + raise NotImplementedError + + def dns_challenge_create(self, token: str, fqdn: str, value: str) -> bool: + raise NotImplementedError + + def dns_challenge_delete(self, token: str, fqdn: str) -> bool: + raise NotImplementedError + + +# --------------------------------------------------------------------------- +# pic.ngo provider +# --------------------------------------------------------------------------- + +class PicNgoDDNS(DDNSProvider): + """DDNS provider backed by the roof/pic-ddns API at ddns.pic.ngo.""" + + DEFAULT_API_BASE = 'https://ddns.pic.ngo' + TIMEOUT = 10 + + def __init__(self, api_base_url: Optional[str] = None): + self.api_base_url = (api_base_url or self.DEFAULT_API_BASE).rstrip('/') + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _headers(self, token: Optional[str] = None) -> Dict[str, str]: + h: Dict[str, str] = {'Content-Type': 'application/json'} + if token: + h['Authorization'] = f'Bearer {token}' + return h + + def _raise_for_status(self, response: requests.Response, action: str): + if not response.ok: + raise DDNSError( + f"PicNgoDDNS {action} failed: HTTP {response.status_code} — {response.text}" + ) + + # ------------------------------------------------------------------ + # Public interface + # ------------------------------------------------------------------ + + def register(self, name: str, ip: str) -> dict: + """POST /api/v1/register — register subdomain, returns token + subdomain.""" + url = f'{self.api_base_url}/api/v1/register' + payload = {'name': name, 'ip': ip} + resp = requests.post(url, json=payload, headers=self._headers(), timeout=self.TIMEOUT) + self._raise_for_status(resp, 'register') + return resp.json() + + def update(self, token: str, ip: str) -> bool: + """PUT /api/v1/update — update A record.""" + url = f'{self.api_base_url}/api/v1/update' + payload = {'ip': ip} + resp = requests.put(url, json=payload, + headers=self._headers(token), timeout=self.TIMEOUT) + self._raise_for_status(resp, 'update') + return True + + def dns_challenge_create(self, token: str, fqdn: str, value: str) -> bool: + """POST /api/v1/dns-challenge — create DNS-01 TXT record.""" + url = f'{self.api_base_url}/api/v1/dns-challenge' + payload = {'fqdn': fqdn, 'value': value} + resp = requests.post(url, json=payload, + headers=self._headers(token), timeout=self.TIMEOUT) + self._raise_for_status(resp, 'dns_challenge_create') + return True + + def dns_challenge_delete(self, token: str, fqdn: str) -> bool: + """DELETE /api/v1/dns-challenge — remove DNS-01 TXT record.""" + url = f'{self.api_base_url}/api/v1/dns-challenge' + payload = {'fqdn': fqdn} + resp = requests.delete(url, json=payload, + headers=self._headers(token), timeout=self.TIMEOUT) + self._raise_for_status(resp, 'dns_challenge_delete') + return True + + +# --------------------------------------------------------------------------- +# Cloudflare provider (stub) +# --------------------------------------------------------------------------- + +class CloudflareDDNS(DDNSProvider): + """DDNS via Cloudflare API v4. Stub — full impl in Phase 3b.""" + + API_BASE = 'https://api.cloudflare.com/client/v4' + TIMEOUT = 10 + + def __init__(self, api_token: str, zone_id: str): + self.api_token = api_token + self.zone_id = zone_id + + def _headers(self) -> Dict[str, str]: + return { + 'Authorization': f'Bearer {self.api_token}', + 'Content-Type': 'application/json', + } + + def register(self, name: str, ip: str) -> dict: + # Cloudflare doesn't have a registration step — return stub data. + return {'token': self.api_token, 'subdomain': name} + + def update(self, token: str, ip: str) -> bool: + """PATCH /zones/{zone_id}/dns_records — update A record.""" + url = f'{self.API_BASE}/zones/{self.zone_id}/dns_records' + resp = requests.patch(url, json={'ip': ip}, headers=self._headers(), + timeout=self.TIMEOUT) + return resp.ok + + def dns_challenge_create(self, token: str, fqdn: str, value: str) -> bool: + """POST TXT record for DNS-01 challenge.""" + url = f'{self.API_BASE}/zones/{self.zone_id}/dns_records' + payload = {'type': 'TXT', 'name': fqdn, 'content': value, 'ttl': 120} + resp = requests.post(url, json=payload, headers=self._headers(), + timeout=self.TIMEOUT) + return resp.ok + + def dns_challenge_delete(self, token: str, fqdn: str) -> bool: + """DELETE TXT record for DNS-01 challenge.""" + # A real impl would look up the record ID first; stub returns True. + return True + + +# --------------------------------------------------------------------------- +# DuckDNS provider (stub) +# --------------------------------------------------------------------------- + +class DuckDNSDDNS(DDNSProvider): + """DDNS via DuckDNS. Stub — DNS-01 challenge not supported.""" + + UPDATE_URL = 'https://www.duckdns.org/update' + TIMEOUT = 10 + + def __init__(self, token: str, domain: str): + self._token = token + self._domain = domain + + def register(self, name: str, ip: str) -> dict: + return {'token': self._token, 'subdomain': name} + + def update(self, token: str, ip: str) -> bool: + params = {'domains': self._domain, 'token': token, 'ip': ip} + resp = requests.get(self.UPDATE_URL, params=params, timeout=self.TIMEOUT) + return resp.ok and resp.text.strip() == 'OK' + + def dns_challenge_create(self, token: str, fqdn: str, value: str) -> bool: + raise NotImplementedError("DuckDNS does not support programmatic TXT record creation") + + def dns_challenge_delete(self, token: str, fqdn: str) -> bool: + raise NotImplementedError("DuckDNS does not support programmatic TXT record deletion") + + +# --------------------------------------------------------------------------- +# No-IP provider (stub) +# --------------------------------------------------------------------------- + +class NoIPDDNS(DDNSProvider): + """DDNS via No-IP. Stub — DNS-01 not supported.""" + + def register(self, name: str, ip: str) -> dict: + raise NotImplementedError + + def update(self, token: str, ip: str) -> bool: + raise NotImplementedError + + def dns_challenge_create(self, token: str, fqdn: str, value: str) -> bool: + raise NotImplementedError + + def dns_challenge_delete(self, token: str, fqdn: str) -> bool: + raise NotImplementedError + + +# --------------------------------------------------------------------------- +# FreeDNS provider (stub) +# --------------------------------------------------------------------------- + +class FreeDNSDDNS(DDNSProvider): + """DDNS via FreeDNS. Stub — DNS-01 not supported.""" + + def register(self, name: str, ip: str) -> dict: + raise NotImplementedError + + def update(self, token: str, ip: str) -> bool: + raise NotImplementedError + + def dns_challenge_create(self, token: str, fqdn: str, value: str) -> bool: + raise NotImplementedError + + def dns_challenge_delete(self, token: str, fqdn: str) -> bool: + raise NotImplementedError + + +# --------------------------------------------------------------------------- +# Public IP helper +# --------------------------------------------------------------------------- + +def _get_public_ip() -> Optional[str]: + """Return the current public IPv4 address using ipify, or None on failure.""" + try: + resp = requests.get('https://api.ipify.org', timeout=10) + if resp.ok: + return resp.text.strip() + except Exception as exc: + logger.warning("Could not determine public IP: %s", exc) + return None + + +# --------------------------------------------------------------------------- +# Manager +# --------------------------------------------------------------------------- + +_HEARTBEAT_INTERVAL = 300 # 5 minutes + + +class DDNSManager(BaseServiceManager): + """Manages DDNS registration and periodic IP updates.""" + + def __init__(self, config_manager=None, + data_dir: str = '/app/data', + config_dir: str = '/app/config'): + super().__init__('ddns', data_dir, config_dir) + self.config_manager = config_manager + self._last_ip: Optional[str] = None + self._stop_event = threading.Event() + self._heartbeat_thread: Optional[threading.Thread] = None + + # ------------------------------------------------------------------ + # BaseServiceManager abstract method implementations + # ------------------------------------------------------------------ + + def get_status(self) -> Dict[str, Any]: + identity = self._identity() + domain_cfg = identity.get('domain', {}) + return { + 'service': 'ddns', + 'provider': domain_cfg.get('ddns', {}).get('provider') if domain_cfg else None, + 'last_ip': self._last_ip, + 'heartbeat_running': ( + self._heartbeat_thread is not None and + self._heartbeat_thread.is_alive() + ), + } + + def test_connectivity(self) -> Dict[str, Any]: + provider = self.get_provider() + if provider is None: + return {'success': False, 'reason': 'No DDNS provider configured'} + ip = _get_public_ip() + if ip is None: + return {'success': False, 'reason': 'Could not reach ipify'} + return {'success': True, 'public_ip': ip} + + # ------------------------------------------------------------------ + # Identity helpers + # ------------------------------------------------------------------ + + def _identity(self) -> Dict[str, Any]: + if self.config_manager is None: + return {} + return self.config_manager.get_identity() or {} + + # ------------------------------------------------------------------ + # Provider factory + # ------------------------------------------------------------------ + + def get_provider(self) -> Optional[DDNSProvider]: + """Instantiate and return the configured DDNS provider, or None.""" + identity = self._identity() + domain_cfg = identity.get('domain', {}) + if not domain_cfg: + return None + ddns_cfg = domain_cfg.get('ddns', {}) + if not ddns_cfg: + return None + + provider_name = ddns_cfg.get('provider') + if not provider_name: + return None + + if provider_name == 'pic_ngo': + api_base = ddns_cfg.get('api_base_url') + return PicNgoDDNS(api_base_url=api_base) + + if provider_name == 'cloudflare': + return CloudflareDDNS( + api_token=ddns_cfg.get('api_token', ''), + zone_id=ddns_cfg.get('zone_id', ''), + ) + + if provider_name == 'duckdns': + return DuckDNSDDNS( + token=ddns_cfg.get('token', ''), + domain=ddns_cfg.get('domain', ''), + ) + + if provider_name == 'noip': + return NoIPDDNS() + + if provider_name == 'freedns': + return FreeDNSDDNS() + + logger.warning("Unknown DDNS provider: %s", provider_name) + return None + + # ------------------------------------------------------------------ + # Registration + # ------------------------------------------------------------------ + + def register(self, name: str, ip: str) -> dict: + """Register the cell's subdomain with the configured provider. + + Stores the returned token in the identity config under + identity['domain']['ddns']['token'] and records the subdomain. + Returns the dict from provider.register(). + """ + provider = self.get_provider() + if provider is None: + raise DDNSError("No DDNS provider configured") + + result = provider.register(name, ip) + + # Persist token + subdomain back into identity + identity = self._identity() + domain_cfg = dict(identity.get('domain', {})) + ddns_cfg = dict(domain_cfg.get('ddns', {})) + if 'token' in result: + ddns_cfg['token'] = result['token'] + if 'subdomain' in result: + ddns_cfg['subdomain'] = result['subdomain'] + domain_cfg['ddns'] = ddns_cfg + if self.config_manager is not None: + self.config_manager.set_identity_field('domain', domain_cfg) + + self._last_ip = ip + return result + + # ------------------------------------------------------------------ + # IP update + # ------------------------------------------------------------------ + + def update_ip(self): + """Fetch current public IP and update DDNS only if it has changed.""" + provider = self.get_provider() + if provider is None: + logger.debug("DDNS update_ip: no provider configured, skipping") + return + + current_ip = _get_public_ip() + if current_ip is None: + logger.warning("DDNS update_ip: could not determine public IP") + return + + if current_ip == self._last_ip: + logger.debug("DDNS update_ip: IP unchanged (%s), skipping", current_ip) + return + + identity = self._identity() + domain_cfg = identity.get('domain', {}) + ddns_cfg = domain_cfg.get('ddns', {}) if domain_cfg else {} + token = ddns_cfg.get('token', '') + + try: + success = provider.update(token, current_ip) + if success: + logger.info("DDNS update_ip: updated to %s", current_ip) + self._last_ip = current_ip + else: + logger.warning("DDNS update_ip: provider.update() returned False") + except DDNSError as exc: + logger.error("DDNS update_ip: provider error: %s", exc) + + # ------------------------------------------------------------------ + # Heartbeat + # ------------------------------------------------------------------ + + def start_heartbeat(self): + """Start a daemon thread that calls update_ip() every 5 minutes.""" + if self._heartbeat_thread is not None and self._heartbeat_thread.is_alive(): + logger.debug("DDNS heartbeat already running") + return + + self._stop_event.clear() + self._heartbeat_thread = threading.Thread( + target=self._heartbeat_loop, + name='ddns-heartbeat', + daemon=True, + ) + self._heartbeat_thread.start() + logger.info("DDNS heartbeat thread started (interval=%ds)", _HEARTBEAT_INTERVAL) + + def stop_heartbeat(self): + """Signal the heartbeat thread to stop and wait for it to exit.""" + self._stop_event.set() + if self._heartbeat_thread is not None: + self._heartbeat_thread.join(timeout=10) + self._heartbeat_thread = None + + def _heartbeat_loop(self): + """Internal: run update_ip() periodically until _stop_event is set.""" + while not self._stop_event.is_set(): + try: + self.update_ip() + except Exception as exc: + logger.warning("DDNS heartbeat: unexpected error: %s", exc) + # Sleep in short slices so stop_heartbeat() is responsive + for _ in range(_HEARTBEAT_INTERVAL): + if self._stop_event.is_set(): + break + time.sleep(1) + + # ------------------------------------------------------------------ + # DNS challenge delegation + # ------------------------------------------------------------------ + + def dns_challenge_create(self, fqdn: str, value: str) -> bool: + """Create a DNS-01 TXT record via the configured provider.""" + provider = self.get_provider() + if provider is None: + raise DDNSError("No DDNS provider configured") + identity = self._identity() + domain_cfg = identity.get('domain', {}) + ddns_cfg = domain_cfg.get('ddns', {}) if domain_cfg else {} + token = ddns_cfg.get('token', '') + return provider.dns_challenge_create(token, fqdn, value) + + def dns_challenge_delete(self, fqdn: str) -> bool: + """Delete a DNS-01 TXT record via the configured provider.""" + provider = self.get_provider() + if provider is None: + raise DDNSError("No DDNS provider configured") + identity = self._identity() + domain_cfg = identity.get('domain', {}) + ddns_cfg = domain_cfg.get('ddns', {}) if domain_cfg else {} + token = ddns_cfg.get('token', '') + return provider.dns_challenge_delete(token, fqdn) diff --git a/api/managers.py b/api/managers.py index 738cba3..05d3844 100644 --- a/api/managers.py +++ b/api/managers.py @@ -29,6 +29,7 @@ import firewall_manager from auth_manager import AuthManager from setup_manager import SetupManager from caddy_manager import CaddyManager +from ddns_manager import DDNSManager DATA_DIR = os.environ.get('DATA_DIR', '/app/data') CONFIG_DIR = os.environ.get('CONFIG_DIR', '/app/config') @@ -57,6 +58,7 @@ cell_link_manager = CellLinkManager( auth_manager = AuthManager(data_dir=DATA_DIR, config_dir=CONFIG_DIR) setup_manager = SetupManager(config_manager=config_manager, auth_manager=auth_manager) caddy_manager = CaddyManager(config_manager=config_manager, data_dir=DATA_DIR, config_dir=CONFIG_DIR) +ddns_manager = DDNSManager(config_manager=config_manager, data_dir=DATA_DIR, config_dir=CONFIG_DIR) # Service logger configuration _service_log_configs = { @@ -91,6 +93,7 @@ __all__ = [ 'email_manager', 'calendar_manager', 'file_manager', 'routing_manager', 'vault_manager', 'container_manager', 'cell_link_manager', 'auth_manager', 'setup_manager', 'caddy_manager', + 'ddns_manager', 'firewall_manager', 'EventType', 'DATA_DIR', 'CONFIG_DIR', ] diff --git a/api/routes/setup.py b/api/routes/setup.py index ab85ff5..b49baed 100644 --- a/api/routes/setup.py +++ b/api/routes/setup.py @@ -55,4 +55,11 @@ def complete_setup(): payload = request.get_json(silent=True) or {} result = sm.complete_setup(payload) status_code = 200 if result.get('success') else 400 + + # TODO (Phase 3): if result.get('success') and domain_mode == 'pic_ngo': + # from app import ddns_manager + # name = payload.get('cell_name', '') + # ip = payload.get('public_ip', '') + # ddns_manager.register(name, ip) + return jsonify(result), status_code diff --git a/tests/test_ddns_manager.py b/tests/test_ddns_manager.py new file mode 100644 index 0000000..d585bf1 --- /dev/null +++ b/tests/test_ddns_manager.py @@ -0,0 +1,510 @@ +"""Tests for DDNSManager and DDNS provider classes.""" + +import os +import sys +import threading +import time +import unittest +from unittest.mock import MagicMock, patch, call + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api')) + +from ddns_manager import ( + DDNSManager, + DDNSProvider, + DDNSError, + PicNgoDDNS, + CloudflareDDNS, + DuckDNSDDNS, + NoIPDDNS, + FreeDNSDDNS, + _get_public_ip, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_response(status_code=200, json_data=None, text=''): + """Build a minimal requests.Response-like mock.""" + resp = MagicMock() + resp.ok = (200 <= status_code < 300) + resp.status_code = status_code + resp.json.return_value = json_data or {} + resp.text = text + return resp + + +def _make_config_manager(ddns_cfg=None, domain_cfg=None): + """Return a mock config_manager whose get_identity() returns a useful dict.""" + cm = MagicMock() + if ddns_cfg is not None: + identity = {'domain': {'ddns': ddns_cfg}} + elif domain_cfg is not None: + identity = {'domain': domain_cfg} + else: + identity = {} + cm.get_identity.return_value = identity + return cm + + +# --------------------------------------------------------------------------- +# PicNgoDDNS tests +# --------------------------------------------------------------------------- + +class TestPicNgoDDNSRegister(unittest.TestCase): + """PicNgoDDNS.register() calls the correct URL with the correct body.""" + + def test_register_calls_correct_url(self): + provider = PicNgoDDNS(api_base_url='https://ddns.example.com') + mock_resp = _make_response(200, json_data={'token': 'tok123', 'subdomain': 'alpha'}) + with patch('requests.post', return_value=mock_resp) as mock_post: + result = provider.register('alpha', '1.2.3.4') + mock_post.assert_called_once() + args, kwargs = mock_post.call_args + self.assertEqual(args[0], 'https://ddns.example.com/api/v1/register') + self.assertEqual(kwargs['json'], {'name': 'alpha', 'ip': '1.2.3.4'}) + self.assertEqual(result, {'token': 'tok123', 'subdomain': 'alpha'}) + + def test_register_raises_ddns_error_on_http_error(self): + provider = PicNgoDDNS() + mock_resp = _make_response(500, text='Internal Server Error') + with patch('requests.post', return_value=mock_resp): + with self.assertRaises(DDNSError): + provider.register('alpha', '1.2.3.4') + + def test_register_no_token_in_header(self): + """register() must NOT send an Authorization header (no token yet).""" + provider = PicNgoDDNS() + mock_resp = _make_response(200, json_data={'token': 't', 'subdomain': 'x'}) + with patch('requests.post', return_value=mock_resp) as mock_post: + provider.register('x', '1.2.3.4') + _, kwargs = mock_post.call_args + self.assertNotIn('Authorization', kwargs.get('headers', {})) + + +class TestPicNgoDDNSUpdate(unittest.TestCase): + """PicNgoDDNS.update() calls the correct URL with Authorization header.""" + + def test_update_uses_bearer_token(self): + provider = PicNgoDDNS(api_base_url='https://ddns.example.com') + mock_resp = _make_response(200) + with patch('requests.put', return_value=mock_resp) as mock_put: + result = provider.update('mytoken', '5.6.7.8') + mock_put.assert_called_once() + args, kwargs = mock_put.call_args + self.assertEqual(args[0], 'https://ddns.example.com/api/v1/update') + self.assertIn('Authorization', kwargs['headers']) + self.assertEqual(kwargs['headers']['Authorization'], 'Bearer mytoken') + self.assertEqual(kwargs['json'], {'ip': '5.6.7.8'}) + self.assertTrue(result) + + def test_update_raises_ddns_error_on_failure(self): + provider = PicNgoDDNS() + mock_resp = _make_response(403, text='Forbidden') + with patch('requests.put', return_value=mock_resp): + with self.assertRaises(DDNSError): + provider.update('badtoken', '1.2.3.4') + + +class TestPicNgoDDNSChallenges(unittest.TestCase): + """PicNgoDDNS.dns_challenge_create/delete call correct endpoints.""" + + def test_dns_challenge_create_calls_post(self): + provider = PicNgoDDNS(api_base_url='https://ddns.example.com') + mock_resp = _make_response(200) + with patch('requests.post', return_value=mock_resp) as mock_post: + result = provider.dns_challenge_create('tok', '_acme.alpha.pic.ngo', 'abc123') + mock_post.assert_called_once() + args, kwargs = mock_post.call_args + self.assertEqual(args[0], 'https://ddns.example.com/api/v1/dns-challenge') + self.assertEqual(kwargs['json'], {'fqdn': '_acme.alpha.pic.ngo', 'value': 'abc123'}) + self.assertEqual(kwargs['headers']['Authorization'], 'Bearer tok') + self.assertTrue(result) + + def test_dns_challenge_delete_calls_delete(self): + provider = PicNgoDDNS(api_base_url='https://ddns.example.com') + mock_resp = _make_response(200) + with patch('requests.delete', return_value=mock_resp) as mock_del: + result = provider.dns_challenge_delete('tok', '_acme.alpha.pic.ngo') + mock_del.assert_called_once() + args, kwargs = mock_del.call_args + self.assertEqual(args[0], 'https://ddns.example.com/api/v1/dns-challenge') + self.assertEqual(kwargs['json'], {'fqdn': '_acme.alpha.pic.ngo'}) + self.assertEqual(kwargs['headers']['Authorization'], 'Bearer tok') + self.assertTrue(result) + + def test_dns_challenge_create_raises_on_error(self): + provider = PicNgoDDNS() + mock_resp = _make_response(500, text='error') + with patch('requests.post', return_value=mock_resp): + with self.assertRaises(DDNSError): + provider.dns_challenge_create('tok', 'fqdn', 'val') + + def test_dns_challenge_delete_raises_on_error(self): + provider = PicNgoDDNS() + mock_resp = _make_response(404, text='not found') + with patch('requests.delete', return_value=mock_resp): + with self.assertRaises(DDNSError): + provider.dns_challenge_delete('tok', 'fqdn') + + +# --------------------------------------------------------------------------- +# DDNSManager.get_provider() tests +# --------------------------------------------------------------------------- + +class TestGetProvider(unittest.TestCase): + """DDNSManager.get_provider() returns the correct provider class.""" + + def test_returns_pic_ngo_provider(self): + cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo'}) + mgr = DDNSManager(config_manager=cm) + provider = mgr.get_provider() + self.assertIsInstance(provider, PicNgoDDNS) + + def test_returns_none_when_no_ddns_config(self): + cm = _make_config_manager() # empty identity + mgr = DDNSManager(config_manager=cm) + provider = mgr.get_provider() + self.assertIsNone(provider) + + def test_returns_none_when_no_provider_key(self): + cm = _make_config_manager(ddns_cfg={}) + mgr = DDNSManager(config_manager=cm) + provider = mgr.get_provider() + self.assertIsNone(provider) + + def test_returns_cloudflare_provider(self): + cm = _make_config_manager(ddns_cfg={ + 'provider': 'cloudflare', + 'api_token': 'cf_tok', + 'zone_id': 'zid', + }) + mgr = DDNSManager(config_manager=cm) + provider = mgr.get_provider() + self.assertIsInstance(provider, CloudflareDDNS) + + def test_returns_duckdns_provider(self): + cm = _make_config_manager(ddns_cfg={ + 'provider': 'duckdns', + 'token': 'duck_tok', + 'domain': 'mypic', + }) + mgr = DDNSManager(config_manager=cm) + provider = mgr.get_provider() + self.assertIsInstance(provider, DuckDNSDDNS) + + def test_returns_noip_provider(self): + cm = _make_config_manager(ddns_cfg={'provider': 'noip'}) + mgr = DDNSManager(config_manager=cm) + provider = mgr.get_provider() + self.assertIsInstance(provider, NoIPDDNS) + + def test_returns_freedns_provider(self): + cm = _make_config_manager(ddns_cfg={'provider': 'freedns'}) + mgr = DDNSManager(config_manager=cm) + provider = mgr.get_provider() + self.assertIsInstance(provider, FreeDNSDDNS) + + def test_returns_none_for_unknown_provider(self): + cm = _make_config_manager(ddns_cfg={'provider': 'nonexistent'}) + mgr = DDNSManager(config_manager=cm) + provider = mgr.get_provider() + self.assertIsNone(provider) + + def test_uses_custom_api_base_url(self): + cm = _make_config_manager(ddns_cfg={ + 'provider': 'pic_ngo', + 'api_base_url': 'https://custom.example.com', + }) + mgr = DDNSManager(config_manager=cm) + provider = mgr.get_provider() + self.assertIsInstance(provider, PicNgoDDNS) + self.assertEqual(provider.api_base_url, 'https://custom.example.com') + + +# --------------------------------------------------------------------------- +# DDNSManager.update_ip() tests +# --------------------------------------------------------------------------- + +class TestUpdateIp(unittest.TestCase): + """DDNSManager.update_ip() calls provider.update() only when IP changed.""" + + def _make_manager_with_mock_provider(self, token='tok', last_ip=None): + cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': token}) + mgr = DDNSManager(config_manager=cm) + mgr._last_ip = last_ip + mock_provider = MagicMock() + mock_provider.update.return_value = True + mgr.get_provider = MagicMock(return_value=mock_provider) + return mgr, mock_provider + + def test_update_when_ip_changed(self): + mgr, mock_provider = self._make_manager_with_mock_provider(last_ip='1.1.1.1') + with patch('ddns_manager._get_public_ip', return_value='2.2.2.2'): + mgr.update_ip() + mock_provider.update.assert_called_once_with('tok', '2.2.2.2') + self.assertEqual(mgr._last_ip, '2.2.2.2') + + def test_skips_update_when_ip_unchanged(self): + mgr, mock_provider = self._make_manager_with_mock_provider(last_ip='3.3.3.3') + with patch('ddns_manager._get_public_ip', return_value='3.3.3.3'): + mgr.update_ip() + mock_provider.update.assert_not_called() + self.assertEqual(mgr._last_ip, '3.3.3.3') + + def test_skips_update_when_no_provider(self): + cm = _make_config_manager() + mgr = DDNSManager(config_manager=cm) + mgr._last_ip = None + # Should not raise, just silently skip + with patch('ddns_manager._get_public_ip', return_value='1.2.3.4'): + mgr.update_ip() + + def test_skips_update_when_ip_unreachable(self): + mgr, mock_provider = self._make_manager_with_mock_provider(last_ip=None) + with patch('ddns_manager._get_public_ip', return_value=None): + mgr.update_ip() + mock_provider.update.assert_not_called() + + def test_last_ip_not_updated_when_provider_returns_false(self): + mgr, mock_provider = self._make_manager_with_mock_provider(last_ip='1.1.1.1') + mock_provider.update.return_value = False + with patch('ddns_manager._get_public_ip', return_value='9.9.9.9'): + mgr.update_ip() + # IP should not be cached when provider says False + self.assertEqual(mgr._last_ip, '1.1.1.1') + + def test_ddns_error_is_caught_not_propagated(self): + mgr, mock_provider = self._make_manager_with_mock_provider(last_ip='1.1.1.1') + mock_provider.update.side_effect = DDNSError("server error") + with patch('ddns_manager._get_public_ip', return_value='5.5.5.5'): + # Should not raise + mgr.update_ip() + + +# --------------------------------------------------------------------------- +# DDNSManager.register() tests +# --------------------------------------------------------------------------- + +class TestRegister(unittest.TestCase): + def test_register_stores_token_in_config(self): + cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo'}) + mgr = DDNSManager(config_manager=cm) + + mock_provider = MagicMock() + mock_provider.register.return_value = {'token': 'new_tok', 'subdomain': 'alpha'} + mgr.get_provider = MagicMock(return_value=mock_provider) + + result = mgr.register('alpha', '1.2.3.4') + self.assertEqual(result['token'], 'new_tok') + + # set_identity_field('domain', ...) should have been called + cm.set_identity_field.assert_called_once() + field_name, field_value = cm.set_identity_field.call_args[0] + self.assertEqual(field_name, 'domain') + self.assertEqual(field_value['ddns']['token'], 'new_tok') + + def test_register_raises_when_no_provider(self): + cm = _make_config_manager() + mgr = DDNSManager(config_manager=cm) + with self.assertRaises(DDNSError): + mgr.register('alpha', '1.2.3.4') + + +# --------------------------------------------------------------------------- +# DDNSManager.dns_challenge_create/delete delegation tests +# --------------------------------------------------------------------------- + +class TestDnsChallenges(unittest.TestCase): + def _make_manager(self): + cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': 'tok'}) + mgr = DDNSManager(config_manager=cm) + mock_provider = MagicMock() + mock_provider.dns_challenge_create.return_value = True + mock_provider.dns_challenge_delete.return_value = True + mgr.get_provider = MagicMock(return_value=mock_provider) + return mgr, mock_provider + + def test_dns_challenge_create_delegates(self): + mgr, mock_provider = self._make_manager() + mgr.dns_challenge_create('_acme.alpha.pic.ngo', 'val123') + mock_provider.dns_challenge_create.assert_called_once_with( + 'tok', '_acme.alpha.pic.ngo', 'val123' + ) + + def test_dns_challenge_delete_delegates(self): + mgr, mock_provider = self._make_manager() + mgr.dns_challenge_delete('_acme.alpha.pic.ngo') + mock_provider.dns_challenge_delete.assert_called_once_with( + 'tok', '_acme.alpha.pic.ngo' + ) + + def test_dns_challenge_create_raises_when_no_provider(self): + cm = _make_config_manager() + mgr = DDNSManager(config_manager=cm) + with self.assertRaises(DDNSError): + mgr.dns_challenge_create('fqdn', 'val') + + def test_dns_challenge_delete_raises_when_no_provider(self): + cm = _make_config_manager() + mgr = DDNSManager(config_manager=cm) + with self.assertRaises(DDNSError): + mgr.dns_challenge_delete('fqdn') + + +# --------------------------------------------------------------------------- +# Background heartbeat thread tests +# --------------------------------------------------------------------------- + +class TestHeartbeat(unittest.TestCase): + """Background heartbeat thread starts, runs, and can be stopped cleanly.""" + + def test_heartbeat_starts(self): + cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': 'tok'}) + mgr = DDNSManager(config_manager=cm) + mgr.update_ip = MagicMock() # avoid real network + + mgr.start_heartbeat() + try: + self.assertIsNotNone(mgr._heartbeat_thread) + self.assertTrue(mgr._heartbeat_thread.is_alive()) + finally: + mgr.stop_heartbeat() + + def test_heartbeat_can_be_stopped(self): + cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': 'tok'}) + mgr = DDNSManager(config_manager=cm) + mgr.update_ip = MagicMock() + + mgr.start_heartbeat() + mgr.stop_heartbeat() + # Thread should be dead after stop + if mgr._heartbeat_thread is not None: + self.assertFalse(mgr._heartbeat_thread.is_alive()) + + def test_start_heartbeat_is_idempotent(self): + """Calling start_heartbeat() twice should not create a second thread.""" + cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': 'tok'}) + mgr = DDNSManager(config_manager=cm) + mgr.update_ip = MagicMock() + + mgr.start_heartbeat() + thread1 = mgr._heartbeat_thread + mgr.start_heartbeat() + thread2 = mgr._heartbeat_thread + try: + self.assertIs(thread1, thread2) + finally: + mgr.stop_heartbeat() + + def test_heartbeat_calls_update_ip(self): + """Heartbeat loop must invoke update_ip() at least once.""" + cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': 'tok'}) + mgr = DDNSManager(config_manager=cm) + + called_event = threading.Event() + + def _fake_update_ip(): + called_event.set() + + mgr.update_ip = _fake_update_ip + + mgr.start_heartbeat() + called = called_event.wait(timeout=3) + mgr.stop_heartbeat() + self.assertTrue(called, "update_ip() was not called within 3 seconds") + + def test_heartbeat_survives_exception_in_update_ip(self): + """An exception in update_ip() must not crash the heartbeat thread.""" + cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': 'tok'}) + mgr = DDNSManager(config_manager=cm) + + call_count = [0] + survived_event = threading.Event() + + def _flaky_update_ip(): + call_count[0] += 1 + if call_count[0] == 1: + raise RuntimeError("transient failure") + survived_event.set() + + mgr.update_ip = _flaky_update_ip + + # Patch the interval to be 0 so the loop spins immediately + import ddns_manager as _dm + original_interval = _dm._HEARTBEAT_INTERVAL + _dm._HEARTBEAT_INTERVAL = 0 + try: + mgr.start_heartbeat() + survived = survived_event.wait(timeout=5) + mgr.stop_heartbeat() + self.assertTrue(survived, "Thread did not survive exception in update_ip()") + finally: + _dm._HEARTBEAT_INTERVAL = original_interval + + +# --------------------------------------------------------------------------- +# get_status() and test_connectivity() smoke tests +# --------------------------------------------------------------------------- + +class TestGetStatus(unittest.TestCase): + def test_get_status_returns_dict(self): + cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo'}) + mgr = DDNSManager(config_manager=cm) + status = mgr.get_status() + self.assertIn('service', status) + self.assertEqual(status['service'], 'ddns') + self.assertIn('provider', status) + self.assertEqual(status['provider'], 'pic_ngo') + + def test_get_status_no_config(self): + cm = _make_config_manager() + mgr = DDNSManager(config_manager=cm) + status = mgr.get_status() + self.assertIsNone(status['provider']) + + def test_test_connectivity_no_provider(self): + cm = _make_config_manager() + mgr = DDNSManager(config_manager=cm) + result = mgr.test_connectivity() + self.assertFalse(result['success']) + + def test_test_connectivity_with_provider(self): + cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo'}) + mgr = DDNSManager(config_manager=cm) + with patch('ddns_manager._get_public_ip', return_value='1.2.3.4'): + result = mgr.test_connectivity() + self.assertTrue(result['success']) + self.assertEqual(result['public_ip'], '1.2.3.4') + + +# --------------------------------------------------------------------------- +# _get_public_ip helper tests +# --------------------------------------------------------------------------- + +class TestGetPublicIp(unittest.TestCase): + def test_returns_ip_on_success(self): + mock_resp = MagicMock() + mock_resp.ok = True + mock_resp.text = ' 1.2.3.4 ' + with patch('requests.get', return_value=mock_resp): + result = _get_public_ip() + self.assertEqual(result, '1.2.3.4') + + def test_returns_none_on_failure(self): + with patch('requests.get', side_effect=Exception('network error')): + result = _get_public_ip() + self.assertIsNone(result) + + def test_returns_none_on_non_ok_response(self): + mock_resp = MagicMock() + mock_resp.ok = False + with patch('requests.get', return_value=mock_resp): + result = _get_public_ip() + self.assertIsNone(result) + + +if __name__ == '__main__': + unittest.main()