#!/usr/bin/env python3
"""
MCP 演示服务器 - 使用官方 MCP Python SDK
提供数学计算、文本分析和文件读取工具
"""
import asyncio
import json
import logging
import math
import os
import re
from typing import Any, Dict, List, Optional
from pathlib import Path
from mcp.server.fastmcp import FastMCP
# 配置日志 - 重要:MCP STDIO服务器不能使用stdout
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('mcp_server.log'),
logging.StreamHandler() # 这会输出到stderr,对STDIO服务器是安全的
]
)
logger = logging.getLogger(__name__)
# 初始化 FastMCP 服务器
mcp = FastMCP("mcpdemo")
# 常量配置
MAX_FILE_SIZE = 2048 # 最大文件大小限制(字节)
SUPPORTED_FILE_EXTENSIONS = {'.txt', '.md', '.json', '.py', '.js', '.html', '.css', '.xml'}
class MathCalculator:
"""数学计算工具类"""
@staticmethod
def safe_eval(expression: str) -> Dict[str, Any]:
"""安全的数学表达式求值"""
try:
# 清理表达式,移除空格
expression = expression.strip()
# 检查是否包含不安全的字符
allowed_chars = set('0123456789+-*/.() ')
allowed_funcs = {'sin', 'cos', 'tan', 'log', 'sqrt', 'abs', 'pow'}
# 基本安全检查
if any(char not in allowed_chars and not char.isalpha() for char in expression):
return {"success": False, "error": "Expression contains invalid characters"}
# 创建安全的执行环境
safe_dict = {
"__builtins__": {},
"sin": math.sin, "cos": math.cos, "tan": math.tan,
"log": math.log, "sqrt": math.sqrt, "abs": abs,
"pow": pow, "pi": math.pi, "e": math.e
}
# 执行计算
result = eval(expression, safe_dict)
return {
"success": True,
"result": result,
"expression": expression
}
except ZeroDivisionError:
return {
"success": False,
"error": "division by zero",
"expression": expression
}
except Exception as e:
return {
"success": False,
"error": str(e),
"expression": expression
}
class TextAnalyzer:
"""文本分析工具类"""
@staticmethod
def analyze_text(text: str) -> Dict[str, Any]:
"""分析文本内容"""
try:
if not text or not isinstance(text, str):
return {"success": False, "error": "Invalid text input"}
# 基本统计
char_count = len(text)
word_count = len(text.split())
sentence_count = len([s for s in re.split(r'[.!?]+', text) if s.strip()])
paragraph_count = len([p for p in text.split('\n\n') if p.strip()])
# 字符分析
letters = sum(1 for c in text if c.isalpha())
digits = sum(1 for c in text if c.isdigit())
spaces = sum(1 for c in text if c.isspace())
punctuation = char_count - letters - digits - spaces
# 词频分析(前10个最常见的词)
words = re.findall(r'\b[a-zA-Z\u4e00-\u9fff]+\b', text.lower())
word_freq = {}
for word in words:
if len(word) > 2: # 忽略长度小于3的词
word_freq[word] = word_freq.get(word, 0) + 1
top_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:10]
return {
"success": True,
"statistics": {
"characters": char_count,
"words": word_count,
"sentences": sentence_count,
"paragraphs": paragraph_count,
"letters": letters,
"digits": digits,
"spaces": spaces,
"punctuation": punctuation
},
"word_frequency": top_words,
"reading_time_minutes": round(word_count / 200, 1) # 假设阅读速度200词/分钟
}
except Exception as e:
return {"success": False, "error": str(e)}
class FileReader:
"""文件读取工具类"""
@staticmethod
def read_file(file_path: str) -> Dict[str, Any]:
"""安全地读取文件内容"""
try:
# 验证文件路径
path = Path(file_path)
# 安全检查:不允许路径遍历攻击
if '..' in str(path) or str(path).startswith('/'):
return {"success": False, "error": "Invalid file path"}
# 确保文件在当前目录或子目录中
current_dir = Path.cwd()
try:
full_path = (current_dir / path).resolve()
full_path.relative_to(current_dir)
except ValueError:
return {"success": False, "error": "File path outside allowed directory"}
# 检查文件是否存在
if not full_path.exists():
return {"success": False, "error": "File not found"}
# 检查是否是文件
if not full_path.is_file():
return {"success": False, "error": "Path is not a file"}
# 检查文件扩展名
if full_path.suffix.lower() not in SUPPORTED_FILE_EXTENSIONS:
return {
"success": False,
"error": f"Unsupported file type. Supported: {', '.join(SUPPORTED_FILE_EXTENSIONS)}"
}
# 检查文件大小
file_size = full_path.stat().st_size
if file_size > MAX_FILE_SIZE:
return {
"success": False,
"error": f"File too large (max {MAX_FILE_SIZE} bytes)"
}
# 读取文件内容
try:
content = full_path.read_text(encoding='utf-8')
except UnicodeDecodeError:
try:
content = full_path.read_text(encoding='gbk')
except UnicodeDecodeError:
return {"success": False, "error": "Unable to decode file content"}
return {
"success": True,
"content": content,
"file_path": str(path),
"file_size": file_size,
"encoding": "utf-8"
}
except Exception as e:
return {"success": False, "error": str(e)}
# MCP 工具定义
@mcp.tool()
async def calculator(expression: str) -> str:
"""
执行数学计算
Args:
expression: 要计算的数学表达式 (支持 +, -, *, /, sin, cos, tan, log, sqrt, abs, pow, pi, e)
Returns:
计算结果的JSON字符串
"""
logger.info(f"计算器工具被调用,表达式: {expression}")
result = MathCalculator.safe_eval(expression)
return json.dumps(result, indent=2, ensure_ascii=False)
@mcp.tool()
async def text_analyzer(text: str) -> str:
"""
分析文本内容,包括字符统计、词频分析等
Args:
text: 要分析的文本内容
Returns:
文本分析结果的JSON字符串
"""
logger.info(f"文本分析工具被调用,文本长度: {len(text) if text else 0}")
result = TextAnalyzer.analyze_text(text)
return json.dumps(result, indent=2, ensure_ascii=False)
@mcp.tool()
async def file_reader(file_path: str) -> str:
"""
读取文件内容
Args:
file_path: 要读取的文件路径 (相对路径,支持 .txt, .md, .json, .py, .js, .html, .css, .xml)
Returns:
文件内容和元信息的JSON字符串
"""
logger.info(f"文件读取工具被调用,文件路径: {file_path}")
result = FileReader.read_file(file_path)
return json.dumps(result, indent=2, ensure_ascii=False)
@mcp.resource("config://server")
async def get_server_config() -> str:
"""获取服务器配置信息"""
config = {
"name": "MCP Demo Server",
"version": "2.0.0",
"description": "基于官方MCP Python SDK的演示服务器",
"tools": [
"calculator - 数学计算工具",
"text_analyzer - 文本分析工具",
"file_reader - 文件读取工具"
],
"resources": [
"config://server - 服务器配置",
"status://server - 服务器状态",
"logs://server - 服务器日志"
],
"max_file_size": MAX_FILE_SIZE,
"supported_file_types": list(SUPPORTED_FILE_EXTENSIONS)
}
return json.dumps(config, indent=2, ensure_ascii=False)
@mcp.resource("status://server")
async def get_server_status() -> str:
"""获取服务器状态信息"""
status = {
"status": "running",
"uptime": "运行中",
"tools_count": 3,
"resources_count": 3,
"memory_usage": "N/A",
"last_activity": "N/A"
}
return json.dumps(status, indent=2, ensure_ascii=False)
@mcp.resource("logs://server")
async def get_server_logs() -> str:
"""获取服务器日志"""
logs = [
"[INFO] MCP Server 启动成功",
"[INFO] 工具加载完成: calculator, text_analyzer, file_reader",
"[INFO] 资源初始化完成: config, status, logs",
"[INFO] 服务器准备就绪,等待连接..."
]
return "\n".join(logs)
def main():
"""主函数"""
logger.info("启动 MCP 演示服务器...")
logger.info(f"支持的工具: calculator, text_analyzer, file_reader")
logger.info(f"支持的资源: config://server, status://server, logs://server")
# 运行 MCP 服务器
mcp.run()
if __name__ == "__main__":
main()