Skip to main content
Glama
vpn.py12.4 kB
"""VPN provider for OpenAccess MCP.""" import asyncio import subprocess import tempfile import os from pathlib import Path from typing import Optional, Dict, Any, List from dataclasses import dataclass @dataclass class VPNStatus: """VPN connection status.""" interface: str status: str # "up", "down", "connecting", "error" peer_id: Optional[str] = None config_id: Optional[str] = None ip_address: Optional[str] = None error: Optional[str] = None metadata: Dict[str, Any] = None def __post_init__(self): if self.metadata is None: self.metadata = {} class VPNProvider: """Handles VPN operations for WireGuard and OpenVPN.""" def __init__(self): self._active_connections: Dict[str, VPNStatus] = {} async def wireguard_toggle( self, peer_id: str, action: str, config_path: Optional[str] = None, interface_name: Optional[str] = None ) -> VPNStatus: """Toggle WireGuard connection up/down.""" if action not in ["up", "down"]: raise ValueError("Action must be 'up' or 'down'") try: if action == "up": return await self._wireguard_up(peer_id, config_path, interface_name) else: return await self._wireguard_down(peer_id, interface_name) except Exception as e: return VPNStatus( interface=interface_name or f"wg-{peer_id}", status="error", peer_id=peer_id, error=str(e) ) async def _wireguard_up( self, peer_id: str, config_path: Optional[str] = None, interface_name: Optional[str] = None ) -> VPNStatus: """Bring WireGuard interface up.""" interface = interface_name or f"wg-{peer_id}" # Check if interface already exists if await self._interface_exists(interface): return VPNStatus( interface=interface, status="up", peer_id=peer_id, error="Interface already exists" ) # Use wg-quick to bring interface up if config_path: cmd = ["wg-quick", "up", config_path] else: # Create temporary config if none provided config_path = await self._create_temp_wg_config(peer_id) cmd = ["wg-quick", "up", config_path] try: result = await self._run_command(cmd) if result.returncode == 0: # Get interface status ip_info = await self._get_interface_ip(interface) status = VPNStatus( interface=interface, status="up", peer_id=peer_id, ip_address=ip_info.get("ip"), metadata={"config_path": config_path} ) self._active_connections[peer_id] = status return status else: return VPNStatus( interface=interface, status="error", peer_id=peer_id, error=f"wg-quick failed: {result.stderr}" ) except Exception as e: return VPNStatus( interface=interface, status="error", peer_id=peer_id, error=str(e) ) async def _wireguard_down( self, peer_id: str, interface_name: Optional[str] = None ) -> VPNStatus: """Bring WireGuard interface down.""" interface = interface_name or f"wg-{peer_id}" if not await self._interface_exists(interface): return VPNStatus( interface=interface, status="down", peer_id=peer_id, error="Interface does not exist" ) try: cmd = ["wg-quick", "down", interface] result = await self._run_command(cmd) if result.returncode == 0: status = VPNStatus( interface=interface, status="down", peer_id=peer_id ) # Remove from active connections self._active_connections.pop(peer_id, None) return status else: return VPNStatus( interface=interface, status="error", peer_id=peer_id, error=f"wg-quick down failed: {result.stderr}" ) except Exception as e: return VPNStatus( interface=interface, status="error", peer_id=peer_id, error=str(e) ) async def openvpn_toggle( self, config_id: str, action: str, config_path: str, interface_name: Optional[str] = None ) -> VPNStatus: """Toggle OpenVPN connection connect/disconnect.""" if action not in ["connect", "disconnect"]: raise ValueError("Action must be 'connect' or 'disconnect'") try: if action == "connect": return await self._openvpn_connect(config_id, config_path, interface_name) else: return await self._openvpn_disconnect(config_id, interface_name) except Exception as e: return VPNStatus( interface=interface_name or f"tun-{config_id}", status="error", config_id=config_id, error=str(e) ) async def _openvpn_connect( self, config_id: str, config_path: str, interface_name: Optional[str] = None ) -> VPNStatus: """Connect to OpenVPN.""" interface = interface_name or f"tun-{config_id}" # Check if already connected if config_id in self._active_connections: return VPNStatus( interface=interface, status="up", config_id=config_id, error="Already connected" ) try: # Start OpenVPN in background cmd = ["openvpn", "--config", config_path, "--daemon"] result = await self._run_command(cmd) if result.returncode == 0: # Wait a bit for connection to establish await asyncio.sleep(2) # Check connection status if await self._openvpn_connected(config_path): status = VPNStatus( interface=interface, status="up", config_id=config_id, metadata={"config_path": config_path} ) self._active_connections[config_id] = status return status else: return VPNStatus( interface=interface, status="error", config_id=config_id, error="Connection failed to establish" ) else: return VPNStatus( interface=interface, status="error", config_id=config_id, error=f"OpenVPN failed: {result.stderr}" ) except Exception as e: return VPNStatus( interface=interface, status="error", config_id=config_id, error=str(e) ) async def _openvpn_disconnect( self, config_id: str, interface_name: Optional[str] = None ) -> VPNStatus: """Disconnect from OpenVPN.""" interface = interface_name or f"tun-{config_id}" if config_id not in self._active_connections: return VPNStatus( interface=interface, status="down", config_id=config_id, error="Not connected" ) try: # Kill OpenVPN processes for this config cmd = ["pkill", "-f", f"openvpn.*{config_id}"] await self._run_command(cmd) status = VPNStatus( interface=interface, status="down", config_id=config_id ) # Remove from active connections self._active_connections.pop(config_id, None) return status except Exception as e: return VPNStatus( interface=interface, status="error", config_id=config_id, error=str(e) ) async def list_connections(self) -> List[VPNStatus]: """List all active VPN connections.""" return list(self._active_connections.values()) async def get_connection_status(self, connection_id: str) -> Optional[VPNStatus]: """Get status of a specific connection.""" return self._active_connections.get(connection_id) async def _interface_exists(self, interface: str) -> bool: """Check if network interface exists.""" try: cmd = ["ip", "link", "show", interface] result = await self._run_command(cmd) return result.returncode == 0 except Exception: return False async def _get_interface_ip(self, interface: str) -> Dict[str, str]: """Get IP address information for an interface.""" try: cmd = ["ip", "addr", "show", interface] result = await self._run_command(cmd) if result.returncode == 0: # Parse IP address from output lines = result.stdout.split('\n') for line in lines: if 'inet ' in line: ip = line.split()[1].split('/')[0] return {"ip": ip} return {} except Exception: return {} async def _openvpn_connected(self, config_path: str) -> bool: """Check if OpenVPN is connected for a specific config.""" try: cmd = ["pgrep", "-f", f"openvpn.*{config_path}"] result = await self._run_command(cmd) return result.returncode == 0 except Exception: return False async def _create_temp_wg_config(self, peer_id: str) -> str: """Create a temporary WireGuard configuration file.""" # This is a placeholder - in production you'd load from a secure store config_content = f"""[Interface] PrivateKey = <private_key> Address = 10.0.0.1/24 ListenPort = 51820 [Peer] PublicKey = <public_key> AllowedIPs = 10.0.0.2/32 """ # Create temporary file temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) temp_file.write(config_content) temp_file.close() return temp_file.name async def _run_command(self, cmd: List[str]) -> subprocess.CompletedProcess: """Run a command asynchronously.""" loop = asyncio.get_event_loop() return await loop.run_in_executor(None, subprocess.run, cmd, capture_output=True, text=True) def get_stats(self) -> Dict[str, Any]: """Get VPN provider statistics.""" return { "active_connections": len(self._active_connections), "connection_types": { "wireguard": len([c for c in self._active_connections.values() if c.peer_id]), "openvpn": len([c for c in self._active_connections.values() if c.config_id]) } } async def stop_cleanup_task(self) -> None: """Stop the background cleanup task.""" # This is a no-op for VPN provider as it doesn't have a cleanup task pass

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keepithuman/openaccess-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server