#!/usr/bin/env python3 """ Personal Internet Cell - CLI Tool Command-line interface for managing the cell """ import argparse import requests import json import sys from datetime import datetime API_BASE = "http://localhost:3000/api" def api_request(method, endpoint, data=None): """Make API request""" url = f"{API_BASE}{endpoint}" try: if method == "GET": response = requests.get(url) elif method == "POST": response = requests.post(url, json=data) elif method == "PUT": response = requests.put(url, json=data) elif method == "DELETE": response = requests.delete(url) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"Error: {e}") return None def show_status(): """Show cell status""" status = api_request("GET", "/status") if status: print("Personal Internet Cell Status") print("=" * 40) print(f"Cell Name: {status.get('cell_name', 'Unknown')}") print(f"Domain: {status.get('domain', 'Unknown')}") print(f"Peers: {status.get('peers_count', 0)}") print(f"Uptime: {status.get('uptime', 0)} seconds") print("\nServices:") services = status.get('services', {}) for service, running in services.items(): status_icon = "🟢" if running else "🔴" print(f" {status_icon} {service}") def list_peers(): """List configured peers""" peers = api_request("GET", "/peers") if peers is not None: if not peers: print("No peers configured.") return print("Configured Peers:") print("=" * 40) for peer in peers: print(f"Name: {peer.get('name', 'Unknown')}") print(f"IP: {peer.get('ip', 'Unknown')}") print(f"Public Key: {peer.get('public_key', 'Unknown')[:20]}...") print(f"Added: {peer.get('added_at', 'Unknown')}") print("-" * 20) else: print("Failed to fetch peers.") def add_peer(name, ip, public_key): """Add a new peer""" data = { "name": name, "ip": ip, "public_key": public_key } result = api_request("POST", "/peers", data) if result: print(f"✅ {result.get('message', 'Peer added successfully')}") else: print("❌ Failed to add peer") def remove_peer(name): """Remove a peer""" result = api_request("DELETE", f"/peers/{name}") if result: print(f"✅ {result.get('message', 'Peer removed successfully')}") else: print("❌ Failed to remove peer") def show_config(): """Show cell configuration""" config = api_request("GET", "/config") if config: print("Cell Configuration:") print("=" * 40) for key, value in config.items(): print(f"{key}: {value}") def update_config(key, value): """Update cell configuration""" data = {key: value} result = api_request("PUT", "/config", data) if result: print(f"✅ {result.get('message', 'Configuration updated')}") else: print("❌ Failed to update configuration") def list_nat_rules(): result = api_request("GET", "/routing/nat") if result and "nat_rules" in result: rules = result["nat_rules"] if not rules: print("No NAT rules configured.") return print("NAT Rules:") for rule in rules: print(f"ID: {rule.get('id')}, Source: {rule.get('source_network')}, Target: {rule.get('target_interface')}, Masquerade: {rule.get('masquerade')}, Type: {rule.get('nat_type', 'MASQUERADE')}, Protocol: {rule.get('protocol', 'ALL')}, ExtPort: {rule.get('external_port', '')}, IntIP: {rule.get('internal_ip', '')}, IntPort: {rule.get('internal_port', '')}") else: print("Failed to fetch NAT rules.") def add_nat_rule(source, target, masquerade, nat_type, protocol, external_port, internal_ip, internal_port): data = { "source_network": source, "target_interface": target, "masquerade": masquerade, "nat_type": nat_type, "protocol": protocol, "external_port": external_port, "internal_ip": internal_ip, "internal_port": internal_port, } # Remove empty fields data = {k: v for k, v in data.items() if v not in [None, ""]} result = api_request("POST", "/routing/nat", data) if result: print("✅ NAT rule added.") else: print("❌ Failed to add NAT rule.") def delete_nat_rule(rule_id): result = api_request("DELETE", f"/routing/nat/{rule_id}") if result: print("✅ NAT rule deleted.") else: print("❌ Failed to delete NAT rule.") def list_peer_routes(): result = api_request("GET", "/routing/peers") if result and "peer_routes" in result: routes = result["peer_routes"] if not routes: print("No peer routes configured.") return print("Peer Routes:") for route in routes: print(f"Peer: {route.get('peer_name')}, IP: {route.get('peer_ip')}, Networks: {route.get('allowed_networks')}, Type: {route.get('route_type')}") else: print("Failed to fetch peer routes.") def add_peer_route(name, ip, networks, route_type): data = {"peer_name": name, "peer_ip": ip, "allowed_networks": [n.strip() for n in networks.split(',') if n.strip()], "route_type": route_type} result = api_request("POST", "/routing/peers", data) if result: print("✅ Peer route added.") else: print("❌ Failed to add peer route.") def delete_peer_route(name): result = api_request("DELETE", f"/routing/peers/{name}") if result: print("✅ Peer route deleted.") else: print("❌ Failed to delete peer route.") def list_firewall_rules(): result = api_request("GET", "/routing/firewall") if result and "firewall_rules" in result: rules = result["firewall_rules"] if not rules: print("No firewall rules configured.") return print("Firewall Rules:") for rule in rules: print(f"ID: {rule.get('id')}, Type: {rule.get('rule_type')}, Source: {rule.get('source')}, Dest: {rule.get('destination')}, Protocol: {rule.get('protocol', 'ALL')}, PortRange: {rule.get('port_range', '')}, Action: {rule.get('action')}") else: print("Failed to fetch firewall rules.") def add_firewall_rule(rule_type, source, destination, action, protocol, port_range): data = { "rule_type": rule_type, "source": source, "destination": destination, "action": action, "protocol": protocol, "port_range": port_range, } # Remove empty fields data = {k: v for k, v in data.items() if v not in [None, ""]} result = api_request("POST", "/routing/firewall", data) if result: print("✅ Firewall rule added.") else: print("❌ Failed to add firewall rule.") def delete_firewall_rule(rule_id): result = api_request("DELETE", f"/routing/firewall/{rule_id}") if result: print("✅ Firewall rule deleted.") else: print("❌ Failed to delete firewall rule.") def show_services_status(): status = api_request("GET", "/services/status") if status: print("Service Status:") for svc, info in status.items(): if isinstance(info, dict): print(f" {svc}: {info.get('status', 'unknown')}") else: print(f" {svc}: {info}") else: print("Failed to fetch service status.") def list_wireguard_peers(): peers = api_request("GET", "/wireguard/peers") if peers is not None: print("WireGuard Peers:") for peer in peers: print(f" Name: {peer.get('name', 'Unknown')}, Public Key: {peer.get('public_key', 'Unknown')}, IP: {peer.get('ip', 'Unknown')}, Status: {peer.get('status', 'Unknown')}") else: print("Failed to fetch WireGuard peers.") def show_network_info(): info = api_request("GET", "/network/info") if info: print("Network Info:") for k, v in info.items(): print(f" {k}: {v}") else: print("Failed to fetch network info.") def show_dns_status(): status = api_request("GET", "/dns/status") if status: print("DNS Status:") for k, v in status.items(): print(f" {k}: {v}") else: print("Failed to fetch DNS status.") def show_ntp_status(): status = api_request("GET", "/ntp/status") if status: print("NTP Status:") for k, v in status.items(): print(f" {k}: {v}") else: print("Failed to fetch NTP status.") def main(): parser = argparse.ArgumentParser(description="Personal Internet Cell CLI") subparsers = parser.add_subparsers(dest="command", help="Available commands") # Status command subparsers.add_parser("status", help="Show cell status") # Peers commands peers_parser = subparsers.add_parser("peers", help="Manage peers") peers_subparsers = peers_parser.add_subparsers(dest="peer_command") peers_subparsers.add_parser("list", help="List all peers") add_parser = peers_subparsers.add_parser("add", help="Add a peer") add_parser.add_argument("name", help="Peer name") add_parser.add_argument("ip", help="Peer IP address") add_parser.add_argument("public_key", help="Peer public key") remove_parser = peers_subparsers.add_parser("remove", help="Remove a peer") remove_parser.add_argument("name", help="Peer name") # Config commands config_parser = subparsers.add_parser("config", help="Manage configuration") config_subparsers = config_parser.add_subparsers(dest="config_command") config_subparsers.add_parser("show", help="Show current configuration") update_parser = config_subparsers.add_parser("update", help="Update configuration") update_parser.add_argument("key", help="Configuration key") update_parser.add_argument("value", help="Configuration value") # Routing commands routing_parser = subparsers.add_parser("routing", help="Manage routing, NAT, and firewall rules") routing_subparsers = routing_parser.add_subparsers(dest="routing_command") # NAT nat_parser = routing_subparsers.add_parser("nat", help="Manage NAT rules") nat_subparsers = nat_parser.add_subparsers(dest="nat_command") nat_subparsers.add_parser("list", help="List NAT rules") nat_add = nat_subparsers.add_parser("add", help="Add NAT rule") nat_add.add_argument("source", help="Source network (e.g. 192.168.1.0/24)") nat_add.add_argument("target", help="Target interface (e.g. eth0)") nat_add.add_argument("--masquerade", action="store_true", help="Enable masquerade (default: true)") nat_add.add_argument("--nat-type", default="MASQUERADE", choices=["MASQUERADE", "SNAT", "DNAT"], help="NAT type") nat_add.add_argument("--protocol", default="ALL", choices=["ALL", "TCP", "UDP"], help="Protocol") nat_add.add_argument("--external-port", default="", help="External port (for DNAT)") nat_add.add_argument("--internal-ip", default="", help="Internal IP (for DNAT)") nat_add.add_argument("--internal-port", default="", help="Internal port (for DNAT)") nat_del = nat_subparsers.add_parser("delete", help="Delete NAT rule") nat_del.add_argument("rule_id", help="NAT rule ID") # Peer Routes peers_parser = routing_subparsers.add_parser("peers", help="Manage peer routes") peers_subparsers = peers_parser.add_subparsers(dest="peers_command") peers_subparsers.add_parser("list", help="List peer routes") peers_add = peers_subparsers.add_parser("add", help="Add peer route") peers_add.add_argument("name", help="Peer name") peers_add.add_argument("ip", help="Peer IP") peers_add.add_argument("networks", help="Allowed networks (comma-separated)") peers_add.add_argument("--route-type", default="lan", help="Route type (lan, exit, bridge, split)") peers_del = peers_subparsers.add_parser("delete", help="Delete peer route") peers_del.add_argument("name", help="Peer name") # Firewall fw_parser = routing_subparsers.add_parser("firewall", help="Manage firewall rules") fw_subparsers = fw_parser.add_subparsers(dest="fw_command") fw_subparsers.add_parser("list", help="List firewall rules") fw_add = fw_subparsers.add_parser("add", help="Add firewall rule") fw_add.add_argument("rule_type", help="Rule type (INPUT, OUTPUT, FORWARD)") fw_add.add_argument("source", help="Source network") fw_add.add_argument("destination", help="Destination network") fw_add.add_argument("action", help="Action (ACCEPT, DROP, REJECT)") fw_add.add_argument("--protocol", default="ALL", choices=["ALL", "TCP", "UDP", "ICMP"], help="Protocol") fw_add.add_argument("--port-range", default="", help="Port or port range (e.g. 80 or 1000-2000)") fw_del = fw_subparsers.add_parser("delete", help="Delete firewall rule") fw_del.add_argument("rule_id", help="Firewall rule ID") # Add new CLI commands subparsers.add_parser("services-status", help="Show status of all services") subparsers.add_parser("wireguard-peers", help="List WireGuard peers") subparsers.add_parser("network-info", help="Show network info (IP, etc.)") subparsers.add_parser("dns-status", help="Show DNS status") subparsers.add_parser("ntp-status", help="Show NTP status") args = parser.parse_args() if not args.command: parser.print_help() return if args.command == "status": show_status() elif args.command == "peers": if args.peer_command == "list": list_peers() elif args.peer_command == "add": add_peer(args.name, args.ip, args.public_key) elif args.peer_command == "remove": remove_peer(args.name) elif args.command == "config": if args.config_command == "show": show_config() elif args.config_command == "update": update_config(args.key, args.value) elif args.command == "routing": if args.routing_command == "nat": if args.nat_command == "list": list_nat_rules() elif args.nat_command == "add": add_nat_rule(args.source, args.target, args.masquerade, args.nat_type, args.protocol, args.external_port, args.internal_ip, args.internal_port) elif args.nat_command == "delete": delete_nat_rule(args.rule_id) elif args.routing_command == "peers": if args.peers_command == "list": list_peer_routes() elif args.peers_command == "add": add_peer_route(args.name, args.ip, args.networks, args.route_type) elif args.peers_command == "delete": delete_peer_route(args.name) elif args.routing_command == "firewall": if args.fw_command == "list": list_firewall_rules() elif args.fw_command == "add": add_firewall_rule(args.rule_type, args.source, args.destination, args.action, args.protocol, args.port_range) elif args.fw_command == "delete": delete_firewall_rule(args.rule_id) elif args.command == "services-status": show_services_status() elif args.command == "wireguard-peers": list_wireguard_peers() elif args.command == "network-info": show_network_info() elif args.command == "dns-status": show_dns_status() elif args.command == "ntp-status": show_ntp_status() if __name__ == "__main__": main()