from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import uvicorn
import asyncio
import json
import logging
import os
from typing import List, Optional
from pathlib import Path
from rich.markdown import Markdown
import argparse
from mcpcli.messages.send_ping import send_ping
from mcpcli.messages.send_tools_list import send_tools_list
from mcpcli.messages.send_resources import send_resources_list
from mcpcli.messages.send_prompts import send_prompts_list
from mcpcli.messages.send_initialize_message import send_initialize
from mcpcli.messages.send_call_tool import send_call_tool
from mcpcli.transport.stdio.stdio_client import stdio_client
from mcpcli.transport.stdio.stdio_server import stdio_server
from mcpcli.config import load_config
from mcpcli.chat_handler import handle_chat_mode, get_input
from mcpcli.llm_client import LLMClient
from mcpcli.tools_handler import convert_to_openai_tools
from mcpcli.chat_handler import generate_system_prompt
# FastAPI application exposing the MCP CLI over HTTP.
app = FastAPI(
    title="MCP CLI API",
    description="Model Context Provider CLI API",
    version="1.0.0"
)

# Default server configuration file (JSON), resolved relative to the CWD.
DEFAULT_CONFIG_FILE = "server_config.json"

# Configure root logging: timestamped, level-tagged messages at INFO.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
class GlobalState:
    """Mutable process-wide state shared by all API endpoints."""

    def __init__(self):
        # Open (read_stream, write_stream) pairs, one per connected MCP server.
        self.server_streams = []
        # Async context managers owning the stdio transports above.
        self.context_managers = []
        # Accumulated chat transcript (list of role/content dicts).
        self.chat_history = []
        # Default LLM provider/model; overridable per chat request.
        self.current_provider = os.getenv("LLM_PROVIDER", "openai")
        self.current_model = os.getenv("LLM_MODEL", "gpt-4o-mini")


# Singleton instance used by every endpoint.
state = GlobalState()
class CommandResponse(BaseModel):
    """Standard response envelope returned by every API endpoint."""

    status: str  # "success" or "error"
    message: str  # human-readable summary
    data: Optional[dict] = None  # endpoint-specific payload, if any
class ToolCallRequest(BaseModel):
    """Request body for POST /call-tool."""

    tool_name: str  # name of the MCP tool to invoke
    arguments: dict  # tool arguments, forwarded verbatim
class ChatRequest(BaseModel):
    """Request body for POST /chat."""

    message: str  # the user's chat message
    provider: Optional[str] = None  # LLM provider override (defaults to state.current_provider)
    model: Optional[str] = None  # LLM model override (defaults to state.current_model)
async def initialize_server(config_path: str, server_name: str, port: Optional[int] = None):
    """Open a stdio connection to an MCP server and run the initialize handshake.

    Args:
        config_path: Path to the JSON server configuration file.
        server_name: Name of the server entry within the configuration.
        port: Optional port override, forwarded to load_config.

    Returns:
        (context_manager, read_stream, write_stream). The caller owns the
        context manager and must call its __aexit__ to release the transport.

    Raises:
        HTTPException: 500 if the initialize handshake fails.
    """
    # Load server parameters (the port override is forwarded to the loader).
    server_params = await load_config(config_path, server_name, port)
    # Enter the stdio transport manually: the caller keeps it open long-term.
    cm = stdio_client(server_params)
    read_stream, write_stream = await cm.__aenter__()
    try:
        init_result = await send_initialize(read_stream, write_stream)
    except BaseException:
        # Bug fix: the original leaked the transport when the handshake
        # itself raised (only the falsy-result path cleaned up).
        await cm.__aexit__(None, None, None)
        raise
    if not init_result:
        await cm.__aexit__(None, None, None)
        raise HTTPException(status_code=500, detail=f"伺服器初始化失敗: {server_name}")
    return cm, read_stream, write_stream
def connect_to_mcp_server(args):
    """Export CLI-provided API keys to the environment and connect to the MCP server.

    Args:
        args: Parsed argparse namespace with openai_key, anthropic_key,
            google_key, host, and port attributes.

    Returns:
        True on success, False when any step raises.
    """
    try:
        # Propagate any provided API keys into the process environment.
        key_specs = (
            (args.openai_key, 'OPENAI_API_KEY', "已設置 OpenAI API key"),
            (args.anthropic_key, 'ANTHROPIC_API_KEY', "已設置 Anthropic API key"),
            (args.google_key, 'GOOGLE_API_KEY', "已設置 Google API key"),
        )
        for value, env_name, note in key_specs:
            if value:
                os.environ[env_name] = value
                logging.info(note)
        # Original connection logic, unchanged.
        server_url = f"http://{args.host}:{args.port}"
        logging.info(f"正在連接到 MCP 伺服器: {server_url}")
        # Remaining original connection code goes here...
        return True
    except Exception as e:
        logging.error(f"連接 MCP 伺服器失敗: {str(e)}")
        return False
@app.post("/connect/{server_name}", response_model=CommandResponse, tags=["Server"])
async def connect_server(
server_name: str,
config_file: str = DEFAULT_CONFIG_FILE,
port: Optional[int] = None # 新增可選的端口參數
):
"""連接到指定的 MCP 伺服器"""
try:
# 清理舊連接
for cm in state.context_managers:
await cm.__aexit__(None, None, None)
state.context_managers.clear()
state.server_streams.clear()
# 初始化新連接,傳入端口參數
cm, read_stream, write_stream = await initialize_server(config_file, server_name, port)
state.context_managers.append(cm)
state.server_streams.append((read_stream, write_stream))
# 準備回應訊息
port_info = f" (使用端口: {port})" if port else ""
return CommandResponse(
status="success",
message=f"已成功連接到伺服器: {server_name}{port_info}",
data={
"server": server_name,
"port": port
}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/server", tags=["Server"])
async def run_server(host: str = '127.0.0.1', port: int = 8765):
"""啟動 MCP 伺服器"""
try:
logging.info(f"正在啟動 MCP 伺服器於 {host}:{port}")
# 建立伺服器
server = await asyncio.start_server(
stdio_server,
host=host,
port=port
)
addr = server.sockets[0].getsockname()
return CommandResponse(
status="success",
message=f"伺服器已啟動於 {addr}",
data={"host": host, "port": port}
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"伺服器啟動失敗: {str(e)}")
@app.post("/ping", response_model=CommandResponse, tags=["Commands"])
async def handle_ping():
"""測試伺服器連線"""
try:
if not state.server_streams:
raise HTTPException(status_code=400, detail="尚未連接到伺服器")
results = []
for i, (read_stream, write_stream) in enumerate(state.server_streams):
result = await send_ping(read_stream, write_stream)
server_num = i + 1
results.append({
"server": server_num,
"status": "正常" if result else "失敗"
})
return CommandResponse(
status="success",
message="伺服器測試完成",
data={"results": results}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/tools", response_model=CommandResponse, tags=["Commands"])
async def handle_list_tools():
"""列出可用工具"""
try:
if not state.server_streams:
raise HTTPException(status_code=400, detail="尚未連接到伺服器")
results = []
for i, (read_stream, write_stream) in enumerate(state.server_streams):
response = await send_tools_list(read_stream, write_stream)
server_num = i + 1
tools_list = response.get("tools", [])
results.append({
"server": server_num,
"tools": tools_list
})
return CommandResponse(
status="success",
message="成功獲取工具列表",
data={"results": results}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/call-tool", response_model=CommandResponse, tags=["Commands"])
async def handle_call_tool(request: ToolCallRequest):
"""呼叫指定工具"""
try:
if not state.server_streams:
raise HTTPException(status_code=400, detail="尚未連接到伺服器")
if not request.tool_name:
return CommandResponse(
status="error",
message="Tool name cannot be empty.",
data=None
)
# 記錄呼叫資訊
logging.info(f"\n正在呼叫工具 '{request.tool_name}' 使用參數:\n")
logging.info(json.dumps(request.arguments, indent=2))
results = []
for read_stream, write_stream in state.server_streams:
try:
result = await send_call_tool(
request.tool_name,
request.arguments,
read_stream,
write_stream
)
# 檢查結果是否為 None
if result is None:
continue
# 檢查是否有錯誤
if isinstance(result, dict) and result.get("isError"):
continue
# 獲取內容
response_content = result.get("content") if isinstance(result, dict) else result
if not response_content:
continue
# 檢查錯誤訊息
if isinstance(response_content, list) and response_content and \
isinstance(response_content[0], dict) and \
response_content[0].get('text', '').startswith('Error:'):
continue
results.append(response_content)
except Exception as e:
logging.error(f"工具呼叫錯誤: {str(e)}")
continue
if not results:
return CommandResponse(
status="error",
message="所有伺服器呼叫都失敗了",
data={"error": "No successful tool calls"}
)
return CommandResponse(
status="success",
message="工具呼叫完成",
data={
"tool_name": request.tool_name,
"arguments": request.arguments,
"results": results
}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/resources", response_model=CommandResponse, tags=["Commands"])
async def handle_list_resources():
"""列出可用資源"""
try:
if not state.server_streams:
raise HTTPException(status_code=400, detail="尚未連接到伺服器")
results = []
for i, (read_stream, write_stream) in enumerate(state.server_streams):
response = await send_resources_list(read_stream, write_stream)
server_num = i + 1
resources_list = response.get("resources", []) if response else None
results.append({
"server": server_num,
"resources": resources_list
})
return CommandResponse(
status="success",
message="成功獲取資源列表",
data={"results": results}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/prompts", response_model=CommandResponse, tags=["Commands"])
async def handle_list_prompts():
"""列出可用提示"""
try:
if not state.server_streams:
raise HTTPException(status_code=400, detail="尚未連接到伺服器")
results = []
for i, (read_stream, write_stream) in enumerate(state.server_streams):
response = await send_prompts_list(read_stream, write_stream)
server_num = i + 1
prompts_list = response.get("prompts", []) if response else None
results.append({
"server": server_num,
"prompts": prompts_list
})
return CommandResponse(
status="success",
message="成功獲取提示列表",
data={"results": results}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/chat", response_model=CommandResponse, tags=["Chat"])
async def handle_chat(
request: ChatRequest,
ollama_host: Optional[str] = None
):
"""處理聊天請求"""
try:
if not state.server_streams:
raise HTTPException(status_code=400, detail="尚未連接到伺服器")
# 使用請求中的 provider 和 model,如果沒有則使用預設值
provider = request.provider or state.current_provider
model = request.model or state.current_model
# 取得工具列表
read_stream, write_stream = state.server_streams[0]
tools_response = await send_tools_list(read_stream, write_stream)
tools = tools_response.get("tools", [])
# 建立 LLM 客戶端
client = LLMClient(
provider=provider,
model=model,
ollama_host=ollama_host or os.getenv("OLLAMA_HOST")
)
# 生成系統提示
system_prompt = generate_system_prompt(tools)
# 建立完整的訊息歷史
messages = [
{"role": "system", "content": system_prompt}
] + state.chat_history + [
{"role": "user", "content": request.message}
]
# 轉換工具格式
openai_tools = convert_to_openai_tools(tools)
# 取得 AI 回應
response = client.create_completion(messages, tools=openai_tools)
print(f"response: {response}")
# 取得回應文本和工具呼叫
ai_message = response.get("response", "")
print(f"ai_message: {ai_message}")
tool_calls = response.get("tool_calls", [])
print(f"tool_calls: {tool_calls}")
# 處理工具調用
tool_results = []
if tool_calls:
for tool_call in tool_calls:
try:
# 解析工具調用參數
tool_name = tool_call.function.name if hasattr(tool_call, 'function') else tool_call["function"]["name"]
tool_args = tool_call.function.arguments if hasattr(tool_call, 'function') else tool_call["function"]["arguments"]
# 確保參數是字典格式
if isinstance(tool_args, str):
tool_args = json.loads(tool_args)
logging.info(f"Calling tool: {tool_name} with args: {tool_args}")
# 發送工具調用請求 - 修正參數順序
tool_result = await send_call_tool(
tool_name, # 工具名稱
tool_args, # 工具參數
read_stream, # 讀取流
write_stream # 寫入流
)
tool_results.append({
"tool": tool_name,
"result": tool_result
})
# 將工具調用結果添加到聊天歷史
state.chat_history.append({
"role": "function",
"name": tool_name,
"content": json.dumps(tool_result) if isinstance(tool_result, dict) else str(tool_result)
})
except Exception as e:
logging.error(f"工具調用錯誤: {str(e)}")
logging.error(f"工具調用數據: {tool_call}")
tool_results.append({
"tool": tool_name if 'tool_name' in locals() else "unknown",
"error": str(e)
})
# 更新聊天歷史
state.chat_history.append({
"role": "user",
"content": request.message
})
if ai_message:
state.chat_history.append({
"role": "assistant",
"content": ai_message
})
return CommandResponse(
status="success",
message="成功處理聊天請求",
data={
"response": ai_message,
"tool_calls": tool_results
}
)
except Exception as e:
logging.error(f"聊天處理錯誤: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/chat/clear", response_model=CommandResponse, tags=["Chat"])
async def clear_chat_history():
"""清除聊天歷史"""
try:
state.chat_history.clear()
return CommandResponse(
status="success",
message="聊天歷史已清除",
data={"cleared": True}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
def main():
    """CLI entry point: parse arguments, export API keys, launch the API server."""
    parser = argparse.ArgumentParser(description='MCP Web Client')
    # MCP server connection arguments.
    parser.add_argument('--host', default='localhost', help='MCP server host')
    parser.add_argument('--port', type=int, default=8080, help='MCP server port')
    # Optional API keys for the supported LLM providers.
    parser.add_argument('--openai-key', help='OpenAI API key')
    parser.add_argument('--anthropic-key', help='Anthropic API key')
    parser.add_argument('--google-key', help='Google API key')
    args = parser.parse_args()

    # Export any CLI-provided API keys into the process environment.
    for value, env_name, note in (
        (args.openai_key, 'OPENAI_API_KEY', "已設置 OpenAI API key"),
        (args.anthropic_key, 'ANTHROPIC_API_KEY', "已設置 Anthropic API key"),
        (args.google_key, 'GOOGLE_API_KEY', "已設置 Google API key"),
    ):
        if value:
            os.environ[env_name] = value
            logging.info(note)

    # NOTE(review): this FastAPI app is always served on 127.0.0.1:7860;
    # --host/--port appear to describe the MCP server, not this API — confirm.
    uvicorn.run(
        app,
        host="127.0.0.1",
        port=7860,
        log_level="info"
    )


if __name__ == "__main__":
    main()