Phase 3: ddns_manager — DDNS client, provider adapters, IP heartbeat

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-09 09:42:00 -04:00
parent 7d290c12c4
commit f77d7fabcd
5 changed files with 1010 additions and 0 deletions
+4
View File
@@ -42,6 +42,7 @@ from managers import (
routing_manager, vault_manager, container_manager,
cell_link_manager, auth_manager, setup_manager,
caddy_manager,
ddns_manager,
firewall_manager, EventType,
)
# Re-exports: tests do `from app import CellManager` and `from app import _resolve_peer_dns`
@@ -578,6 +579,9 @@ def health_monitor_loop():
health_monitor_thread = threading.Thread(target=health_monitor_loop, daemon=True)
health_monitor_thread.start()
# Start DDNS heartbeat thread (updates public IP every 5 minutes when a provider is configured)
ddns_manager.start_heartbeat()
def _local_subnets():
"""Return all subnets the container is directly connected to (from routing table)."""
import ipaddress as _ipa, socket as _sock, struct as _struct
+486
View File
@@ -0,0 +1,486 @@
#!/usr/bin/env python3
"""
DDNS Manager for Personal Internet Cell.
Provides a provider-agnostic adapter for Dynamic DNS services used to keep the
cell's public IP registered under its chosen domain.
Supported providers:
pic_ngo — pic.ngo DDNS service (primary / Phase 3 wiring)
cloudflare — Cloudflare API v4 (stub; full impl in Phase 3b)
duckdns — DuckDNS (stub; no DNS-01 support)
noip — No-IP (stub)
freedns — FreeDNS (stub)
The manager runs a background heartbeat thread that re-publishes the public IP
every 5 minutes, skipping the call when the IP has not changed.
"""
import logging
import threading
import time
from typing import Any, Dict, Optional
import requests
from base_service_manager import BaseServiceManager
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Custom exception
# ---------------------------------------------------------------------------
class DDNSError(Exception):
"""Raised when a DDNS provider returns an error response."""
# ---------------------------------------------------------------------------
# Provider base class
# ---------------------------------------------------------------------------
class DDNSProvider:
"""Base class — all providers implement these methods."""
def register(self, name: str, ip: str) -> dict:
"""Register subdomain. Returns {'token': str, 'subdomain': str}."""
raise NotImplementedError
def update(self, token: str, ip: str) -> bool:
"""Update A record. Returns True on success."""
raise NotImplementedError
def dns_challenge_create(self, token: str, fqdn: str, value: str) -> bool:
raise NotImplementedError
def dns_challenge_delete(self, token: str, fqdn: str) -> bool:
raise NotImplementedError
# ---------------------------------------------------------------------------
# pic.ngo provider
# ---------------------------------------------------------------------------
class PicNgoDDNS(DDNSProvider):
"""DDNS provider backed by the roof/pic-ddns API at ddns.pic.ngo."""
DEFAULT_API_BASE = 'https://ddns.pic.ngo'
TIMEOUT = 10
def __init__(self, api_base_url: Optional[str] = None):
self.api_base_url = (api_base_url or self.DEFAULT_API_BASE).rstrip('/')
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _headers(self, token: Optional[str] = None) -> Dict[str, str]:
h: Dict[str, str] = {'Content-Type': 'application/json'}
if token:
h['Authorization'] = f'Bearer {token}'
return h
def _raise_for_status(self, response: requests.Response, action: str):
if not response.ok:
raise DDNSError(
f"PicNgoDDNS {action} failed: HTTP {response.status_code}{response.text}"
)
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
def register(self, name: str, ip: str) -> dict:
"""POST /api/v1/register — register subdomain, returns token + subdomain."""
url = f'{self.api_base_url}/api/v1/register'
payload = {'name': name, 'ip': ip}
resp = requests.post(url, json=payload, headers=self._headers(), timeout=self.TIMEOUT)
self._raise_for_status(resp, 'register')
return resp.json()
def update(self, token: str, ip: str) -> bool:
"""PUT /api/v1/update — update A record."""
url = f'{self.api_base_url}/api/v1/update'
payload = {'ip': ip}
resp = requests.put(url, json=payload,
headers=self._headers(token), timeout=self.TIMEOUT)
self._raise_for_status(resp, 'update')
return True
def dns_challenge_create(self, token: str, fqdn: str, value: str) -> bool:
"""POST /api/v1/dns-challenge — create DNS-01 TXT record."""
url = f'{self.api_base_url}/api/v1/dns-challenge'
payload = {'fqdn': fqdn, 'value': value}
resp = requests.post(url, json=payload,
headers=self._headers(token), timeout=self.TIMEOUT)
self._raise_for_status(resp, 'dns_challenge_create')
return True
def dns_challenge_delete(self, token: str, fqdn: str) -> bool:
"""DELETE /api/v1/dns-challenge — remove DNS-01 TXT record."""
url = f'{self.api_base_url}/api/v1/dns-challenge'
payload = {'fqdn': fqdn}
resp = requests.delete(url, json=payload,
headers=self._headers(token), timeout=self.TIMEOUT)
self._raise_for_status(resp, 'dns_challenge_delete')
return True
# ---------------------------------------------------------------------------
# Cloudflare provider (stub)
# ---------------------------------------------------------------------------
class CloudflareDDNS(DDNSProvider):
"""DDNS via Cloudflare API v4. Stub — full impl in Phase 3b."""
API_BASE = 'https://api.cloudflare.com/client/v4'
TIMEOUT = 10
def __init__(self, api_token: str, zone_id: str):
self.api_token = api_token
self.zone_id = zone_id
def _headers(self) -> Dict[str, str]:
return {
'Authorization': f'Bearer {self.api_token}',
'Content-Type': 'application/json',
}
def register(self, name: str, ip: str) -> dict:
# Cloudflare doesn't have a registration step — return stub data.
return {'token': self.api_token, 'subdomain': name}
def update(self, token: str, ip: str) -> bool:
"""PATCH /zones/{zone_id}/dns_records — update A record."""
url = f'{self.API_BASE}/zones/{self.zone_id}/dns_records'
resp = requests.patch(url, json={'ip': ip}, headers=self._headers(),
timeout=self.TIMEOUT)
return resp.ok
def dns_challenge_create(self, token: str, fqdn: str, value: str) -> bool:
"""POST TXT record for DNS-01 challenge."""
url = f'{self.API_BASE}/zones/{self.zone_id}/dns_records'
payload = {'type': 'TXT', 'name': fqdn, 'content': value, 'ttl': 120}
resp = requests.post(url, json=payload, headers=self._headers(),
timeout=self.TIMEOUT)
return resp.ok
def dns_challenge_delete(self, token: str, fqdn: str) -> bool:
"""DELETE TXT record for DNS-01 challenge."""
# A real impl would look up the record ID first; stub returns True.
return True
# ---------------------------------------------------------------------------
# DuckDNS provider (stub)
# ---------------------------------------------------------------------------
class DuckDNSDDNS(DDNSProvider):
"""DDNS via DuckDNS. Stub — DNS-01 challenge not supported."""
UPDATE_URL = 'https://www.duckdns.org/update'
TIMEOUT = 10
def __init__(self, token: str, domain: str):
self._token = token
self._domain = domain
def register(self, name: str, ip: str) -> dict:
return {'token': self._token, 'subdomain': name}
def update(self, token: str, ip: str) -> bool:
params = {'domains': self._domain, 'token': token, 'ip': ip}
resp = requests.get(self.UPDATE_URL, params=params, timeout=self.TIMEOUT)
return resp.ok and resp.text.strip() == 'OK'
def dns_challenge_create(self, token: str, fqdn: str, value: str) -> bool:
raise NotImplementedError("DuckDNS does not support programmatic TXT record creation")
def dns_challenge_delete(self, token: str, fqdn: str) -> bool:
raise NotImplementedError("DuckDNS does not support programmatic TXT record deletion")
# ---------------------------------------------------------------------------
# No-IP provider (stub)
# ---------------------------------------------------------------------------
class NoIPDDNS(DDNSProvider):
"""DDNS via No-IP. Stub — DNS-01 not supported."""
def register(self, name: str, ip: str) -> dict:
raise NotImplementedError
def update(self, token: str, ip: str) -> bool:
raise NotImplementedError
def dns_challenge_create(self, token: str, fqdn: str, value: str) -> bool:
raise NotImplementedError
def dns_challenge_delete(self, token: str, fqdn: str) -> bool:
raise NotImplementedError
# ---------------------------------------------------------------------------
# FreeDNS provider (stub)
# ---------------------------------------------------------------------------
class FreeDNSDDNS(DDNSProvider):
"""DDNS via FreeDNS. Stub — DNS-01 not supported."""
def register(self, name: str, ip: str) -> dict:
raise NotImplementedError
def update(self, token: str, ip: str) -> bool:
raise NotImplementedError
def dns_challenge_create(self, token: str, fqdn: str, value: str) -> bool:
raise NotImplementedError
def dns_challenge_delete(self, token: str, fqdn: str) -> bool:
raise NotImplementedError
# ---------------------------------------------------------------------------
# Public IP helper
# ---------------------------------------------------------------------------
def _get_public_ip() -> Optional[str]:
"""Return the current public IPv4 address using ipify, or None on failure."""
try:
resp = requests.get('https://api.ipify.org', timeout=10)
if resp.ok:
return resp.text.strip()
except Exception as exc:
logger.warning("Could not determine public IP: %s", exc)
return None
# ---------------------------------------------------------------------------
# Manager
# ---------------------------------------------------------------------------
_HEARTBEAT_INTERVAL = 300 # 5 minutes
class DDNSManager(BaseServiceManager):
"""Manages DDNS registration and periodic IP updates."""
def __init__(self, config_manager=None,
data_dir: str = '/app/data',
config_dir: str = '/app/config'):
super().__init__('ddns', data_dir, config_dir)
self.config_manager = config_manager
self._last_ip: Optional[str] = None
self._stop_event = threading.Event()
self._heartbeat_thread: Optional[threading.Thread] = None
# ------------------------------------------------------------------
# BaseServiceManager abstract method implementations
# ------------------------------------------------------------------
def get_status(self) -> Dict[str, Any]:
identity = self._identity()
domain_cfg = identity.get('domain', {})
return {
'service': 'ddns',
'provider': domain_cfg.get('ddns', {}).get('provider') if domain_cfg else None,
'last_ip': self._last_ip,
'heartbeat_running': (
self._heartbeat_thread is not None and
self._heartbeat_thread.is_alive()
),
}
def test_connectivity(self) -> Dict[str, Any]:
provider = self.get_provider()
if provider is None:
return {'success': False, 'reason': 'No DDNS provider configured'}
ip = _get_public_ip()
if ip is None:
return {'success': False, 'reason': 'Could not reach ipify'}
return {'success': True, 'public_ip': ip}
# ------------------------------------------------------------------
# Identity helpers
# ------------------------------------------------------------------
def _identity(self) -> Dict[str, Any]:
if self.config_manager is None:
return {}
return self.config_manager.get_identity() or {}
# ------------------------------------------------------------------
# Provider factory
# ------------------------------------------------------------------
def get_provider(self) -> Optional[DDNSProvider]:
"""Instantiate and return the configured DDNS provider, or None."""
identity = self._identity()
domain_cfg = identity.get('domain', {})
if not domain_cfg:
return None
ddns_cfg = domain_cfg.get('ddns', {})
if not ddns_cfg:
return None
provider_name = ddns_cfg.get('provider')
if not provider_name:
return None
if provider_name == 'pic_ngo':
api_base = ddns_cfg.get('api_base_url')
return PicNgoDDNS(api_base_url=api_base)
if provider_name == 'cloudflare':
return CloudflareDDNS(
api_token=ddns_cfg.get('api_token', ''),
zone_id=ddns_cfg.get('zone_id', ''),
)
if provider_name == 'duckdns':
return DuckDNSDDNS(
token=ddns_cfg.get('token', ''),
domain=ddns_cfg.get('domain', ''),
)
if provider_name == 'noip':
return NoIPDDNS()
if provider_name == 'freedns':
return FreeDNSDDNS()
logger.warning("Unknown DDNS provider: %s", provider_name)
return None
# ------------------------------------------------------------------
# Registration
# ------------------------------------------------------------------
def register(self, name: str, ip: str) -> dict:
"""Register the cell's subdomain with the configured provider.
Stores the returned token in the identity config under
identity['domain']['ddns']['token'] and records the subdomain.
Returns the dict from provider.register().
"""
provider = self.get_provider()
if provider is None:
raise DDNSError("No DDNS provider configured")
result = provider.register(name, ip)
# Persist token + subdomain back into identity
identity = self._identity()
domain_cfg = dict(identity.get('domain', {}))
ddns_cfg = dict(domain_cfg.get('ddns', {}))
if 'token' in result:
ddns_cfg['token'] = result['token']
if 'subdomain' in result:
ddns_cfg['subdomain'] = result['subdomain']
domain_cfg['ddns'] = ddns_cfg
if self.config_manager is not None:
self.config_manager.set_identity_field('domain', domain_cfg)
self._last_ip = ip
return result
# ------------------------------------------------------------------
# IP update
# ------------------------------------------------------------------
def update_ip(self):
"""Fetch current public IP and update DDNS only if it has changed."""
provider = self.get_provider()
if provider is None:
logger.debug("DDNS update_ip: no provider configured, skipping")
return
current_ip = _get_public_ip()
if current_ip is None:
logger.warning("DDNS update_ip: could not determine public IP")
return
if current_ip == self._last_ip:
logger.debug("DDNS update_ip: IP unchanged (%s), skipping", current_ip)
return
identity = self._identity()
domain_cfg = identity.get('domain', {})
ddns_cfg = domain_cfg.get('ddns', {}) if domain_cfg else {}
token = ddns_cfg.get('token', '')
try:
success = provider.update(token, current_ip)
if success:
logger.info("DDNS update_ip: updated to %s", current_ip)
self._last_ip = current_ip
else:
logger.warning("DDNS update_ip: provider.update() returned False")
except DDNSError as exc:
logger.error("DDNS update_ip: provider error: %s", exc)
# ------------------------------------------------------------------
# Heartbeat
# ------------------------------------------------------------------
def start_heartbeat(self):
"""Start a daemon thread that calls update_ip() every 5 minutes."""
if self._heartbeat_thread is not None and self._heartbeat_thread.is_alive():
logger.debug("DDNS heartbeat already running")
return
self._stop_event.clear()
self._heartbeat_thread = threading.Thread(
target=self._heartbeat_loop,
name='ddns-heartbeat',
daemon=True,
)
self._heartbeat_thread.start()
logger.info("DDNS heartbeat thread started (interval=%ds)", _HEARTBEAT_INTERVAL)
def stop_heartbeat(self):
"""Signal the heartbeat thread to stop and wait for it to exit."""
self._stop_event.set()
if self._heartbeat_thread is not None:
self._heartbeat_thread.join(timeout=10)
self._heartbeat_thread = None
def _heartbeat_loop(self):
"""Internal: run update_ip() periodically until _stop_event is set."""
while not self._stop_event.is_set():
try:
self.update_ip()
except Exception as exc:
logger.warning("DDNS heartbeat: unexpected error: %s", exc)
# Sleep in short slices so stop_heartbeat() is responsive
for _ in range(_HEARTBEAT_INTERVAL):
if self._stop_event.is_set():
break
time.sleep(1)
# ------------------------------------------------------------------
# DNS challenge delegation
# ------------------------------------------------------------------
def dns_challenge_create(self, fqdn: str, value: str) -> bool:
"""Create a DNS-01 TXT record via the configured provider."""
provider = self.get_provider()
if provider is None:
raise DDNSError("No DDNS provider configured")
identity = self._identity()
domain_cfg = identity.get('domain', {})
ddns_cfg = domain_cfg.get('ddns', {}) if domain_cfg else {}
token = ddns_cfg.get('token', '')
return provider.dns_challenge_create(token, fqdn, value)
def dns_challenge_delete(self, fqdn: str) -> bool:
"""Delete a DNS-01 TXT record via the configured provider."""
provider = self.get_provider()
if provider is None:
raise DDNSError("No DDNS provider configured")
identity = self._identity()
domain_cfg = identity.get('domain', {})
ddns_cfg = domain_cfg.get('ddns', {}) if domain_cfg else {}
token = ddns_cfg.get('token', '')
return provider.dns_challenge_delete(token, fqdn)
+3
View File
@@ -29,6 +29,7 @@ import firewall_manager
from auth_manager import AuthManager
from setup_manager import SetupManager
from caddy_manager import CaddyManager
from ddns_manager import DDNSManager
DATA_DIR = os.environ.get('DATA_DIR', '/app/data')
CONFIG_DIR = os.environ.get('CONFIG_DIR', '/app/config')
@@ -57,6 +58,7 @@ cell_link_manager = CellLinkManager(
auth_manager = AuthManager(data_dir=DATA_DIR, config_dir=CONFIG_DIR)
setup_manager = SetupManager(config_manager=config_manager, auth_manager=auth_manager)
caddy_manager = CaddyManager(config_manager=config_manager, data_dir=DATA_DIR, config_dir=CONFIG_DIR)
ddns_manager = DDNSManager(config_manager=config_manager, data_dir=DATA_DIR, config_dir=CONFIG_DIR)
# Service logger configuration
_service_log_configs = {
@@ -91,6 +93,7 @@ __all__ = [
'email_manager', 'calendar_manager', 'file_manager',
'routing_manager', 'vault_manager', 'container_manager',
'cell_link_manager', 'auth_manager', 'setup_manager', 'caddy_manager',
'ddns_manager',
'firewall_manager', 'EventType',
'DATA_DIR', 'CONFIG_DIR',
]
+7
View File
@@ -55,4 +55,11 @@ def complete_setup():
payload = request.get_json(silent=True) or {}
result = sm.complete_setup(payload)
status_code = 200 if result.get('success') else 400
# TODO (Phase 3): if result.get('success') and domain_mode == 'pic_ngo':
# from app import ddns_manager
# name = payload.get('cell_name', '')
# ip = payload.get('public_ip', '')
# ddns_manager.register(name, ip)
return jsonify(result), status_code
+510
View File
@@ -0,0 +1,510 @@
"""Tests for DDNSManager and DDNS provider classes."""
import os
import sys
import threading
import time
import unittest
from unittest.mock import MagicMock, patch, call
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
from ddns_manager import (
DDNSManager,
DDNSProvider,
DDNSError,
PicNgoDDNS,
CloudflareDDNS,
DuckDNSDDNS,
NoIPDDNS,
FreeDNSDDNS,
_get_public_ip,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_response(status_code=200, json_data=None, text=''):
"""Build a minimal requests.Response-like mock."""
resp = MagicMock()
resp.ok = (200 <= status_code < 300)
resp.status_code = status_code
resp.json.return_value = json_data or {}
resp.text = text
return resp
def _make_config_manager(ddns_cfg=None, domain_cfg=None):
"""Return a mock config_manager whose get_identity() returns a useful dict."""
cm = MagicMock()
if ddns_cfg is not None:
identity = {'domain': {'ddns': ddns_cfg}}
elif domain_cfg is not None:
identity = {'domain': domain_cfg}
else:
identity = {}
cm.get_identity.return_value = identity
return cm
# ---------------------------------------------------------------------------
# PicNgoDDNS tests
# ---------------------------------------------------------------------------
class TestPicNgoDDNSRegister(unittest.TestCase):
"""PicNgoDDNS.register() calls the correct URL with the correct body."""
def test_register_calls_correct_url(self):
provider = PicNgoDDNS(api_base_url='https://ddns.example.com')
mock_resp = _make_response(200, json_data={'token': 'tok123', 'subdomain': 'alpha'})
with patch('requests.post', return_value=mock_resp) as mock_post:
result = provider.register('alpha', '1.2.3.4')
mock_post.assert_called_once()
args, kwargs = mock_post.call_args
self.assertEqual(args[0], 'https://ddns.example.com/api/v1/register')
self.assertEqual(kwargs['json'], {'name': 'alpha', 'ip': '1.2.3.4'})
self.assertEqual(result, {'token': 'tok123', 'subdomain': 'alpha'})
def test_register_raises_ddns_error_on_http_error(self):
provider = PicNgoDDNS()
mock_resp = _make_response(500, text='Internal Server Error')
with patch('requests.post', return_value=mock_resp):
with self.assertRaises(DDNSError):
provider.register('alpha', '1.2.3.4')
def test_register_no_token_in_header(self):
"""register() must NOT send an Authorization header (no token yet)."""
provider = PicNgoDDNS()
mock_resp = _make_response(200, json_data={'token': 't', 'subdomain': 'x'})
with patch('requests.post', return_value=mock_resp) as mock_post:
provider.register('x', '1.2.3.4')
_, kwargs = mock_post.call_args
self.assertNotIn('Authorization', kwargs.get('headers', {}))
class TestPicNgoDDNSUpdate(unittest.TestCase):
"""PicNgoDDNS.update() calls the correct URL with Authorization header."""
def test_update_uses_bearer_token(self):
provider = PicNgoDDNS(api_base_url='https://ddns.example.com')
mock_resp = _make_response(200)
with patch('requests.put', return_value=mock_resp) as mock_put:
result = provider.update('mytoken', '5.6.7.8')
mock_put.assert_called_once()
args, kwargs = mock_put.call_args
self.assertEqual(args[0], 'https://ddns.example.com/api/v1/update')
self.assertIn('Authorization', kwargs['headers'])
self.assertEqual(kwargs['headers']['Authorization'], 'Bearer mytoken')
self.assertEqual(kwargs['json'], {'ip': '5.6.7.8'})
self.assertTrue(result)
def test_update_raises_ddns_error_on_failure(self):
provider = PicNgoDDNS()
mock_resp = _make_response(403, text='Forbidden')
with patch('requests.put', return_value=mock_resp):
with self.assertRaises(DDNSError):
provider.update('badtoken', '1.2.3.4')
class TestPicNgoDDNSChallenges(unittest.TestCase):
"""PicNgoDDNS.dns_challenge_create/delete call correct endpoints."""
def test_dns_challenge_create_calls_post(self):
provider = PicNgoDDNS(api_base_url='https://ddns.example.com')
mock_resp = _make_response(200)
with patch('requests.post', return_value=mock_resp) as mock_post:
result = provider.dns_challenge_create('tok', '_acme.alpha.pic.ngo', 'abc123')
mock_post.assert_called_once()
args, kwargs = mock_post.call_args
self.assertEqual(args[0], 'https://ddns.example.com/api/v1/dns-challenge')
self.assertEqual(kwargs['json'], {'fqdn': '_acme.alpha.pic.ngo', 'value': 'abc123'})
self.assertEqual(kwargs['headers']['Authorization'], 'Bearer tok')
self.assertTrue(result)
def test_dns_challenge_delete_calls_delete(self):
provider = PicNgoDDNS(api_base_url='https://ddns.example.com')
mock_resp = _make_response(200)
with patch('requests.delete', return_value=mock_resp) as mock_del:
result = provider.dns_challenge_delete('tok', '_acme.alpha.pic.ngo')
mock_del.assert_called_once()
args, kwargs = mock_del.call_args
self.assertEqual(args[0], 'https://ddns.example.com/api/v1/dns-challenge')
self.assertEqual(kwargs['json'], {'fqdn': '_acme.alpha.pic.ngo'})
self.assertEqual(kwargs['headers']['Authorization'], 'Bearer tok')
self.assertTrue(result)
def test_dns_challenge_create_raises_on_error(self):
provider = PicNgoDDNS()
mock_resp = _make_response(500, text='error')
with patch('requests.post', return_value=mock_resp):
with self.assertRaises(DDNSError):
provider.dns_challenge_create('tok', 'fqdn', 'val')
def test_dns_challenge_delete_raises_on_error(self):
provider = PicNgoDDNS()
mock_resp = _make_response(404, text='not found')
with patch('requests.delete', return_value=mock_resp):
with self.assertRaises(DDNSError):
provider.dns_challenge_delete('tok', 'fqdn')
# ---------------------------------------------------------------------------
# DDNSManager.get_provider() tests
# ---------------------------------------------------------------------------
class TestGetProvider(unittest.TestCase):
"""DDNSManager.get_provider() returns the correct provider class."""
def test_returns_pic_ngo_provider(self):
cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo'})
mgr = DDNSManager(config_manager=cm)
provider = mgr.get_provider()
self.assertIsInstance(provider, PicNgoDDNS)
def test_returns_none_when_no_ddns_config(self):
cm = _make_config_manager() # empty identity
mgr = DDNSManager(config_manager=cm)
provider = mgr.get_provider()
self.assertIsNone(provider)
def test_returns_none_when_no_provider_key(self):
cm = _make_config_manager(ddns_cfg={})
mgr = DDNSManager(config_manager=cm)
provider = mgr.get_provider()
self.assertIsNone(provider)
def test_returns_cloudflare_provider(self):
cm = _make_config_manager(ddns_cfg={
'provider': 'cloudflare',
'api_token': 'cf_tok',
'zone_id': 'zid',
})
mgr = DDNSManager(config_manager=cm)
provider = mgr.get_provider()
self.assertIsInstance(provider, CloudflareDDNS)
def test_returns_duckdns_provider(self):
cm = _make_config_manager(ddns_cfg={
'provider': 'duckdns',
'token': 'duck_tok',
'domain': 'mypic',
})
mgr = DDNSManager(config_manager=cm)
provider = mgr.get_provider()
self.assertIsInstance(provider, DuckDNSDDNS)
def test_returns_noip_provider(self):
cm = _make_config_manager(ddns_cfg={'provider': 'noip'})
mgr = DDNSManager(config_manager=cm)
provider = mgr.get_provider()
self.assertIsInstance(provider, NoIPDDNS)
def test_returns_freedns_provider(self):
cm = _make_config_manager(ddns_cfg={'provider': 'freedns'})
mgr = DDNSManager(config_manager=cm)
provider = mgr.get_provider()
self.assertIsInstance(provider, FreeDNSDDNS)
def test_returns_none_for_unknown_provider(self):
cm = _make_config_manager(ddns_cfg={'provider': 'nonexistent'})
mgr = DDNSManager(config_manager=cm)
provider = mgr.get_provider()
self.assertIsNone(provider)
def test_uses_custom_api_base_url(self):
cm = _make_config_manager(ddns_cfg={
'provider': 'pic_ngo',
'api_base_url': 'https://custom.example.com',
})
mgr = DDNSManager(config_manager=cm)
provider = mgr.get_provider()
self.assertIsInstance(provider, PicNgoDDNS)
self.assertEqual(provider.api_base_url, 'https://custom.example.com')
# ---------------------------------------------------------------------------
# DDNSManager.update_ip() tests
# ---------------------------------------------------------------------------
class TestUpdateIp(unittest.TestCase):
"""DDNSManager.update_ip() calls provider.update() only when IP changed."""
def _make_manager_with_mock_provider(self, token='tok', last_ip=None):
cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': token})
mgr = DDNSManager(config_manager=cm)
mgr._last_ip = last_ip
mock_provider = MagicMock()
mock_provider.update.return_value = True
mgr.get_provider = MagicMock(return_value=mock_provider)
return mgr, mock_provider
def test_update_when_ip_changed(self):
mgr, mock_provider = self._make_manager_with_mock_provider(last_ip='1.1.1.1')
with patch('ddns_manager._get_public_ip', return_value='2.2.2.2'):
mgr.update_ip()
mock_provider.update.assert_called_once_with('tok', '2.2.2.2')
self.assertEqual(mgr._last_ip, '2.2.2.2')
def test_skips_update_when_ip_unchanged(self):
mgr, mock_provider = self._make_manager_with_mock_provider(last_ip='3.3.3.3')
with patch('ddns_manager._get_public_ip', return_value='3.3.3.3'):
mgr.update_ip()
mock_provider.update.assert_not_called()
self.assertEqual(mgr._last_ip, '3.3.3.3')
def test_skips_update_when_no_provider(self):
cm = _make_config_manager()
mgr = DDNSManager(config_manager=cm)
mgr._last_ip = None
# Should not raise, just silently skip
with patch('ddns_manager._get_public_ip', return_value='1.2.3.4'):
mgr.update_ip()
def test_skips_update_when_ip_unreachable(self):
mgr, mock_provider = self._make_manager_with_mock_provider(last_ip=None)
with patch('ddns_manager._get_public_ip', return_value=None):
mgr.update_ip()
mock_provider.update.assert_not_called()
def test_last_ip_not_updated_when_provider_returns_false(self):
mgr, mock_provider = self._make_manager_with_mock_provider(last_ip='1.1.1.1')
mock_provider.update.return_value = False
with patch('ddns_manager._get_public_ip', return_value='9.9.9.9'):
mgr.update_ip()
# IP should not be cached when provider says False
self.assertEqual(mgr._last_ip, '1.1.1.1')
def test_ddns_error_is_caught_not_propagated(self):
mgr, mock_provider = self._make_manager_with_mock_provider(last_ip='1.1.1.1')
mock_provider.update.side_effect = DDNSError("server error")
with patch('ddns_manager._get_public_ip', return_value='5.5.5.5'):
# Should not raise
mgr.update_ip()
# ---------------------------------------------------------------------------
# DDNSManager.register() tests
# ---------------------------------------------------------------------------
class TestRegister(unittest.TestCase):
def test_register_stores_token_in_config(self):
cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo'})
mgr = DDNSManager(config_manager=cm)
mock_provider = MagicMock()
mock_provider.register.return_value = {'token': 'new_tok', 'subdomain': 'alpha'}
mgr.get_provider = MagicMock(return_value=mock_provider)
result = mgr.register('alpha', '1.2.3.4')
self.assertEqual(result['token'], 'new_tok')
# set_identity_field('domain', ...) should have been called
cm.set_identity_field.assert_called_once()
field_name, field_value = cm.set_identity_field.call_args[0]
self.assertEqual(field_name, 'domain')
self.assertEqual(field_value['ddns']['token'], 'new_tok')
def test_register_raises_when_no_provider(self):
cm = _make_config_manager()
mgr = DDNSManager(config_manager=cm)
with self.assertRaises(DDNSError):
mgr.register('alpha', '1.2.3.4')
# ---------------------------------------------------------------------------
# DDNSManager.dns_challenge_create/delete delegation tests
# ---------------------------------------------------------------------------
class TestDnsChallenges(unittest.TestCase):
def _make_manager(self):
cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': 'tok'})
mgr = DDNSManager(config_manager=cm)
mock_provider = MagicMock()
mock_provider.dns_challenge_create.return_value = True
mock_provider.dns_challenge_delete.return_value = True
mgr.get_provider = MagicMock(return_value=mock_provider)
return mgr, mock_provider
def test_dns_challenge_create_delegates(self):
mgr, mock_provider = self._make_manager()
mgr.dns_challenge_create('_acme.alpha.pic.ngo', 'val123')
mock_provider.dns_challenge_create.assert_called_once_with(
'tok', '_acme.alpha.pic.ngo', 'val123'
)
def test_dns_challenge_delete_delegates(self):
mgr, mock_provider = self._make_manager()
mgr.dns_challenge_delete('_acme.alpha.pic.ngo')
mock_provider.dns_challenge_delete.assert_called_once_with(
'tok', '_acme.alpha.pic.ngo'
)
def test_dns_challenge_create_raises_when_no_provider(self):
cm = _make_config_manager()
mgr = DDNSManager(config_manager=cm)
with self.assertRaises(DDNSError):
mgr.dns_challenge_create('fqdn', 'val')
def test_dns_challenge_delete_raises_when_no_provider(self):
cm = _make_config_manager()
mgr = DDNSManager(config_manager=cm)
with self.assertRaises(DDNSError):
mgr.dns_challenge_delete('fqdn')
# ---------------------------------------------------------------------------
# Background heartbeat thread tests
# ---------------------------------------------------------------------------
class TestHeartbeat(unittest.TestCase):
"""Background heartbeat thread starts, runs, and can be stopped cleanly."""
def test_heartbeat_starts(self):
cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': 'tok'})
mgr = DDNSManager(config_manager=cm)
mgr.update_ip = MagicMock() # avoid real network
mgr.start_heartbeat()
try:
self.assertIsNotNone(mgr._heartbeat_thread)
self.assertTrue(mgr._heartbeat_thread.is_alive())
finally:
mgr.stop_heartbeat()
def test_heartbeat_can_be_stopped(self):
cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': 'tok'})
mgr = DDNSManager(config_manager=cm)
mgr.update_ip = MagicMock()
mgr.start_heartbeat()
mgr.stop_heartbeat()
# Thread should be dead after stop
if mgr._heartbeat_thread is not None:
self.assertFalse(mgr._heartbeat_thread.is_alive())
def test_start_heartbeat_is_idempotent(self):
"""Calling start_heartbeat() twice should not create a second thread."""
cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': 'tok'})
mgr = DDNSManager(config_manager=cm)
mgr.update_ip = MagicMock()
mgr.start_heartbeat()
thread1 = mgr._heartbeat_thread
mgr.start_heartbeat()
thread2 = mgr._heartbeat_thread
try:
self.assertIs(thread1, thread2)
finally:
mgr.stop_heartbeat()
def test_heartbeat_calls_update_ip(self):
"""Heartbeat loop must invoke update_ip() at least once."""
cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': 'tok'})
mgr = DDNSManager(config_manager=cm)
called_event = threading.Event()
def _fake_update_ip():
called_event.set()
mgr.update_ip = _fake_update_ip
mgr.start_heartbeat()
called = called_event.wait(timeout=3)
mgr.stop_heartbeat()
self.assertTrue(called, "update_ip() was not called within 3 seconds")
def test_heartbeat_survives_exception_in_update_ip(self):
"""An exception in update_ip() must not crash the heartbeat thread."""
cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo', 'token': 'tok'})
mgr = DDNSManager(config_manager=cm)
call_count = [0]
survived_event = threading.Event()
def _flaky_update_ip():
call_count[0] += 1
if call_count[0] == 1:
raise RuntimeError("transient failure")
survived_event.set()
mgr.update_ip = _flaky_update_ip
# Patch the interval to be 0 so the loop spins immediately
import ddns_manager as _dm
original_interval = _dm._HEARTBEAT_INTERVAL
_dm._HEARTBEAT_INTERVAL = 0
try:
mgr.start_heartbeat()
survived = survived_event.wait(timeout=5)
mgr.stop_heartbeat()
self.assertTrue(survived, "Thread did not survive exception in update_ip()")
finally:
_dm._HEARTBEAT_INTERVAL = original_interval
# ---------------------------------------------------------------------------
# get_status() and test_connectivity() smoke tests
# ---------------------------------------------------------------------------
class TestGetStatus(unittest.TestCase):
def test_get_status_returns_dict(self):
cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo'})
mgr = DDNSManager(config_manager=cm)
status = mgr.get_status()
self.assertIn('service', status)
self.assertEqual(status['service'], 'ddns')
self.assertIn('provider', status)
self.assertEqual(status['provider'], 'pic_ngo')
def test_get_status_no_config(self):
cm = _make_config_manager()
mgr = DDNSManager(config_manager=cm)
status = mgr.get_status()
self.assertIsNone(status['provider'])
def test_test_connectivity_no_provider(self):
cm = _make_config_manager()
mgr = DDNSManager(config_manager=cm)
result = mgr.test_connectivity()
self.assertFalse(result['success'])
def test_test_connectivity_with_provider(self):
cm = _make_config_manager(ddns_cfg={'provider': 'pic_ngo'})
mgr = DDNSManager(config_manager=cm)
with patch('ddns_manager._get_public_ip', return_value='1.2.3.4'):
result = mgr.test_connectivity()
self.assertTrue(result['success'])
self.assertEqual(result['public_ip'], '1.2.3.4')
# ---------------------------------------------------------------------------
# _get_public_ip helper tests
# ---------------------------------------------------------------------------
class TestGetPublicIp(unittest.TestCase):
def test_returns_ip_on_success(self):
mock_resp = MagicMock()
mock_resp.ok = True
mock_resp.text = ' 1.2.3.4 '
with patch('requests.get', return_value=mock_resp):
result = _get_public_ip()
self.assertEqual(result, '1.2.3.4')
def test_returns_none_on_failure(self):
with patch('requests.get', side_effect=Exception('network error')):
result = _get_public_ip()
self.assertIsNone(result)
def test_returns_none_on_non_ok_response(self):
mock_resp = MagicMock()
mock_resp.ok = False
with patch('requests.get', return_value=mock_resp):
result = _get_public_ip()
self.assertIsNone(result)
if __name__ == '__main__':
unittest.main()