fix: cross-cell routing for split-tunnel peers
Three related fixes for split-tunnel peers that need to reach connected cells: 1. apply_peer_rules/apply_all_peer_rules now accept wg_subnet (actual local VPN subnet) and cell_subnets (connected cells' vpn_subnets) parameters instead of hardcoding 10.0.0.0/24. All callers (startup, add_peer, update_peer, apply-enforcement endpoint) pass the real values. 2. Explicit ACCEPT rules are inserted in FORWARD for each connected cell's subnet so split-tunnel peers (internet_access=False) can still reach connected cells via the wg0→wg0 path. 3. apply_ip_range in network_manager now loads cell_links.json and passes it to generate_corefile(), fixing a race where the bootstrap DNS thread could overwrite the Corefile and wipe cross-cell DNS forwarding zones on startup. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+8
-1
@@ -308,7 +308,14 @@ def _apply_startup_enforcement():
|
|||||||
peers = peer_registry.list_peers()
|
peers = peer_registry.list_peers()
|
||||||
cell_links = cell_link_manager.list_connections()
|
cell_links = cell_link_manager.list_connections()
|
||||||
firewall_manager.reconcile_stale_peer_rules(peers)
|
firewall_manager.reconcile_stale_peer_rules(peers)
|
||||||
firewall_manager.apply_all_peer_rules(peers)
|
import ipaddress as _ipa
|
||||||
|
try:
|
||||||
|
_wg_addr = wireguard_manager._get_configured_address()
|
||||||
|
_wg_subnet = str(_ipa.ip_network(_wg_addr, strict=False)) if _wg_addr else '10.0.0.0/24'
|
||||||
|
except Exception:
|
||||||
|
_wg_subnet = '10.0.0.0/24'
|
||||||
|
_cell_subnets = [l['vpn_subnet'] for l in cell_links if l.get('vpn_subnet')]
|
||||||
|
firewall_manager.apply_all_peer_rules(peers, wg_subnet=_wg_subnet, cell_subnets=_cell_subnets)
|
||||||
firewall_manager.apply_all_cell_rules(cell_links)
|
firewall_manager.apply_all_cell_rules(cell_links)
|
||||||
firewall_manager.ensure_cell_api_dnat()
|
firewall_manager.ensure_cell_api_dnat()
|
||||||
# Embed DNAT rules in PostUp so they survive WireGuard interface restarts,
|
# Embed DNAT rules in PostUp so they survive WireGuard interface restarts,
|
||||||
|
|||||||
+29
-10
@@ -153,18 +153,27 @@ def clear_peer_rules(peer_ip: str) -> None:
|
|||||||
logger.error(f"clear_peer_rules({peer_ip}): {e}")
|
logger.error(f"clear_peer_rules({peer_ip}): {e}")
|
||||||
|
|
||||||
|
|
||||||
def apply_peer_rules(peer_ip: str, settings: Dict[str, Any]) -> bool:
|
def apply_peer_rules(peer_ip: str, settings: Dict[str, Any],
|
||||||
|
wg_subnet: str = '10.0.0.0/24',
|
||||||
|
cell_subnets: Optional[List[str]] = None) -> bool:
|
||||||
"""
|
"""
|
||||||
Apply iptables FORWARD rules for a peer based on their access settings.
|
Apply iptables FORWARD rules for a peer based on their access settings.
|
||||||
|
|
||||||
|
wg_subnet: the local cell's WireGuard VPN subnet (e.g. '10.0.2.0/24').
|
||||||
|
Used for the peer-to-peer ACCEPT/DROP rule. Defaults to the
|
||||||
|
legacy hardcoded value so callers that don't yet pass it are safe.
|
||||||
|
cell_subnets: list of connected cells' vpn_subnet strings. When provided,
|
||||||
|
explicit ACCEPT rules are added so split-tunnel peers can reach
|
||||||
|
connected cells regardless of the internet_access setting.
|
||||||
|
|
||||||
Each rule is inserted at position 1 (-I), so the LAST call ends up at the TOP.
|
Each rule is inserted at position 1 (-I), so the LAST call ends up at the TOP.
|
||||||
We insert in reverse-priority order: lowest-priority rules first, highest last.
|
We insert in reverse-priority order: lowest-priority rules first, highest last.
|
||||||
|
|
||||||
Desired final chain order (top = highest priority):
|
Desired final chain order (top = highest priority):
|
||||||
1. Per-service DROP/ACCEPT (most specific — must beat private-net ACCEPT)
|
1. Connected-cell subnet ACCEPTs (explicit cross-cell routing)
|
||||||
2. Peer-to-peer ACCEPT/DROP (10.0.0.0/24)
|
2. Peer-to-peer ACCEPT/DROP (local VPN subnet)
|
||||||
3. Private-net ACCEPTs (for no-internet peers to reach local resources)
|
3. Private-net ACCEPTs (for no-internet peers to reach local resources)
|
||||||
4. Internet DROP or ACCEPT (lowest priority catch-all)
|
4. Internet DROP or ACCEPT (lowest priority catch-all)
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
comment = _peer_comment(peer_ip)
|
comment = _peer_comment(peer_ip)
|
||||||
@@ -187,24 +196,34 @@ def apply_peer_rules(peer_ip: str, settings: Dict[str, Any]) -> bool:
|
|||||||
_iptables(['-I', 'FORWARD', '-s', peer_ip, '-d', net,
|
_iptables(['-I', 'FORWARD', '-s', peer_ip, '-d', net,
|
||||||
'-m', 'comment', '--comment', comment, '-j', 'ACCEPT'])
|
'-m', 'comment', '--comment', comment, '-j', 'ACCEPT'])
|
||||||
|
|
||||||
# --- Step 2 --- Peer-to-peer (10.0.0.0/24)
|
# --- Step 2 --- Peer-to-peer: use the actual local VPN subnet
|
||||||
target = 'ACCEPT' if peer_access else 'DROP'
|
target = 'ACCEPT' if peer_access else 'DROP'
|
||||||
_iptables(['-I', 'FORWARD', '-s', peer_ip, '-d', '10.0.0.0/24',
|
_iptables(['-I', 'FORWARD', '-s', peer_ip, '-d', wg_subnet,
|
||||||
'-m', 'comment', '--comment', comment, '-j', target])
|
'-m', 'comment', '--comment', comment, '-j', target])
|
||||||
|
|
||||||
|
# --- Step 3 --- Explicit ACCEPT for each connected cell's subnet so
|
||||||
|
# split-tunnel peers can route to connected cells (wg0 → wg0 forwarding).
|
||||||
|
if cell_subnets:
|
||||||
|
for subnet in reversed(cell_subnets):
|
||||||
|
_iptables(['-I', 'FORWARD', '-s', peer_ip, '-d', subnet,
|
||||||
|
'-m', 'comment', '--comment', comment, '-j', 'ACCEPT'])
|
||||||
|
|
||||||
# Service access restriction is done entirely by CoreDNS ACL.
|
# Service access restriction is done entirely by CoreDNS ACL.
|
||||||
# No per-peer iptables rule for Caddy:80 — blocking it would also
|
# No per-peer iptables rule for Caddy:80 — blocking it would also
|
||||||
# prevent the peer from reaching the PIC web UI and API.
|
# prevent the peer from reaching the PIC web UI and API.
|
||||||
|
|
||||||
logger.info(f"Applied rules for {peer_ip}: internet={internet_access} "
|
logger.info(f"Applied rules for {peer_ip}: internet={internet_access} "
|
||||||
f"services={service_access} peers={peer_access}")
|
f"services={service_access} peers={peer_access} "
|
||||||
|
f"wg_subnet={wg_subnet} cell_subnets={cell_subnets}")
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"apply_peer_rules({peer_ip}): {e}")
|
logger.error(f"apply_peer_rules({peer_ip}): {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def apply_all_peer_rules(peers: List[Dict[str, Any]]) -> None:
|
def apply_all_peer_rules(peers: List[Dict[str, Any]],
|
||||||
|
wg_subnet: str = '10.0.0.0/24',
|
||||||
|
cell_subnets: Optional[List[str]] = None) -> None:
|
||||||
"""Re-apply rules for all peers (called on startup)."""
|
"""Re-apply rules for all peers (called on startup)."""
|
||||||
ensure_caddy_virtual_ips()
|
ensure_caddy_virtual_ips()
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
@@ -215,7 +234,7 @@ def apply_all_peer_rules(peers: List[Dict[str, Any]]) -> None:
|
|||||||
'internet_access': peer.get('internet_access', True),
|
'internet_access': peer.get('internet_access', True),
|
||||||
'service_access': peer.get('service_access', list(SERVICE_IPS.keys())),
|
'service_access': peer.get('service_access', list(SERVICE_IPS.keys())),
|
||||||
'peer_access': peer.get('peer_access', True),
|
'peer_access': peer.get('peer_access', True),
|
||||||
})
|
}, wg_subnet=wg_subnet, cell_subnets=cell_subnets)
|
||||||
|
|
||||||
|
|
||||||
def reconcile_stale_peer_rules(peers: List[Dict[str, Any]]) -> int:
|
def reconcile_stale_peer_rules(peers: List[Dict[str, Any]]) -> int:
|
||||||
|
|||||||
@@ -460,7 +460,8 @@ class NetworkManager(BaseServiceManager):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
warnings.append(f"dnsmasq domain update failed: {e}")
|
warnings.append(f"dnsmasq domain update failed: {e}")
|
||||||
|
|
||||||
# 2. Regenerate Corefile using generate_corefile so it always stays consistent
|
# 2. Regenerate Corefile — include cell-to-cell forwarding stanzas so a
|
||||||
|
# domain/ip_range change doesn't wipe cross-cell DNS forwarding zones.
|
||||||
try:
|
try:
|
||||||
import firewall_manager as _fm
|
import firewall_manager as _fm
|
||||||
corefile = os.path.join(self.config_dir, 'dns', 'Corefile')
|
corefile = os.path.join(self.config_dir, 'dns', 'Corefile')
|
||||||
@@ -470,7 +471,13 @@ class NetworkManager(BaseServiceManager):
|
|||||||
peers = _json.loads(open(peers_file).read()) if os.path.exists(peers_file) else []
|
peers = _json.loads(open(peers_file).read()) if os.path.exists(peers_file) else []
|
||||||
except Exception:
|
except Exception:
|
||||||
peers = []
|
peers = []
|
||||||
_fm.generate_corefile(peers, corefile, domain)
|
cell_links_file = os.path.join(self.data_dir, 'cell_links.json')
|
||||||
|
try:
|
||||||
|
import json as _json2
|
||||||
|
cell_links = _json2.loads(open(cell_links_file).read()) if os.path.exists(cell_links_file) else []
|
||||||
|
except Exception:
|
||||||
|
cell_links = []
|
||||||
|
_fm.generate_corefile(peers, corefile, domain, cell_links=cell_links)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
warnings.append(f"Corefile domain update failed: {e}")
|
warnings.append(f"Corefile domain update failed: {e}")
|
||||||
|
|
||||||
|
|||||||
+20
-3
@@ -38,6 +38,13 @@ def add_peer():
|
|||||||
from app import (peer_registry, wireguard_manager, firewall_manager,
|
from app import (peer_registry, wireguard_manager, firewall_manager,
|
||||||
email_manager, calendar_manager, file_manager, auth_manager,
|
email_manager, calendar_manager, file_manager, auth_manager,
|
||||||
cell_link_manager, _configured_domain, COREFILE_PATH)
|
cell_link_manager, _configured_domain, COREFILE_PATH)
|
||||||
|
try:
|
||||||
|
_wg_addr = wireguard_manager._get_configured_address()
|
||||||
|
_wg_subnet = str(ipaddress.ip_network(_wg_addr, strict=False)) if _wg_addr else '10.0.0.0/24'
|
||||||
|
except Exception:
|
||||||
|
_wg_subnet = '10.0.0.0/24'
|
||||||
|
_cell_links = cell_link_manager.list_connections()
|
||||||
|
_cell_subnets = [l['vpn_subnet'] for l in _cell_links if l.get('vpn_subnet')]
|
||||||
data = request.get_json(silent=True)
|
data = request.get_json(silent=True)
|
||||||
if data is None:
|
if data is None:
|
||||||
return jsonify({"error": "No data provided"}), 400
|
return jsonify({"error": "No data provided"}), 400
|
||||||
@@ -118,7 +125,8 @@ def add_peer():
|
|||||||
return jsonify({"error": f"Peer {peer_name} already exists"}), 400
|
return jsonify({"error": f"Peer {peer_name} already exists"}), 400
|
||||||
peer_added_to_registry = True
|
peer_added_to_registry = True
|
||||||
|
|
||||||
firewall_manager.apply_peer_rules(peer_info['ip'], peer_info)
|
firewall_manager.apply_peer_rules(peer_info['ip'], peer_info,
|
||||||
|
wg_subnet=_wg_subnet, cell_subnets=_cell_subnets)
|
||||||
firewall_applied = True
|
firewall_applied = True
|
||||||
|
|
||||||
wg_allowed = f"{assigned_ip}/32" if '/' not in assigned_ip else assigned_ip
|
wg_allowed = f"{assigned_ip}/32" if '/' not in assigned_ip else assigned_ip
|
||||||
@@ -153,7 +161,15 @@ def add_peer():
|
|||||||
@bp.route('/api/peers/<peer_name>', methods=['PUT'])
|
@bp.route('/api/peers/<peer_name>', methods=['PUT'])
|
||||||
def update_peer(peer_name):
|
def update_peer(peer_name):
|
||||||
try:
|
try:
|
||||||
from app import peer_registry, firewall_manager, cell_link_manager, _configured_domain, COREFILE_PATH
|
from app import (peer_registry, wireguard_manager, firewall_manager,
|
||||||
|
cell_link_manager, _configured_domain, COREFILE_PATH)
|
||||||
|
try:
|
||||||
|
_wg_addr = wireguard_manager._get_configured_address()
|
||||||
|
_wg_subnet = str(ipaddress.ip_network(_wg_addr, strict=False)) if _wg_addr else '10.0.0.0/24'
|
||||||
|
except Exception:
|
||||||
|
_wg_subnet = '10.0.0.0/24'
|
||||||
|
_cell_links = cell_link_manager.list_connections()
|
||||||
|
_cell_subnets = [l['vpn_subnet'] for l in _cell_links if l.get('vpn_subnet')]
|
||||||
data = request.get_json(silent=True) or {}
|
data = request.get_json(silent=True) or {}
|
||||||
existing = peer_registry.get_peer(peer_name)
|
existing = peer_registry.get_peer(peer_name)
|
||||||
if not existing:
|
if not existing:
|
||||||
@@ -173,7 +189,8 @@ def update_peer(peer_name):
|
|||||||
if success:
|
if success:
|
||||||
updated_peer = peer_registry.get_peer(peer_name)
|
updated_peer = peer_registry.get_peer(peer_name)
|
||||||
if updated_peer:
|
if updated_peer:
|
||||||
firewall_manager.apply_peer_rules(updated_peer['ip'], updated_peer)
|
firewall_manager.apply_peer_rules(updated_peer['ip'], updated_peer,
|
||||||
|
wg_subnet=_wg_subnet, cell_subnets=_cell_subnets)
|
||||||
firewall_manager.apply_all_dns_rules(peer_registry.list_peers(), COREFILE_PATH, _configured_domain(),
|
firewall_manager.apply_all_dns_rules(peer_registry.list_peers(), COREFILE_PATH, _configured_domain(),
|
||||||
cell_links=cell_link_manager.list_connections())
|
cell_links=cell_link_manager.list_connections())
|
||||||
return jsonify({"message": f"Peer {peer_name} updated", "config_changed": config_changed})
|
return jsonify({"message": f"Peer {peer_name} updated", "config_changed": config_changed})
|
||||||
|
|||||||
+12
-3
@@ -1,4 +1,5 @@
|
|||||||
import logging
|
import logging
|
||||||
|
import ipaddress
|
||||||
from flask import Blueprint, request, jsonify
|
from flask import Blueprint, request, jsonify
|
||||||
logger = logging.getLogger('picell')
|
logger = logging.getLogger('picell')
|
||||||
bp = Blueprint('wireguard', __name__)
|
bp = Blueprint('wireguard', __name__)
|
||||||
@@ -221,11 +222,19 @@ def refresh_external_ip():
|
|||||||
@bp.route('/api/wireguard/apply-enforcement', methods=['POST'])
|
@bp.route('/api/wireguard/apply-enforcement', methods=['POST'])
|
||||||
def apply_wireguard_enforcement():
|
def apply_wireguard_enforcement():
|
||||||
try:
|
try:
|
||||||
from app import peer_registry, firewall_manager, cell_link_manager, _configured_domain, COREFILE_PATH
|
from app import (peer_registry, wireguard_manager, firewall_manager,
|
||||||
|
cell_link_manager, _configured_domain, COREFILE_PATH)
|
||||||
peers = peer_registry.list_peers()
|
peers = peer_registry.list_peers()
|
||||||
firewall_manager.apply_all_peer_rules(peers)
|
try:
|
||||||
|
_wg_addr = wireguard_manager._get_configured_address()
|
||||||
|
_wg_subnet = str(ipaddress.ip_network(_wg_addr, strict=False)) if _wg_addr else '10.0.0.0/24'
|
||||||
|
except Exception:
|
||||||
|
_wg_subnet = '10.0.0.0/24'
|
||||||
|
_cell_links = cell_link_manager.list_connections()
|
||||||
|
_cell_subnets = [l['vpn_subnet'] for l in _cell_links if l.get('vpn_subnet')]
|
||||||
|
firewall_manager.apply_all_peer_rules(peers, wg_subnet=_wg_subnet, cell_subnets=_cell_subnets)
|
||||||
firewall_manager.apply_all_dns_rules(peers, COREFILE_PATH, _configured_domain(),
|
firewall_manager.apply_all_dns_rules(peers, COREFILE_PATH, _configured_domain(),
|
||||||
cell_links=cell_link_manager.list_connections())
|
cell_links=_cell_links)
|
||||||
return jsonify({'ok': True, 'peers': len(peers)})
|
return jsonify({'ok': True, 'peers': len(peers)})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return jsonify({'error': str(e)}), 500
|
return jsonify({'error': str(e)}), 500
|
||||||
|
|||||||
@@ -308,6 +308,87 @@ class TestApplyPeerRules(unittest.TestCase):
|
|||||||
for c in vpn_rules:
|
for c in vpn_rules:
|
||||||
self.assertIn('DROP', c)
|
self.assertIn('DROP', c)
|
||||||
|
|
||||||
|
def test_custom_wg_subnet_replaces_default(self):
|
||||||
|
"""wg_subnet parameter is used instead of hardcoded 10.0.0.0/24."""
|
||||||
|
calls_made = []
|
||||||
|
|
||||||
|
def fake_wg_exec(args):
|
||||||
|
calls_made.append(args)
|
||||||
|
m = MagicMock()
|
||||||
|
m.returncode = 0
|
||||||
|
m.stdout = ''
|
||||||
|
return m
|
||||||
|
|
||||||
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec):
|
||||||
|
firewall_manager.apply_peer_rules(
|
||||||
|
'10.0.2.5',
|
||||||
|
{'internet_access': True, 'peer_access': True,
|
||||||
|
'service_access': list(firewall_manager.SERVICE_IPS.keys())},
|
||||||
|
wg_subnet='10.0.2.0/24',
|
||||||
|
)
|
||||||
|
|
||||||
|
iptables_calls = [c for c in calls_made if 'iptables' in c]
|
||||||
|
subnets_in_rules = [token for c in iptables_calls for token in c
|
||||||
|
if '/' in token and token.startswith('10.')]
|
||||||
|
self.assertIn('10.0.2.0/24', subnets_in_rules,
|
||||||
|
"Custom wg_subnet should appear in peer-to-peer FORWARD rule")
|
||||||
|
self.assertNotIn('10.0.0.0/24', subnets_in_rules,
|
||||||
|
"Default hardcoded subnet must not appear when custom wg_subnet given")
|
||||||
|
|
||||||
|
def test_cell_subnets_get_explicit_accept_rules(self):
|
||||||
|
"""Each cell subnet gets an explicit ACCEPT rule for cross-cell routing."""
|
||||||
|
calls_made = []
|
||||||
|
|
||||||
|
def fake_wg_exec(args):
|
||||||
|
calls_made.append(args)
|
||||||
|
m = MagicMock()
|
||||||
|
m.returncode = 0
|
||||||
|
m.stdout = ''
|
||||||
|
return m
|
||||||
|
|
||||||
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec):
|
||||||
|
firewall_manager.apply_peer_rules(
|
||||||
|
'10.0.2.5',
|
||||||
|
{'internet_access': False, 'peer_access': True,
|
||||||
|
'service_access': list(firewall_manager.SERVICE_IPS.keys())},
|
||||||
|
wg_subnet='10.0.2.0/24',
|
||||||
|
cell_subnets=['10.0.0.0/24', '10.0.1.0/24'],
|
||||||
|
)
|
||||||
|
|
||||||
|
iptables_calls = [c for c in calls_made if 'iptables' in c]
|
||||||
|
for cell_net in ('10.0.0.0/24', '10.0.1.0/24'):
|
||||||
|
cell_rules = [c for c in iptables_calls if '-d' in c and cell_net in c]
|
||||||
|
self.assertTrue(cell_rules,
|
||||||
|
f"Expected FORWARD rule for cell subnet {cell_net}")
|
||||||
|
for c in cell_rules:
|
||||||
|
self.assertIn('ACCEPT', c,
|
||||||
|
f"Cell subnet rule for {cell_net} must be ACCEPT")
|
||||||
|
|
||||||
|
def test_no_cell_subnets_no_extra_rules(self):
|
||||||
|
"""When cell_subnets is empty/None, no extra FORWARD rules are added."""
|
||||||
|
calls_made = []
|
||||||
|
|
||||||
|
def fake_wg_exec(args):
|
||||||
|
calls_made.append(args)
|
||||||
|
m = MagicMock()
|
||||||
|
m.returncode = 0
|
||||||
|
m.stdout = ''
|
||||||
|
return m
|
||||||
|
|
||||||
|
with patch.object(firewall_manager, '_wg_exec', side_effect=fake_wg_exec):
|
||||||
|
firewall_manager.apply_peer_rules(
|
||||||
|
'10.0.2.5',
|
||||||
|
{'internet_access': True, 'peer_access': True,
|
||||||
|
'service_access': list(firewall_manager.SERVICE_IPS.keys())},
|
||||||
|
wg_subnet='10.0.2.0/24',
|
||||||
|
cell_subnets=[],
|
||||||
|
)
|
||||||
|
|
||||||
|
iptables_calls = [c for c in calls_made if 'iptables' in c and '-I' in c]
|
||||||
|
# Only 2 rules expected: the catch-all ACCEPT + the peer-to-peer ACCEPT
|
||||||
|
self.assertEqual(len(iptables_calls), 2,
|
||||||
|
f"Expected exactly 2 INSERT rules, got: {iptables_calls}")
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# apply_all_peer_rules
|
# apply_all_peer_rules
|
||||||
@@ -330,6 +411,18 @@ class TestApplyAllPeerRules(unittest.TestCase):
|
|||||||
firewall_manager.apply_all_peer_rules(peers)
|
firewall_manager.apply_all_peer_rules(peers)
|
||||||
self.assertEqual(mock_apply.call_count, 1)
|
self.assertEqual(mock_apply.call_count, 1)
|
||||||
|
|
||||||
|
def test_wg_subnet_and_cell_subnets_forwarded(self):
|
||||||
|
"""apply_all_peer_rules passes wg_subnet and cell_subnets to each apply_peer_rules call."""
|
||||||
|
peers = [_make_peer('10.0.2.2')]
|
||||||
|
cell_subnets = ['10.0.0.0/24', '10.0.1.0/24']
|
||||||
|
with patch.object(firewall_manager, 'ensure_caddy_virtual_ips', return_value=True), \
|
||||||
|
patch.object(firewall_manager, 'apply_peer_rules', return_value=True) as mock_apply:
|
||||||
|
firewall_manager.apply_all_peer_rules(peers, wg_subnet='10.0.2.0/24',
|
||||||
|
cell_subnets=cell_subnets)
|
||||||
|
call_kwargs = mock_apply.call_args_list[0].kwargs
|
||||||
|
self.assertEqual(call_kwargs.get('wg_subnet'), '10.0.2.0/24')
|
||||||
|
self.assertEqual(call_kwargs.get('cell_subnets'), cell_subnets)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# clear_peer_rules
|
# clear_peer_rules
|
||||||
|
|||||||
Reference in New Issue
Block a user