fix: DDNS update token in body, webdav gating, regression tests
Unit Tests / test (push) Successful in 7m25s

- PicNgoDDNS.update(): send token in request body instead of Authorization
  header; DDNS server validates it from body (was returning HTTP 422 on
  every heartbeat, leaving IP record stale after fresh install)
- peers.py / Peers.jsx: webdav service_access only valid when 'files' store
  service is installed; was always shown even with no services, confusing
  users into thinking WebDAV was pre-installed
- 10 new regression tests: DDNS update body contract, Caddy always
  regenerates on startup with no services, peer role allowed on
  /api/services/active, webdav gating by installed services

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-07 16:56:12 -04:00
parent 962d137093
commit 69862331e7
7 changed files with 241 additions and 12 deletions
+21 -5
View File
@@ -13,6 +13,7 @@ from ddns_manager import (
DDNSManager,
DDNSProvider,
DDNSError,
DDNSTokenExpired,
PicNgoDDNS,
CloudflareDDNS,
DuckDNSDDNS,
@@ -103,9 +104,9 @@ class TestPicNgoDDNSRegister(unittest.TestCase):
class TestPicNgoDDNSUpdate(unittest.TestCase):
"""PicNgoDDNS.update() calls the correct URL with Authorization header."""
"""PicNgoDDNS.update() sends token in the request body (DDNS server validates it there)."""
def test_update_uses_bearer_token(self):
def test_update_sends_token_in_body(self):
provider = PicNgoDDNS(api_base_url='https://ddns.example.com')
mock_resp = _make_response(200)
with patch('requests.put', return_value=mock_resp) as mock_put:
@@ -113,11 +114,19 @@ class TestPicNgoDDNSUpdate(unittest.TestCase):
mock_put.assert_called_once()
args, kwargs = mock_put.call_args
self.assertEqual(args[0], 'https://ddns.example.com/api/v1/update')
self.assertIn('Authorization', kwargs['headers'])
self.assertEqual(kwargs['headers']['Authorization'], 'Bearer mytoken')
self.assertEqual(kwargs['json'], {'ip': '5.6.7.8'})
# Token must be in the JSON body — server validates it there, not in Authorization
self.assertEqual(kwargs['json'], {'ip': '5.6.7.8', 'token': 'mytoken'})
self.assertTrue(result)
def test_update_does_not_use_bearer_header(self):
"""Token must NOT be sent as Authorization: Bearer — server ignores it and returns 422."""
provider = PicNgoDDNS(api_base_url='https://ddns.example.com')
mock_resp = _make_response(200)
with patch('requests.put', return_value=mock_resp) as mock_put:
provider.update('mytoken', '1.2.3.4')
_, kwargs = mock_put.call_args
self.assertNotIn('Authorization', kwargs.get('headers', {}))
def test_update_raises_ddns_error_on_failure(self):
provider = PicNgoDDNS()
mock_resp = _make_response(403, text='Forbidden')
@@ -125,6 +134,13 @@ class TestPicNgoDDNSUpdate(unittest.TestCase):
with self.assertRaises(DDNSError):
provider.update('badtoken', '1.2.3.4')
def test_update_raises_ddns_token_expired_on_401(self):
provider = PicNgoDDNS()
mock_resp = _make_response(401, text='Unauthorized')
with patch('requests.put', return_value=mock_resp):
with self.assertRaises(DDNSTokenExpired):
provider.update('expiredtoken', '1.2.3.4')
class TestPicNgoDDNSChallenges(unittest.TestCase):
"""PicNgoDDNS.dns_challenge_create/delete call correct endpoints."""
+76
View File
@@ -141,5 +141,81 @@ def test_empty_auth_manager_non_api_path_bypasses_503(
assert r.status_code == 200
# ── role-based access: peer vs admin ─────────────────────────────────────────
@pytest.fixture
def peer_client(tmp_path):
"""Test client with a peer-role session active."""
from app import app
from auth_manager import AuthManager
data_dir = str(tmp_path / 'data')
config_dir = str(tmp_path / 'config')
os.makedirs(data_dir, exist_ok=True)
os.makedirs(config_dir, exist_ok=True)
mgr = AuthManager(data_dir=data_dir, config_dir=config_dir)
mgr.create_user('admin', 'AdminPass123!', 'admin')
mgr.create_user('alice', 'AlicePass123!', 'peer')
app.config['TESTING'] = True
with patch('app.auth_manager', mgr):
try:
import auth_routes
with patch.object(auth_routes, 'auth_manager', mgr, create=True):
with app.test_client() as client:
r = client.post('/api/auth/login',
data=json.dumps({'username': 'alice', 'password': 'AlicePass123!'}),
content_type='application/json')
assert r.status_code == 200, f'peer login failed: {r.status_code}'
yield client, mgr
except ImportError:
with app.test_client() as client:
with client.session_transaction() as sess:
sess['username'] = 'alice'
sess['role'] = 'peer'
yield client, mgr
def test_peer_role_blocked_from_admin_only_endpoint(peer_client):
"""Peer sessions must not access admin-only endpoints like /api/peers."""
client, mgr = peer_client
with patch('app.auth_manager', mgr):
r = client.get('/api/peers')
assert r.status_code == 403
def test_peer_role_allowed_services_active(peer_client):
"""/api/services/active must be accessible to peer sessions.
Regression guard: peers saw 'not installed' on My Services because
enforce_auth returned 403 for this endpoint.
"""
client, mgr = peer_client
with patch('app.auth_manager', mgr):
r = client.get('/api/services/active')
# 200 (or whatever the route returns) but NOT 403
assert r.status_code != 403, (
'/api/services/active returned 403 for peer — peer UI cannot show installed services'
)
def test_admin_role_still_allowed_services_active(flask_client, populated_auth_manager):
"""/api/services/active must remain accessible to admin sessions."""
with patch('app.auth_manager', populated_auth_manager):
try:
import auth_routes
with patch.object(auth_routes, 'auth_manager', populated_auth_manager, create=True):
r_login = flask_client.post('/api/auth/login',
data=json.dumps({'username': 'admin', 'password': 'AdminPass123!'}),
content_type='application/json')
assert r_login.status_code == 200
r = flask_client.get('/api/services/active')
except ImportError:
with flask_client.session_transaction() as sess:
sess['username'] = 'admin'
sess['role'] = 'admin'
r = flask_client.get('/api/services/active')
assert r.status_code != 403
if __name__ == '__main__':
pytest.main([__file__, '-v'])
+106
View File
@@ -515,3 +515,109 @@ def test_create_peer_rolls_back_firewall_on_dns_failure(
finally:
for p in patches:
p.stop()
# ── service_access webdav gating ──────────────────────────────────────────────
def _make_admin_client_with_installed(auth_mgr, mock_email_mgr, mock_calendar_mgr,
mock_file_mgr, mock_wg_mgr, mock_peer_registry,
installed_services):
"""Return patch list for an admin client with get_installed_services pre-configured."""
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'test-secret'
mock_cfg = MagicMock()
mock_cfg.get_installed_services.return_value = installed_services
patches = [
patch('app.auth_manager', auth_mgr),
patch('app.config_manager', mock_cfg),
patch('app.email_manager', mock_email_mgr),
patch('app.calendar_manager', mock_calendar_mgr),
patch('app.file_manager', mock_file_mgr),
patch('app.wireguard_manager', mock_wg_mgr),
patch('app.peer_registry', mock_peer_registry),
patch('app.firewall_manager'),
]
try:
import auth_routes
patches.append(patch.object(auth_routes, 'auth_manager', auth_mgr, create=True))
except (ImportError, AttributeError):
pass
return patches
def test_webdav_not_offered_when_files_not_installed(
auth_mgr, mock_email_mgr, mock_calendar_mgr,
mock_file_mgr, mock_wg_mgr, mock_peer_registry):
"""Requesting service_access=['webdav'] must fail when files is not installed.
Regression guard: webdav was hardcoded into _valid_services even when
no store services were installed, misleading users into thinking WebDAV
was always available.
"""
patches = _make_admin_client_with_installed(
auth_mgr, mock_email_mgr, mock_calendar_mgr,
mock_file_mgr, mock_wg_mgr, mock_peer_registry,
installed_services={},
)
started = [p.start() for p in patches]
try:
with app.test_client() as client:
_login(client)
resp = _post_peer(client, _peer_payload(service_access=['webdav']))
assert resp.status_code == 400, (
f'expected 400 when requesting webdav without files installed, got {resp.status_code}'
)
data = json.loads(resp.data)
assert 'service_access' in data.get('error', '').lower()
finally:
for p in patches:
p.stop()
def test_webdav_offered_when_files_is_installed(
auth_mgr, mock_email_mgr, mock_calendar_mgr,
mock_file_mgr, mock_wg_mgr, mock_peer_registry):
"""Requesting service_access=['webdav'] must succeed when files is installed."""
patches = _make_admin_client_with_installed(
auth_mgr, mock_email_mgr, mock_calendar_mgr,
mock_file_mgr, mock_wg_mgr, mock_peer_registry,
installed_services={'files': {}},
)
started = [p.start() for p in patches]
try:
with app.test_client() as client:
_login(client)
resp = _post_peer(client, _peer_payload(service_access=['webdav']))
assert resp.status_code == 201, (
f'expected 201 when requesting webdav with files installed, got {resp.status_code}: {resp.data}'
)
finally:
for p in patches:
p.stop()
def test_no_services_installed_peer_gets_empty_service_access(
auth_mgr, mock_email_mgr, mock_calendar_mgr,
mock_file_mgr, mock_wg_mgr, mock_peer_registry):
"""When no store services are installed the default service_access must be empty."""
patches = _make_admin_client_with_installed(
auth_mgr, mock_email_mgr, mock_calendar_mgr,
mock_file_mgr, mock_wg_mgr, mock_peer_registry,
installed_services={},
)
started = [p.start() for p in patches]
try:
with app.test_client() as client:
_login(client)
resp = _post_peer(client, _peer_payload()) # no service_access in payload
assert resp.status_code == 201, (
f'expected 201 with no services installed, got {resp.status_code}: {resp.data}'
)
peer_dict = mock_peer_registry.add_peer.call_args[0][0]
assert peer_dict.get('service_access') == [], (
f"service_access should be [] when no services installed, got {peer_dict.get('service_access')}"
)
finally:
for p in patches:
p.stop()
+22
View File
@@ -801,6 +801,28 @@ class TestReapplyOnStartup(unittest.TestCase):
with patch('firewall_manager.apply_service_rules'):
ssm.reapply_on_startup() # must not raise
def test_reapply_always_regenerates_caddy_even_with_no_services(self):
"""Caddy must be regenerated on startup even when no store services are installed.
Regression guard: a cell rename or fresh install produces a stale Caddyfile
(wrong domain) if reapply_on_startup() returns early before calling caddy.
"""
ssm = _make_manager(installed={}) # no installed services
ssm.caddy_manager = MagicMock()
ssm.reapply_on_startup()
ssm.caddy_manager.regenerate_with_installed.assert_called_once_with([])
def test_reapply_regenerates_caddy_with_installed_routes(self):
"""When services are installed their caddy_route dicts are passed to regenerate."""
route = {'subdomain': 'myapp', 'backend': 'http://myapp:8080'}
ssm = _make_manager(installed={'myapp': {'service_ip': '172.20.1.10',
'iptables_rules': [],
'caddy_route': route}})
ssm.caddy_manager = MagicMock()
with patch('firewall_manager.apply_service_rules'):
ssm.reapply_on_startup()
ssm.caddy_manager.regenerate_with_installed.assert_called_once_with([route])
if __name__ == '__main__':
unittest.main()