Files
pic/api/cell_cli.py
Constantin 2277b11563 init
2025-09-12 23:04:52 +03:00

402 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Personal Internet Cell - CLI Tool
Command-line interface for managing the cell
"""
import argparse
import requests
import json
import sys
from datetime import datetime
API_BASE = "http://localhost:3000/api"
def api_request(method, endpoint, data=None):
"""Make API request"""
url = f"{API_BASE}{endpoint}"
try:
if method == "GET":
response = requests.get(url)
elif method == "POST":
response = requests.post(url, json=data)
elif method == "PUT":
response = requests.put(url, json=data)
elif method == "DELETE":
response = requests.delete(url)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
return None
def show_status():
"""Show cell status"""
status = api_request("GET", "/status")
if status:
print("Personal Internet Cell Status")
print("=" * 40)
print(f"Cell Name: {status.get('cell_name', 'Unknown')}")
print(f"Domain: {status.get('domain', 'Unknown')}")
print(f"Peers: {status.get('peers_count', 0)}")
print(f"Uptime: {status.get('uptime', 0)} seconds")
print("\nServices:")
services = status.get('services', {})
for service, running in services.items():
status_icon = "🟢" if running else "🔴"
print(f" {status_icon} {service}")
def list_peers():
"""List configured peers"""
peers = api_request("GET", "/peers")
if peers is not None:
if not peers:
print("No peers configured.")
return
print("Configured Peers:")
print("=" * 40)
for peer in peers:
print(f"Name: {peer.get('name', 'Unknown')}")
print(f"IP: {peer.get('ip', 'Unknown')}")
print(f"Public Key: {peer.get('public_key', 'Unknown')[:20]}...")
print(f"Added: {peer.get('added_at', 'Unknown')}")
print("-" * 20)
else:
print("Failed to fetch peers.")
def add_peer(name, ip, public_key):
"""Add a new peer"""
data = {
"name": name,
"ip": ip,
"public_key": public_key
}
result = api_request("POST", "/peers", data)
if result:
print(f"{result.get('message', 'Peer added successfully')}")
else:
print("❌ Failed to add peer")
def remove_peer(name):
"""Remove a peer"""
result = api_request("DELETE", f"/peers/{name}")
if result:
print(f"{result.get('message', 'Peer removed successfully')}")
else:
print("❌ Failed to remove peer")
def show_config():
"""Show cell configuration"""
config = api_request("GET", "/config")
if config:
print("Cell Configuration:")
print("=" * 40)
for key, value in config.items():
print(f"{key}: {value}")
def update_config(key, value):
"""Update cell configuration"""
data = {key: value}
result = api_request("PUT", "/config", data)
if result:
print(f"{result.get('message', 'Configuration updated')}")
else:
print("❌ Failed to update configuration")
def list_nat_rules():
result = api_request("GET", "/routing/nat")
if result and "nat_rules" in result:
rules = result["nat_rules"]
if not rules:
print("No NAT rules configured.")
return
print("NAT Rules:")
for rule in rules:
print(f"ID: {rule.get('id')}, Source: {rule.get('source_network')}, Target: {rule.get('target_interface')}, Masquerade: {rule.get('masquerade')}, Type: {rule.get('nat_type', 'MASQUERADE')}, Protocol: {rule.get('protocol', 'ALL')}, ExtPort: {rule.get('external_port', '')}, IntIP: {rule.get('internal_ip', '')}, IntPort: {rule.get('internal_port', '')}")
else:
print("Failed to fetch NAT rules.")
def add_nat_rule(source, target, masquerade, nat_type, protocol, external_port, internal_ip, internal_port):
data = {
"source_network": source,
"target_interface": target,
"masquerade": masquerade,
"nat_type": nat_type,
"protocol": protocol,
"external_port": external_port,
"internal_ip": internal_ip,
"internal_port": internal_port,
}
# Remove empty fields
data = {k: v for k, v in data.items() if v not in [None, ""]}
result = api_request("POST", "/routing/nat", data)
if result:
print("✅ NAT rule added.")
else:
print("❌ Failed to add NAT rule.")
def delete_nat_rule(rule_id):
result = api_request("DELETE", f"/routing/nat/{rule_id}")
if result:
print("✅ NAT rule deleted.")
else:
print("❌ Failed to delete NAT rule.")
def list_peer_routes():
result = api_request("GET", "/routing/peers")
if result and "peer_routes" in result:
routes = result["peer_routes"]
if not routes:
print("No peer routes configured.")
return
print("Peer Routes:")
for route in routes:
print(f"Peer: {route.get('peer_name')}, IP: {route.get('peer_ip')}, Networks: {route.get('allowed_networks')}, Type: {route.get('route_type')}")
else:
print("Failed to fetch peer routes.")
def add_peer_route(name, ip, networks, route_type):
data = {"peer_name": name, "peer_ip": ip, "allowed_networks": [n.strip() for n in networks.split(',') if n.strip()], "route_type": route_type}
result = api_request("POST", "/routing/peers", data)
if result:
print("✅ Peer route added.")
else:
print("❌ Failed to add peer route.")
def delete_peer_route(name):
result = api_request("DELETE", f"/routing/peers/{name}")
if result:
print("✅ Peer route deleted.")
else:
print("❌ Failed to delete peer route.")
def list_firewall_rules():
result = api_request("GET", "/routing/firewall")
if result and "firewall_rules" in result:
rules = result["firewall_rules"]
if not rules:
print("No firewall rules configured.")
return
print("Firewall Rules:")
for rule in rules:
print(f"ID: {rule.get('id')}, Type: {rule.get('rule_type')}, Source: {rule.get('source')}, Dest: {rule.get('destination')}, Protocol: {rule.get('protocol', 'ALL')}, PortRange: {rule.get('port_range', '')}, Action: {rule.get('action')}")
else:
print("Failed to fetch firewall rules.")
def add_firewall_rule(rule_type, source, destination, action, protocol, port_range):
data = {
"rule_type": rule_type,
"source": source,
"destination": destination,
"action": action,
"protocol": protocol,
"port_range": port_range,
}
# Remove empty fields
data = {k: v for k, v in data.items() if v not in [None, ""]}
result = api_request("POST", "/routing/firewall", data)
if result:
print("✅ Firewall rule added.")
else:
print("❌ Failed to add firewall rule.")
def delete_firewall_rule(rule_id):
result = api_request("DELETE", f"/routing/firewall/{rule_id}")
if result:
print("✅ Firewall rule deleted.")
else:
print("❌ Failed to delete firewall rule.")
def show_services_status():
status = api_request("GET", "/services/status")
if status:
print("Service Status:")
for svc, info in status.items():
if isinstance(info, dict):
print(f" {svc}: {info.get('status', 'unknown')}")
else:
print(f" {svc}: {info}")
else:
print("Failed to fetch service status.")
def list_wireguard_peers():
peers = api_request("GET", "/wireguard/peers")
if peers is not None:
print("WireGuard Peers:")
for peer in peers:
print(f" Name: {peer.get('name', 'Unknown')}, Public Key: {peer.get('public_key', 'Unknown')}, IP: {peer.get('ip', 'Unknown')}, Status: {peer.get('status', 'Unknown')}")
else:
print("Failed to fetch WireGuard peers.")
def show_network_info():
info = api_request("GET", "/network/info")
if info:
print("Network Info:")
for k, v in info.items():
print(f" {k}: {v}")
else:
print("Failed to fetch network info.")
def show_dns_status():
status = api_request("GET", "/dns/status")
if status:
print("DNS Status:")
for k, v in status.items():
print(f" {k}: {v}")
else:
print("Failed to fetch DNS status.")
def show_ntp_status():
status = api_request("GET", "/ntp/status")
if status:
print("NTP Status:")
for k, v in status.items():
print(f" {k}: {v}")
else:
print("Failed to fetch NTP status.")
def main():
parser = argparse.ArgumentParser(description="Personal Internet Cell CLI")
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# Status command
subparsers.add_parser("status", help="Show cell status")
# Peers commands
peers_parser = subparsers.add_parser("peers", help="Manage peers")
peers_subparsers = peers_parser.add_subparsers(dest="peer_command")
peers_subparsers.add_parser("list", help="List all peers")
add_parser = peers_subparsers.add_parser("add", help="Add a peer")
add_parser.add_argument("name", help="Peer name")
add_parser.add_argument("ip", help="Peer IP address")
add_parser.add_argument("public_key", help="Peer public key")
remove_parser = peers_subparsers.add_parser("remove", help="Remove a peer")
remove_parser.add_argument("name", help="Peer name")
# Config commands
config_parser = subparsers.add_parser("config", help="Manage configuration")
config_subparsers = config_parser.add_subparsers(dest="config_command")
config_subparsers.add_parser("show", help="Show current configuration")
update_parser = config_subparsers.add_parser("update", help="Update configuration")
update_parser.add_argument("key", help="Configuration key")
update_parser.add_argument("value", help="Configuration value")
# Routing commands
routing_parser = subparsers.add_parser("routing", help="Manage routing, NAT, and firewall rules")
routing_subparsers = routing_parser.add_subparsers(dest="routing_command")
# NAT
nat_parser = routing_subparsers.add_parser("nat", help="Manage NAT rules")
nat_subparsers = nat_parser.add_subparsers(dest="nat_command")
nat_subparsers.add_parser("list", help="List NAT rules")
nat_add = nat_subparsers.add_parser("add", help="Add NAT rule")
nat_add.add_argument("source", help="Source network (e.g. 192.168.1.0/24)")
nat_add.add_argument("target", help="Target interface (e.g. eth0)")
nat_add.add_argument("--masquerade", action="store_true", help="Enable masquerade (default: true)")
nat_add.add_argument("--nat-type", default="MASQUERADE", choices=["MASQUERADE", "SNAT", "DNAT"], help="NAT type")
nat_add.add_argument("--protocol", default="ALL", choices=["ALL", "TCP", "UDP"], help="Protocol")
nat_add.add_argument("--external-port", default="", help="External port (for DNAT)")
nat_add.add_argument("--internal-ip", default="", help="Internal IP (for DNAT)")
nat_add.add_argument("--internal-port", default="", help="Internal port (for DNAT)")
nat_del = nat_subparsers.add_parser("delete", help="Delete NAT rule")
nat_del.add_argument("rule_id", help="NAT rule ID")
# Peer Routes
peers_parser = routing_subparsers.add_parser("peers", help="Manage peer routes")
peers_subparsers = peers_parser.add_subparsers(dest="peers_command")
peers_subparsers.add_parser("list", help="List peer routes")
peers_add = peers_subparsers.add_parser("add", help="Add peer route")
peers_add.add_argument("name", help="Peer name")
peers_add.add_argument("ip", help="Peer IP")
peers_add.add_argument("networks", help="Allowed networks (comma-separated)")
peers_add.add_argument("--route-type", default="lan", help="Route type (lan, exit, bridge, split)")
peers_del = peers_subparsers.add_parser("delete", help="Delete peer route")
peers_del.add_argument("name", help="Peer name")
# Firewall
fw_parser = routing_subparsers.add_parser("firewall", help="Manage firewall rules")
fw_subparsers = fw_parser.add_subparsers(dest="fw_command")
fw_subparsers.add_parser("list", help="List firewall rules")
fw_add = fw_subparsers.add_parser("add", help="Add firewall rule")
fw_add.add_argument("rule_type", help="Rule type (INPUT, OUTPUT, FORWARD)")
fw_add.add_argument("source", help="Source network")
fw_add.add_argument("destination", help="Destination network")
fw_add.add_argument("action", help="Action (ACCEPT, DROP, REJECT)")
fw_add.add_argument("--protocol", default="ALL", choices=["ALL", "TCP", "UDP", "ICMP"], help="Protocol")
fw_add.add_argument("--port-range", default="", help="Port or port range (e.g. 80 or 1000-2000)")
fw_del = fw_subparsers.add_parser("delete", help="Delete firewall rule")
fw_del.add_argument("rule_id", help="Firewall rule ID")
# Add new CLI commands
subparsers.add_parser("services-status", help="Show status of all services")
subparsers.add_parser("wireguard-peers", help="List WireGuard peers")
subparsers.add_parser("network-info", help="Show network info (IP, etc.)")
subparsers.add_parser("dns-status", help="Show DNS status")
subparsers.add_parser("ntp-status", help="Show NTP status")
args = parser.parse_args()
if not args.command:
parser.print_help()
return
if args.command == "status":
show_status()
elif args.command == "peers":
if args.peer_command == "list":
list_peers()
elif args.peer_command == "add":
add_peer(args.name, args.ip, args.public_key)
elif args.peer_command == "remove":
remove_peer(args.name)
elif args.command == "config":
if args.config_command == "show":
show_config()
elif args.config_command == "update":
update_config(args.key, args.value)
elif args.command == "routing":
if args.routing_command == "nat":
if args.nat_command == "list":
list_nat_rules()
elif args.nat_command == "add":
add_nat_rule(args.source, args.target, args.masquerade, args.nat_type, args.protocol, args.external_port, args.internal_ip, args.internal_port)
elif args.nat_command == "delete":
delete_nat_rule(args.rule_id)
elif args.routing_command == "peers":
if args.peers_command == "list":
list_peer_routes()
elif args.peers_command == "add":
add_peer_route(args.name, args.ip, args.networks, args.route_type)
elif args.peers_command == "delete":
delete_peer_route(args.name)
elif args.routing_command == "firewall":
if args.fw_command == "list":
list_firewall_rules()
elif args.fw_command == "add":
add_firewall_rule(args.rule_type, args.source, args.destination, args.action, args.protocol, args.port_range)
elif args.fw_command == "delete":
delete_firewall_rule(args.rule_id)
elif args.command == "services-status":
show_services_status()
elif args.command == "wireguard-peers":
list_wireguard_peers()
elif args.command == "network-info":
show_network_info()
elif args.command == "dns-status":
show_dns_status()
elif args.command == "ntp-status":
show_ntp_status()
if __name__ == "__main__":
main()