#!/usr/bin/env python3
"""
Chatlog MCP Server
A Model Context Protocol (MCP) server for analyzing chat logs
"""
import asyncio
import httpx
import os
import logging
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.server.stdio
import mcp.types as types
from typing import Any, Dict, List, Optional
import json
# HTTP API settings: each value is read from the environment, with a default fallback.
API_BASE_URL = os.getenv("CHATLOG_API_URL", "http://127.0.0.1:5030/api/v1")
LOG_LEVEL = os.getenv("CHATLOG_LOG_LEVEL", "info").upper()

# Configure logging; a level name the logging module does not define falls back to INFO.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=getattr(logging, LOG_LEVEL, logging.INFO),
)
logger = logging.getLogger(__name__)
# Create the MCP server instance; the tool handlers in this file register on it via decorators.
server = Server("chatlog")
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """Describe the tools this server exposes.

    Returns:
        Four ``types.Tool`` entries, in this order: ``list_sessions``,
        ``list_chatrooms``, ``list_contacts`` and ``get_chatlog``. Every
        input property is declared as a string and none is required.
    """

    def string_prop(description: str, **extra: Any) -> Dict[str, Any]:
        # A fresh dict per call, so no two schemas share a property object.
        return {"type": "string", "description": description, **extra}

    def format_prop() -> Dict[str, Any]:
        # Output-format selector shared by every tool; declared default is "text".
        return string_prop("输出格式:json/text(可选)", default="text")

    def search_prop() -> Dict[str, Any]:
        # Optional search keyword used by the chatroom and contact listings.
        return string_prop("搜索关键词(可选)")

    sessions_tool = types.Tool(
        name="list_sessions",
        description="获取会话列表",
        inputSchema={
            "type": "object",
            "properties": {"format": format_prop()},
        },
    )

    chatrooms_tool = types.Tool(
        name="list_chatrooms",
        description="获取群聊列表",
        inputSchema={
            "type": "object",
            "properties": {"keyword": search_prop(), "format": format_prop()},
        },
    )

    contacts_tool = types.Tool(
        name="list_contacts",
        description="获取联系人列表",
        inputSchema={
            "type": "object",
            "properties": {"keyword": search_prop(), "format": format_prop()},
        },
    )

    chatlog_tool = types.Tool(
        name="get_chatlog",
        description="获取聊天记录",
        inputSchema={
            "type": "object",
            "properties": {
                "time": string_prop(
                    "时间范围,如 '2026-01-10' 或 '2026-01-10~2026-01-13'"
                ),
                "talker": string_prop("聊天对象 ID 或群聊 ID"),
                "sender": string_prop("发送者 ID(可选)"),
                "keyword": string_prop("关键词搜索(可选)"),
                "format": format_prop(),
            },
            "required": [],
        },
    )

    return [sessions_tool, chatrooms_tool, contacts_tool, chatlog_tool]
async def call_api(endpoint: str, params: Dict[str, Any]) -> str:
    """Send a GET request to the chatlog HTTP API and return the response body.

    Args:
        endpoint: Path segment appended to ``API_BASE_URL`` (e.g. ``"session"``).
        params: Query-string parameters sent with the request.

    Returns:
        The response text on success. On any failure (including a non-2xx
        status, which ``raise_for_status`` turns into an exception) a message
        starting with ``"API 调用失败: "`` is returned instead of raising.
    """
    try:
        # Join with exactly one slash so a trailing "/" on the configured base
        # URL (or a leading one on the endpoint) does not produce "//endpoint".
        url = f"{API_BASE_URL.rstrip('/')}/{endpoint.lstrip('/')}"
        async with httpx.AsyncClient() as client:
            response = await client.get(url, params=params, timeout=30)
            response.raise_for_status()
            return response.text
    except Exception as e:
        # Some exceptions stringify to "" (e.g. a bare TimeoutError); fall back
        # to the exception class name so the caller still sees what went wrong.
        detail = str(e) or type(e).__name__
        logger.warning("Chatlog API call to %r failed: %s", endpoint, detail)
        return f"API 调用失败: {detail}"
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    """Dispatch a tool call to the matching chatlog API endpoint.

    Args:
        name: Tool name; one of ``list_sessions``, ``list_chatrooms``,
            ``list_contacts`` or ``get_chatlog``.
        arguments: Tool arguments. ``None`` or an empty mapping means
            "no arguments".

    Returns:
        A single-element list holding the API response text, an
        "unknown tool" message, or an error message. Exceptions are reported
        as text content rather than propagated.
    """
    # A call without arguments may arrive as None; treat it as an empty mapping
    # instead of letting `.get` fail with an AttributeError.
    arguments = arguments or {}
    try:
        if name == "list_sessions":
            endpoint = "session"
            params = {"format": arguments.get("format", "text")}
        elif name in ("list_chatrooms", "list_contacts"):
            endpoint = "chatroom" if name == "list_chatrooms" else "contact"
            params = {"format": arguments.get("format", "text")}
            if arguments.get("keyword"):
                params["keyword"] = arguments["keyword"]
        elif name == "get_chatlog":
            endpoint = "chatlog"
            # Forward only the filters the caller actually supplied; nothing is
            # defaulted here, so omitted filters are left out of the request.
            params = {
                key: arguments[key]
                for key in ("time", "talker", "sender", "keyword", "format")
                if arguments.get(key)
            }
        else:
            return [types.TextContent(type="text", text=f"未知工具: {name}")]

        result = await call_api(endpoint, params)
        return [types.TextContent(type="text", text=result)]
    except Exception as e:
        # Keep the text reply for the client, but record the traceback so the
        # failure is not lost.
        logger.exception("Tool call %r failed", name)
        return [types.TextContent(type="text", text=f"错误: {str(e)}")]
async def main():
    """Log startup details, then run the MCP server over the stdio transport."""
    logger.info("Starting Chatlog MCP Server v1.0.0")
    logger.info(f"API base URL: {API_BASE_URL}")

    # The stdio transport yields the read/write stream pair the server runs on.
    async with mcp.server.stdio.stdio_server() as (reader, writer):
        init_options = InitializationOptions(
            server_name="chatlog",
            server_version="1.0.0",
            capabilities=server.get_capabilities(
                notification_options=NotificationOptions(),
                experimental_capabilities={},
            ),
        )
        await server.run(reader, writer, init_options)
def run():
    """Synchronous entry point: execute ``main()`` via ``asyncio.run``.

    A keyboard interrupt is logged as a normal shutdown. Any other exception
    is logged and then re-raised to the caller.
    """
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as exc:
        logger.error(f"Server error: {exc}")
        raise
if __name__ == "__main__":
    import sys

    # The server talks MCP over stdio, so stdout must carry protocol messages
    # only. The human-readable startup banner goes to stderr instead.
    print("Chatlog MCP Server starting...", file=sys.stderr)
    print(f"API base address: {API_BASE_URL}", file=sys.stderr)
    print("Server ready", file=sys.stderr)
    run()