2f5370bd98
Unit Tests / test (push) Successful in 11m24s
These files were created during Steps 1-4 of the services architecture but were never staged: AccountManager (per-service credential provisioning), ServiceComposer (docker-compose lifecycle), built-in service manifests for email/calendar/files, and their test suites (158 tests). Also un-tracks .coverage binaries that were accidentally committed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
442 lines
18 KiB
Python
442 lines
18 KiB
Python
"""
|
|
Tests for AccountManager — per-service credential provisioning.
|
|
|
|
Covers:
|
|
- provision: dispatches to right manager method, stores credentials, generates password
|
|
- deprovision: calls manager method, removes stored credentials
|
|
- get_credentials / list_accounts / list_peer_services
|
|
- deprovision_peer: bulk cleanup on peer deletion
|
|
- store_credentials: direct storage (used by peers-POST legacy route)
|
|
- get_all_credentials: returns all creds for a peer
|
|
- credential file is created with 0o600
|
|
- unknown service / missing manager errors
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import stat
|
|
import threading
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import sys
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'api'))
|
|
|
|
from account_manager import AccountManager
|
|
|
|
|
|
# ── helpers ────────────────────────────────────────────────────────────────────
|
|
|
|
def _make_am(tmp_path: Path, registry=None, **managers) -> AccountManager:
    """Build an AccountManager whose data directory is *tmp_path*.

    Args:
        tmp_path: Directory handed to AccountManager as ``data_dir`` (as a str).
        registry: Service registry to use; when ``None`` the default mock
            registry from ``_make_registry()`` is substituted.
        **managers: Forwarded unchanged as keyword arguments to AccountManager
            (the tests pass ``email_manager=``, ``calendar_manager=``,
            ``file_manager=``).
    """
    service_registry = _make_registry() if registry is None else registry
    return AccountManager(
        service_registry=service_registry,
        data_dir=str(tmp_path),
        **managers,
    )
|
|
|
|
|
|
def _make_registry(services=None):
    """Return a MagicMock registry whose ``get(svc_id)`` looks up *services*.

    Args:
        services: Mapping of service id -> manifest dict. When ``None`` a
            default set of three builtin manifests (email, calendar, files)
            is used. An explicitly passed mapping, even an empty one, is used
            as-is.

    Returns:
        A MagicMock whose ``get`` returns the manifest for a known id and
        ``None`` for an unknown one.
    """
    if services is None:
        def _builtin(svc_id, manager_name, config):
            # Every default manifest shares this shape; only id, manager and
            # config differ between services.
            return {
                'id': svc_id, 'kind': 'builtin',
                'accounts': {'manager': manager_name, 'credentials': ['password']},
                'config': config,
            }

        services = {
            'email': _builtin('email', 'email_manager',
                              {'domain': 'example.com', 'smtp_port': 25}),
            'calendar': _builtin('calendar', 'calendar_manager', {}),
            'files': _builtin('files', 'file_manager', {}),
        }

    def _lookup(svc_id):
        # Single-argument lookup: unknown ids yield None.
        return services.get(svc_id)

    registry = MagicMock()
    registry.get.side_effect = _lookup
    return registry
|
|
|
|
|
|
def _make_email_mgr(ok=True):
    """Return a MagicMock email manager.

    Both ``create_email_user`` and ``delete_email_user`` return *ok*.
    """
    mgr = MagicMock()
    mgr.configure_mock(**{
        'create_email_user.return_value': ok,
        'delete_email_user.return_value': ok,
    })
    return mgr
|
|
|
|
|
|
def _make_cal_mgr(ok=True):
    """Return a MagicMock calendar manager.

    Both ``create_calendar_user`` and ``delete_calendar_user`` return *ok*.
    """
    mgr = MagicMock()
    mgr.configure_mock(**{
        'create_calendar_user.return_value': ok,
        'delete_calendar_user.return_value': ok,
    })
    return mgr
|
|
|
|
|
|
def _make_file_mgr(ok=True):
    """Return a MagicMock file manager.

    Both ``create_user`` and ``delete_user`` return *ok*.
    """
    mgr = MagicMock()
    mgr.configure_mock(**{
        'create_user.return_value': ok,
        'delete_user.return_value': ok,
    })
    return mgr
|
|
|
|
|
|
# ── Provision ─────────────────────────────────────────────────────────────────
|
|
|
|
class TestProvision(unittest.TestCase):
    """Tests for AccountManager.provision using mocked per-service managers."""

    def setUp(self):
        """Create a scratch data dir and an AccountManager wired to three mock managers."""
        import shutil
        import tempfile
        self.tmp = Path(tempfile.mkdtemp())
        # mkdtemp() never removes what it creates; register removal right away
        # so the directory is deleted even if the rest of setUp raises.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        self.email_mgr = _make_email_mgr()
        self.cal_mgr = _make_cal_mgr()
        self.file_mgr = _make_file_mgr()
        self.am = _make_am(
            self.tmp,
            email_manager=self.email_mgr,
            calendar_manager=self.cal_mgr,
            file_manager=self.file_mgr,
        )

    def test_provision_email_calls_create_email_user(self):
        # The domain argument comes from the mock registry's email config.
        self.am.provision('email', 'alice', password='s3cret')
        self.email_mgr.create_email_user.assert_called_once_with('alice', 'example.com', 's3cret')

    def test_provision_calendar_calls_create_calendar_user(self):
        self.am.provision('calendar', 'alice', password='s3cret')
        self.cal_mgr.create_calendar_user.assert_called_once_with('alice', 's3cret')

    def test_provision_files_calls_create_user(self):
        self.am.provision('files', 'alice', password='s3cret')
        self.file_mgr.create_user.assert_called_once_with('alice', 's3cret')

    def test_provision_generates_password_when_none_given(self):
        creds = self.am.provision('email', 'alice')
        self.assertIn('password', creds)
        # assertGreaterEqual reports the actual length on failure.
        self.assertGreaterEqual(len(creds['password']), 16)

    def test_provision_returns_credential_dict(self):
        creds = self.am.provision('email', 'alice', password='mypassword')
        self.assertEqual(creds, {'password': 'mypassword'})

    def test_provision_stores_credentials(self):
        self.am.provision('email', 'alice', password='pw')
        stored = self.am.get_credentials('email', 'alice')
        self.assertEqual(stored, {'password': 'pw'})

    def test_provision_multiple_peers_stored_independently(self):
        self.am.provision('email', 'alice', password='pw-alice')
        self.am.provision('email', 'bob', password='pw-bob')
        self.assertEqual(self.am.get_credentials('email', 'alice'), {'password': 'pw-alice'})
        self.assertEqual(self.am.get_credentials('email', 'bob'), {'password': 'pw-bob'})

    def test_provision_raises_for_unknown_service(self):
        with self.assertRaises(ValueError):
            self.am.provision('doesnotexist', 'alice')

    def test_provision_raises_when_service_has_no_accounts(self):
        # Manifest exists but its 'accounts' section is empty.
        reg = _make_registry({'nosvc': {'id': 'nosvc', 'accounts': {}, 'config': {}}})
        am = _make_am(self.tmp, registry=reg, email_manager=self.email_mgr)
        with self.assertRaises(ValueError):
            am.provision('nosvc', 'alice')

    def test_provision_raises_when_manager_not_registered(self):
        am = _make_am(self.tmp)  # no managers passed
        with self.assertRaises(ValueError):
            am.provision('email', 'alice')

    def test_provision_raises_runtime_error_when_manager_returns_false(self):
        am = _make_am(self.tmp, email_manager=_make_email_mgr(ok=False))
        with self.assertRaises(RuntimeError):
            am.provision('email', 'alice')

    def test_provision_email_raises_when_domain_not_configured(self):
        reg = _make_registry({'email': {
            'id': 'email', 'accounts': {'manager': 'email_manager'},
            'config': {'domain': ''},
        }})
        am = _make_am(self.tmp, registry=reg, email_manager=self.email_mgr)
        with self.assertRaises(ValueError):
            am.provision('email', 'alice')
|
|
|
|
|
|
# ── Credential file permissions ───────────────────────────────────────────────
|
|
|
|
class TestCredentialFilePermissions(unittest.TestCase):
    """Checks the mode bits of the credentials file written by provision()."""

    def setUp(self):
        """Create a scratch data dir and an AccountManager with a mock email manager."""
        import shutil
        import tempfile
        self.tmp = Path(tempfile.mkdtemp())
        # mkdtemp() leaves the directory behind; remove it after each test.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        self.am = _make_am(self.tmp, email_manager=_make_email_mgr())

    def test_credentials_file_created_with_0600(self):
        self.am.provision('email', 'alice', password='pw')
        creds_path = self.tmp / 'peer_service_credentials.json'
        # S_IMODE strips the file-type bits so only permission bits are compared.
        mode = stat.S_IMODE(creds_path.stat().st_mode)
        self.assertEqual(mode, 0o600, f'Expected 0o600, got {oct(mode)}')
|
|
|
|
|
|
# ── Deprovision ───────────────────────────────────────────────────────────────
|
|
|
|
class TestDeprovision(unittest.TestCase):
    """Tests for AccountManager.deprovision; 'alice' has an email account before each test."""

    def setUp(self):
        """Create a scratch data dir, wire three mock managers, and provision alice's email."""
        import shutil
        import tempfile
        self.tmp = Path(tempfile.mkdtemp())
        # mkdtemp() leaves the directory behind; register removal before any
        # further setup so it is deleted even if provisioning below raises.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        self.email_mgr = _make_email_mgr()
        self.cal_mgr = _make_cal_mgr()
        self.file_mgr = _make_file_mgr()
        self.am = _make_am(
            self.tmp,
            email_manager=self.email_mgr,
            calendar_manager=self.cal_mgr,
            file_manager=self.file_mgr,
        )
        self.am.provision('email', 'alice', password='pw')

    def test_deprovision_email_calls_delete_email_user(self):
        self.am.deprovision('email', 'alice')
        self.email_mgr.delete_email_user.assert_called_once_with('alice', 'example.com')

    def test_deprovision_removes_stored_credentials(self):
        self.am.deprovision('email', 'alice')
        self.assertIsNone(self.am.get_credentials('email', 'alice'))

    def test_deprovision_returns_true_on_success(self):
        ok = self.am.deprovision('email', 'alice')
        self.assertTrue(ok)

    def test_deprovision_raises_for_unknown_service(self):
        with self.assertRaises(ValueError):
            self.am.deprovision('ghost', 'alice')

    def test_deprovision_removes_service_entry_when_last_peer_gone(self):
        self.am.deprovision('email', 'alice')
        # Inspect the file directly: the top-level 'email' key must be gone.
        creds_file = self.tmp / 'peer_service_credentials.json'
        data = json.loads(creds_file.read_text())
        self.assertNotIn('email', data)

    def test_deprovision_calendar_calls_delete_calendar_user(self):
        self.am.provision('calendar', 'alice', password='pw')
        self.am.deprovision('calendar', 'alice')
        self.cal_mgr.delete_calendar_user.assert_called_once_with('alice')

    def test_deprovision_files_calls_delete_user(self):
        self.am.provision('files', 'alice', password='pw')
        self.am.deprovision('files', 'alice')
        self.file_mgr.delete_user.assert_called_once_with('alice')
|
|
|
|
|
|
# ── Queries ───────────────────────────────────────────────────────────────────
|
|
|
|
class TestQueries(unittest.TestCase):
    """Tests for the read-only lookups on AccountManager.

    Fixture: alice has email and calendar accounts, bob has email only,
    and nobody has a files account.
    """

    def setUp(self):
        """Create a scratch data dir and provision the fixture accounts."""
        import shutil
        import tempfile
        self.tmp = Path(tempfile.mkdtemp())
        # mkdtemp() leaves the directory behind; register removal before any
        # further setup so it is deleted even if provisioning below raises.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        self.am = _make_am(
            self.tmp,
            email_manager=_make_email_mgr(),
            calendar_manager=_make_cal_mgr(),
            file_manager=_make_file_mgr(),
        )
        self.am.provision('email', 'alice', password='pw-alice-email')
        self.am.provision('email', 'bob', password='pw-bob-email')
        self.am.provision('calendar', 'alice', password='pw-alice-cal')

    def test_get_credentials_returns_stored(self):
        self.assertEqual(self.am.get_credentials('email', 'alice'), {'password': 'pw-alice-email'})

    def test_get_credentials_returns_none_for_unknown_peer(self):
        self.assertIsNone(self.am.get_credentials('email', 'nobody'))

    def test_get_credentials_returns_none_for_unknown_service(self):
        self.assertIsNone(self.am.get_credentials('ghost', 'alice'))

    def test_list_accounts_returns_provisioned_peers(self):
        accounts = self.am.list_accounts('email')
        self.assertIn('alice', accounts)
        self.assertIn('bob', accounts)

    def test_list_accounts_empty_for_unprovisioned_service(self):
        self.assertEqual(self.am.list_accounts('files'), [])

    def test_list_peer_services_returns_all_services_for_peer(self):
        services = self.am.list_peer_services('alice')
        self.assertIn('email', services)
        self.assertIn('calendar', services)

    def test_list_peer_services_returns_empty_for_unknown_peer(self):
        self.assertEqual(self.am.list_peer_services('nobody'), [])

    def test_is_provisioned_true_when_account_exists(self):
        self.assertTrue(self.am.is_provisioned('email', 'alice'))

    def test_is_provisioned_false_when_no_account(self):
        self.assertFalse(self.am.is_provisioned('email', 'nobody'))

    def test_get_all_credentials_returns_all_services(self):
        all_creds = self.am.get_all_credentials('alice')
        self.assertIn('email', all_creds)
        self.assertIn('calendar', all_creds)
        self.assertEqual(all_creds['email'], {'password': 'pw-alice-email'})

    def test_get_all_credentials_empty_for_unknown_peer(self):
        self.assertEqual(self.am.get_all_credentials('nobody'), {})
|
|
|
|
|
|
# ── Bulk deprovision ──────────────────────────────────────────────────────────
|
|
|
|
class TestDeprovisionPeer(unittest.TestCase):
    """Tests for AccountManager.deprovision_peer; alice starts with email and calendar accounts."""

    def setUp(self):
        """Create a scratch data dir and provision alice on email and calendar."""
        import shutil
        import tempfile
        self.tmp = Path(tempfile.mkdtemp())
        # mkdtemp() leaves the directory behind; register removal before any
        # further setup so it is deleted even if provisioning below raises.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        self.email_mgr = _make_email_mgr()
        self.cal_mgr = _make_cal_mgr()
        self.am = _make_am(
            self.tmp,
            email_manager=self.email_mgr,
            calendar_manager=self.cal_mgr,
            file_manager=_make_file_mgr(),
        )
        self.am.provision('email', 'alice', password='pw')
        self.am.provision('calendar', 'alice', password='pw')

    def test_deprovision_peer_removes_from_all_services(self):
        self.am.deprovision_peer('alice')
        self.assertIsNone(self.am.get_credentials('email', 'alice'))
        self.assertIsNone(self.am.get_credentials('calendar', 'alice'))

    def test_deprovision_peer_returns_results_dict(self):
        results = self.am.deprovision_peer('alice')
        self.assertIn('email', results)
        self.assertIn('calendar', results)
        self.assertTrue(results['email'])
        self.assertTrue(results['calendar'])

    def test_deprovision_peer_continues_after_one_service_fails(self):
        # Make only the email backend blow up.
        self.email_mgr.delete_email_user.side_effect = RuntimeError('smtp down')
        results = self.am.deprovision_peer('alice')
        self.assertFalse(results.get('email'))
        # calendar should still succeed even though email failed
        self.assertTrue(results.get('calendar'))

    def test_deprovision_peer_no_op_for_unknown_peer(self):
        results = self.am.deprovision_peer('nobody')
        self.assertEqual(results, {})
|
|
|
|
|
|
# ── Direct credential storage ─────────────────────────────────────────────────
|
|
|
|
class TestStoreCredentials(unittest.TestCase):
    """Tests for AccountManager.store_credentials (no service managers registered)."""

    def setUp(self):
        """Create a scratch data dir and a manager-less AccountManager."""
        import shutil
        import tempfile
        self.tmp = Path(tempfile.mkdtemp())
        # mkdtemp() leaves the directory behind; remove it after each test.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        self.am = _make_am(self.tmp)

    def test_store_credentials_makes_them_retrievable(self):
        self.am.store_credentials('email', 'alice', {'password': 'mypassword'})
        self.assertEqual(self.am.get_credentials('email', 'alice'), {'password': 'mypassword'})

    def test_store_credentials_overwrites_existing(self):
        self.am.store_credentials('email', 'alice', {'password': 'old'})
        self.am.store_credentials('email', 'alice', {'password': 'new'})
        self.assertEqual(self.am.get_credentials('email', 'alice'), {'password': 'new'})

    def test_store_credentials_creates_file_with_0600(self):
        self.am.store_credentials('email', 'alice', {'password': 'pw'})
        creds_path = self.tmp / 'peer_service_credentials.json'
        # S_IMODE strips the file-type bits so only permission bits are compared.
        mode = stat.S_IMODE(creds_path.stat().st_mode)
        self.assertEqual(mode, 0o600)
|
|
|
|
|
|
# ── Thread safety ─────────────────────────────────────────────────────────────
|
|
|
|
class TestThreadSafety(unittest.TestCase):
    """Checks that concurrent store_credentials calls do not lose entries."""

    def setUp(self):
        """Create a scratch data dir and a manager-less AccountManager."""
        import shutil
        import tempfile
        self.tmp = Path(tempfile.mkdtemp())
        # mkdtemp() leaves the directory behind; remove it after each test.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        self.am = _make_am(self.tmp)

    def test_concurrent_store_credentials_no_data_loss(self):
        n_threads = 20
        errors = []

        def worker(peer_name):
            # Collect exceptions instead of letting them die silently inside
            # the thread, so the main thread can assert on them.
            try:
                self.am.store_credentials('email', peer_name, {'password': f'pw-{peer_name}'})
            except Exception as e:
                errors.append(e)

        threads = [
            threading.Thread(target=worker, args=(f'peer{i}',))
            for i in range(n_threads)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        self.assertEqual(errors, [])
        # Every distinct peer name must have survived the concurrent writes.
        accounts = self.am.list_accounts('email')
        self.assertEqual(len(accounts), n_threads)
|
|
|
|
|
|
class TestEdgeCases(unittest.TestCase):
    """Edge-case tests: repeated provisioning, failed backends, corrupt storage."""

    def setUp(self):
        """Create a scratch data dir and an AccountManager wired to three mock managers."""
        import shutil
        import tempfile
        self.tmp = Path(tempfile.mkdtemp())
        # mkdtemp() leaves the directory behind; remove it after each test.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        self.email_mgr = _make_email_mgr()
        self.am = _make_am(self.tmp, email_manager=self.email_mgr,
                           calendar_manager=_make_cal_mgr(),
                           file_manager=_make_file_mgr())

    def test_deprovision_peer_never_provisioned_returns_empty(self):
        self.assertEqual(self.am.deprovision_peer('ghost'), {})

    def test_deprovision_clears_credentials_even_when_manager_returns_false(self):
        """Credentials are removed even if underlying manager reports failure."""
        self.am.provision('email', 'alice', password='pw')
        self.email_mgr.delete_email_user.return_value = False
        self.am.deprovision('email', 'alice')
        self.assertIsNone(self.am.get_credentials('email', 'alice'))

    def test_provision_twice_overwrites_credentials(self):
        self.am.provision('email', 'alice', password='first')
        self.am.provision('email', 'alice', password='second')
        self.assertEqual(self.am.get_credentials('email', 'alice'), {'password': 'second'})

    def test_provision_twice_calls_manager_both_times(self):
        self.am.provision('email', 'alice', password='first')
        self.am.provision('email', 'alice', password='second')
        self.assertEqual(self.email_mgr.create_email_user.call_count, 2)

    def test_corrupted_credentials_file_returns_empty_and_continues(self):
        """A corrupted JSON file is treated as empty rather than crashing."""
        creds_path = self.tmp / 'peer_service_credentials.json'
        creds_path.write_text('{invalid json}')
        result = self.am.get_all_credentials('alice')
        self.assertEqual(result, {})

    def test_file_permissions_preserved_on_second_write(self):
        """0o600 must hold even after overwriting with a second provision."""
        self.am.provision('email', 'alice', password='first')
        self.am.provision('email', 'bob', password='second')
        creds_path = self.tmp / 'peer_service_credentials.json'
        mode = stat.S_IMODE(creds_path.stat().st_mode)
        self.assertEqual(mode, 0o600, f'Expected 0o600 after overwrite, got {oct(mode)}')

    def test_generated_password_is_url_safe(self):
        """token_urlsafe must not produce + or / characters."""
        creds = self.am.provision('email', 'alice')
        pwd = creds['password']
        self.assertNotIn('+', pwd)
        self.assertNotIn('/', pwd)

    def test_store_then_deprovision_removes_credentials(self):
        """store_credentials + deprovision should cleanly remove the entry."""
        self.am.store_credentials('email', 'alice', {'password': 'stored'})
        self.am.deprovision('email', 'alice')
        self.assertIsNone(self.am.get_credentials('email', 'alice'))
|
|
|
|
|
|
# Run the suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|