# server_robust.py
from fastmcp import FastMCP
import os
import json
import hashlib
import base64
import datetime
import sys
import logging
from pathlib import Path
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('mcp_server.log'),
logging.StreamHandler(sys.stderr)
]
)
logger = logging.getLogger(__name__)
mcp = FastMCP("local-utils", version="0.1.0")
logger.info("Starting Local Utilities MCP Server...")
@mcp.tool(
description="Convert between °C and °F"
)
async def convert_temp(value: float, unit: str) -> str:
"""Convert temperature between Celsius and Fahrenheit."""
try:
logger.info(f"Converting temperature: {value}°{unit}")
if unit.upper() == "C":
result = f"{(value * 9/5) + 32:.2f} °F"
elif unit.upper() == "F":
result = f"{(value - 32) * 5/9:.2f} °C"
else:
raise ValueError("unit must be C or F")
logger.info(f"Conversion result: {result}")
return result
except Exception as e:
logger.error(f"Error in convert_temp: {e}")
return f"Error: {str(e)}"
@mcp.tool(
description="Read the contents of a text file"
)
async def read_file(file_path: str) -> str:
"""Read and return the contents of a text file."""
try:
logger.info(f"Reading file: {file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
logger.info(f"Successfully read {len(content)} characters from {file_path}")
return f"File contents of {file_path}:\n\n{content}"
except FileNotFoundError:
error_msg = f"Error: File '{file_path}' not found."
logger.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"Error reading file: {str(e)}"
logger.error(error_msg)
return error_msg
@mcp.tool(
description="Write text content to a file"
)
async def write_file(file_path: str, content: str) -> str:
"""Write content to a text file."""
try:
logger.info(f"Writing to file: {file_path}")
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
logger.info(f"Successfully wrote {len(content)} characters to {file_path}")
return f"Successfully wrote content to {file_path}"
except Exception as e:
error_msg = f"Error writing file: {str(e)}"
logger.error(error_msg)
return error_msg
@mcp.tool(
description="List files and directories in a given path"
)
async def list_directory(directory_path: str = ".") -> str:
"""List contents of a directory."""
try:
logger.info(f"Listing directory: {directory_path}")
path = Path(directory_path)
if not path.exists():
error_msg = f"Error: Directory '{directory_path}' does not exist."
logger.error(error_msg)
return error_msg
items = []
for item in sorted(path.iterdir()):
if item.is_dir():
items.append(f"📁 {item.name}/")
else:
size = item.stat().st_size
items.append(f"📄 {item.name} ({size} bytes)")
result = f"Contents of {directory_path}:\n" + "\n".join(items)
logger.info(f"Listed {len(items)} items in {directory_path}")
return result
except Exception as e:
error_msg = f"Error listing directory: {str(e)}"
logger.error(error_msg)
return error_msg
@mcp.tool(
description="Calculate hash (MD5, SHA1, SHA256) of text or file"
)
async def calculate_hash(text_or_path: str, hash_type: str = "sha256", is_file: bool = False) -> str:
"""Calculate hash of text or file content."""
try:
logger.info(f"Calculating {hash_type} hash, is_file: {is_file}")
hash_functions = {
"md5": hashlib.md5,
"sha1": hashlib.sha1,
"sha256": hashlib.sha256
}
if hash_type.lower() not in hash_functions:
error_msg = f"Error: Unsupported hash type. Use: {', '.join(hash_functions.keys())}"
logger.error(error_msg)
return error_msg
hash_func = hash_functions[hash_type.lower()]()
if is_file:
with open(text_or_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_func.update(chunk)
result = hash_func.hexdigest()
return f"{hash_type.upper()} hash of file '{text_or_path}': {result}"
else:
hash_func.update(text_or_path.encode('utf-8'))
result = hash_func.hexdigest()
return f"{hash_type.upper()} hash of text: {result}"
except Exception as e:
error_msg = f"Error calculating hash: {str(e)}"
logger.error(error_msg)
return error_msg
@mcp.tool(
description="Encode/decode text using Base64"
)
async def base64_encode_decode(text: str, operation: str = "encode") -> str:
"""Encode or decode text using Base64."""
try:
logger.info(f"Base64 {operation} operation")
if operation.lower() == "encode":
encoded = base64.b64encode(text.encode('utf-8')).decode('utf-8')
return f"Base64 encoded: {encoded}"
elif operation.lower() == "decode":
decoded = base64.b64decode(text).decode('utf-8')
return f"Base64 decoded: {decoded}"
else:
error_msg = "Error: Operation must be 'encode' or 'decode'"
logger.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"Error: {str(e)}"
logger.error(error_msg)
return error_msg
@mcp.tool(
description="Get current date and time information"
)
async def get_datetime_info(format_string: str = "%Y-%m-%d %H:%M:%S") -> str:
"""Get current date and time with optional custom format."""
try:
logger.info(f"Getting datetime info with format: {format_string}")
now = datetime.datetime.now()
formatted_time = now.strftime(format_string)
info = f"""Current Date & Time Information:
Formatted: {formatted_time}
ISO Format: {now.isoformat()}
Timestamp: {now.timestamp()}
Weekday: {now.strftime('%A')}
Year: {now.year}
Month: {now.month} ({now.strftime('%B')})
Day: {now.day}
Hour: {now.hour}
Minute: {now.minute}
Second: {now.second}"""
return info
except Exception as e:
error_msg = f"Error getting datetime info: {str(e)}"
logger.error(error_msg)
return error_msg
@mcp.tool(
description="Count words, characters, and lines in text"
)
async def text_stats(text: str) -> str:
"""Calculate statistics for the given text."""
try:
logger.info("Calculating text statistics")
lines = text.split('\n')
words = text.split()
characters = len(text)
characters_no_spaces = len(text.replace(' ', ''))
return f"""Text Statistics:
Lines: {len(lines)}
Words: {len(words)}
Characters (with spaces): {characters}
Characters (without spaces): {characters_no_spaces}
Average words per line: {len(words) / len(lines):.2f}
Average characters per word: {characters / len(words):.2f if words else 0}"""
except Exception as e:
error_msg = f"Error calculating text stats: {str(e)}"
logger.error(error_msg)
return error_msg
@mcp.tool(
description="Generate a simple password with specified length and character sets"
)
async def generate_password(length: int = 12, include_uppercase: bool = True,
include_lowercase: bool = True, include_numbers: bool = True,
include_symbols: bool = False) -> str:
"""Generate a random password."""
try:
import random
import string
logger.info(f"Generating password of length {length}")
if length < 1:
error_msg = "Error: Password length must be at least 1"
logger.error(error_msg)
return error_msg
characters = ""
if include_lowercase:
characters += string.ascii_lowercase
if include_uppercase:
characters += string.ascii_uppercase
if include_numbers:
characters += string.digits
if include_symbols:
characters += "!@#$%^&*()_+-=[]{}|;:,.<>?"
if not characters:
error_msg = "Error: At least one character set must be selected"
logger.error(error_msg)
return error_msg
password = ''.join(random.choice(characters) for _ in range(length))
logger.info("Password generated successfully")
return f"Generated password: {password}"
except Exception as e:
error_msg = f"Error generating password: {str(e)}"
logger.error(error_msg)
return error_msg
if __name__ == "__main__":
try:
logger.info("Local Utilities MCP Server starting...")
logger.info(f"Python version: {sys.version}")
logger.info(f"Working directory: {os.getcwd()}")
# Transport can be 'stdio' (default), 'http', or 'sse'
mcp.run() # = mcp.run("stdio")
except Exception as e:
logger.error(f"Fatal error starting server: {e}")
sys.exit(1)