Skip to main content
Glama

FastMCP Multi-Tool Server

by rt0120-Ramco
server.py14.1 kB
#!/usr/bin/env python3 """ FastMCP Server with Multiple Tools A comprehensive MCP server with various utility tools """ import json import os import sys import subprocess import tempfile from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional import requests from dotenv import load_dotenv # Import FastMCP from fastmcp import FastMCP # Load environment variables load_dotenv() # Initialize FastMCP server mcp = FastMCP("Multi-Tool MCP Server") @mcp.tool() def get_current_time() -> str: """Get the current date and time.""" return datetime.now().strftime("%Y-%m-%d %H:%M:%S") @mcp.tool() def get_weather(city: str) -> Dict[str, Any]: """ Get weather information for a city using OpenWeatherMap API. Args: city: The name of the city to get weather for Returns: Dictionary containing weather information """ api_key = os.getenv("OPENWEATHER_API_KEY") if not api_key: return {"error": "OpenWeatherMap API key not configured. Set OPENWEATHER_API_KEY environment variable."} try: url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}&units=metric" response = requests.get(url, timeout=10) response.raise_for_status() data = response.json() return { "city": data["name"], "country": data["sys"]["country"], "temperature": data["main"]["temp"], "feels_like": data["main"]["feels_like"], "humidity": data["main"]["humidity"], "pressure": data["main"]["pressure"], "description": data["weather"][0]["description"], "wind_speed": data["wind"]["speed"], "visibility": data.get("visibility", "N/A") } except requests.RequestException as e: return {"error": f"Failed to fetch weather data: {str(e)}"} except KeyError as e: return {"error": f"Unexpected API response format: {str(e)}"} @mcp.tool() def create_file(filepath: str, content: str) -> Dict[str, Any]: """ Create a new file with the specified content. Args: filepath: Path where the file should be created content: Content to write to the file Returns: Dictionary with operation status and details """ try: # Create directory if it doesn't exist path = Path(filepath) path.parent.mkdir(parents=True, exist_ok=True) # Write content to file with open(path, 'w', encoding='utf-8') as f: f.write(content) return { "success": True, "message": f"File created successfully at {filepath}", "size": len(content), "absolute_path": str(path.absolute()) } except Exception as e: return { "success": False, "error": f"Failed to create file: {str(e)}" } @mcp.tool() def read_file(filepath: str) -> Dict[str, Any]: """ Read the content of a file. Args: filepath: Path to the file to read Returns: Dictionary with file content or error message """ try: path = Path(filepath) if not path.exists(): return { "success": False, "error": f"File does not exist: {filepath}" } with open(path, 'r', encoding='utf-8') as f: content = f.read() return { "success": True, "content": content, "size": len(content), "absolute_path": str(path.absolute()), "last_modified": datetime.fromtimestamp(path.stat().st_mtime).strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: return { "success": False, "error": f"Failed to read file: {str(e)}" } @mcp.tool() def list_directory(directory_path: str) -> Dict[str, Any]: """ List contents of a directory. Args: directory_path: Path to the directory to list Returns: Dictionary with directory contents or error message """ try: path = Path(directory_path) if not path.exists(): return { "success": False, "error": f"Directory does not exist: {directory_path}" } if not path.is_dir(): return { "success": False, "error": f"Path is not a directory: {directory_path}" } items = [] for item in path.iterdir(): items.append({ "name": item.name, "type": "directory" if item.is_dir() else "file", "size": item.stat().st_size if item.is_file() else None, "last_modified": datetime.fromtimestamp(item.stat().st_mtime).strftime("%Y-%m-%d %H:%M:%S") }) return { "success": True, "directory": str(path.absolute()), "items": sorted(items, key=lambda x: (x["type"], x["name"])), "total_items": len(items) } except Exception as e: return { "success": False, "error": f"Failed to list directory: {str(e)}" } @mcp.tool() def execute_command(command: str, working_directory: Optional[str] = None) -> Dict[str, Any]: """ Execute a shell command and return the result. Args: command: The command to execute working_directory: Optional working directory for the command Returns: Dictionary with command output and execution details """ try: # Security check - prevent dangerous commands dangerous_commands = ['rm -rf', 'del /f', 'format', 'shutdown', 'reboot'] if any(dangerous in command.lower() for dangerous in dangerous_commands): return { "success": False, "error": "Command contains potentially dangerous operations and was blocked" } # Set working directory cwd = Path(working_directory) if working_directory else Path.cwd() # Execute command result = subprocess.run( command, shell=True, capture_output=True, text=True, cwd=cwd, timeout=30 # 30-second timeout ) return { "success": True, "command": command, "return_code": result.returncode, "stdout": result.stdout, "stderr": result.stderr, "working_directory": str(cwd.absolute()) } except subprocess.TimeoutExpired: return { "success": False, "error": "Command timed out after 30 seconds" } except Exception as e: return { "success": False, "error": f"Failed to execute command: {str(e)}" } @mcp.tool() def search_files(directory: str, pattern: str, file_extension: Optional[str] = None) -> Dict[str, Any]: """ Search for files containing a specific pattern. Args: directory: Directory to search in pattern: Text pattern to search for file_extension: Optional file extension filter (e.g., '.py', '.txt') Returns: Dictionary with search results """ try: path = Path(directory) if not path.exists(): return { "success": False, "error": f"Directory does not exist: {directory}" } matches = [] # Get all files matching extension filter if file_extension: files = path.rglob(f"*{file_extension}") else: files = path.rglob("*") files = [f for f in files if f.is_file()] for file_path in files: if file_path.is_file(): try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() if pattern.lower() in content.lower(): # Find line numbers containing the pattern lines = content.split('\n') matching_lines = [] for i, line in enumerate(lines, 1): if pattern.lower() in line.lower(): matching_lines.append({ "line_number": i, "content": line.strip() }) matches.append({ "file": str(file_path.relative_to(path)), "absolute_path": str(file_path.absolute()), "matching_lines": matching_lines[:10] # Limit to first 10 matches per file }) except: # Skip files that can't be read (binary files, etc.) continue return { "success": True, "pattern": pattern, "directory": str(path.absolute()), "file_extension": file_extension, "matches": matches, "total_files_with_matches": len(matches) } except Exception as e: return { "success": False, "error": f"Failed to search files: {str(e)}" } @mcp.tool() def calculate_expression(expression: str) -> Dict[str, Any]: """ Safely evaluate a mathematical expression. Args: expression: Mathematical expression to evaluate Returns: Dictionary with calculation result """ try: # Security check - only allow safe mathematical operations allowed_chars = set('0123456789+-*/().= ') allowed_names = {'abs', 'round', 'min', 'max', 'pow', 'sqrt'} # Remove spaces and check characters clean_expr = expression.replace(' ', '') if not all(c in allowed_chars or c.isalnum() for c in clean_expr): # Check if it contains only allowed function names import re tokens = re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*', expression) if any(token not in allowed_names for token in tokens): return { "success": False, "error": "Expression contains disallowed characters or functions" } # Import math for advanced functions import math # Create a safe namespace for evaluation safe_dict = { "__builtins__": {}, "abs": abs, "round": round, "min": min, "max": max, "pow": pow, "sqrt": math.sqrt, "sin": math.sin, "cos": math.cos, "tan": math.tan, "pi": math.pi, "e": math.e } result = eval(expression, safe_dict) return { "success": True, "expression": expression, "result": result, "type": type(result).__name__ } except Exception as e: return { "success": False, "error": f"Failed to evaluate expression: {str(e)}" } @mcp.tool() def get_system_info() -> Dict[str, Any]: """ Get system information. Returns: Dictionary with system details """ try: import platform import psutil return { "success": True, "system": platform.system(), "platform": platform.platform(), "processor": platform.processor(), "python_version": platform.python_version(), "cpu_count": psutil.cpu_count(), "memory_total": psutil.virtual_memory().total, "memory_available": psutil.virtual_memory().available, "disk_usage": { "total": psutil.disk_usage('/').total if platform.system() != 'Windows' else psutil.disk_usage('C:').total, "used": psutil.disk_usage('/').used if platform.system() != 'Windows' else psutil.disk_usage('C:').used, "free": psutil.disk_usage('/').free if platform.system() != 'Windows' else psutil.disk_usage('C:').free } } except ImportError: return { "success": False, "error": "psutil package required for system info. Install with: pip install psutil" } except Exception as e: return { "success": False, "error": f"Failed to get system info: {str(e)}" } @mcp.tool() def create_temporary_file(content: str, suffix: str = ".txt") -> Dict[str, Any]: """ Create a temporary file with the specified content. Args: content: Content to write to the temporary file suffix: File extension/suffix for the temporary file Returns: Dictionary with temporary file details """ try: with tempfile.NamedTemporaryFile(mode='w', suffix=suffix, delete=False, encoding='utf-8') as tmp_file: tmp_file.write(content) tmp_path = tmp_file.name return { "success": True, "temporary_file_path": tmp_path, "content_size": len(content), "suffix": suffix, "message": "Temporary file created successfully. Remember to delete it when no longer needed." } except Exception as e: return { "success": False, "error": f"Failed to create temporary file: {str(e)}" } if __name__ == "__main__": # Run the MCP server mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rt0120-Ramco/mcp-py'

If you have feedback or need assistance with the MCP directory API, please join our Discord server