MCP Chat Analysis Server
- src
- mcp_chat_analysis
#!/usr/bin/env python3
from typing import Dict, Any, List, Optional
import asyncio
import os
from pathlib import Path
import logging
from datetime import datetime
from modelcontextprotocol import Server, StdioServerTransport
from modelcontextprotocol.types import (
CallToolRequestSchema,
ErrorCode,
ListToolsRequestSchema,
McpError,
)
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.http import models as qdrant_models
from neo4j import AsyncGraphDatabase
from .models import ConversationData, SearchQuery, MetricsRequest, ConceptRequest
from .processors import ConversationProcessor
from .embeddings import EmbeddingGenerator
# Module-level logger named after this module (``__name__``); it is used by
# ChatAnalysisServer for start-up and error messages.
logger = logging.getLogger(__name__)
class ChatAnalysisServer:
    """MCP server providing chat analysis capabilities.

    On construction the server reads its connection settings from environment
    variables, creates the Qdrant client, the Neo4j async driver, the
    embedding generator and the conversation processor, and registers the
    list-tools and call-tool request handlers on the underlying MCP server.

    Environment variables (value used when unset):
        QDRANT_URL: ``http://localhost:6333``
        QDRANT_API_KEY: ``None``
        NEO4J_URL: ``bolt://localhost:7687``
        NEO4J_USER: ``neo4j``
        NEO4J_PASSWORD: ``None``
        EMBEDDING_MODEL: ``sentence-transformers/all-MiniLM-L6-v2``
    """

    def __init__(self) -> None:
        """Create the MCP server, load configuration and wire up components."""
        # Initialize MCP server
        self.server = Server(
            {
                "name": "mcp-server-chat-analysis",
                "version": "0.1.0",
            },
            {
                "capabilities": {
                    "tools": {},
                }
            },
        )

        # Load configuration from environment
        self.qdrant_url = os.getenv("QDRANT_URL", "http://localhost:6333")
        self.qdrant_api_key = os.getenv("QDRANT_API_KEY")
        self.neo4j_url = os.getenv("NEO4J_URL", "bolt://localhost:7687")
        self.neo4j_user = os.getenv("NEO4J_USER", "neo4j")
        self.neo4j_password = os.getenv("NEO4J_PASSWORD")
        self.embedding_model = os.getenv(
            "EMBEDDING_MODEL",
            "sentence-transformers/all-MiniLM-L6-v2",
        )

        # Initialize components
        self._setup_clients()
        self._setup_tools()
        logger.info(
            "Initialized ChatAnalysisServer with model: %s",
            self.embedding_model,
        )

    def _setup_clients(self) -> None:
        """Initialize database clients and processors.

        Sets ``self.qdrant``, ``self.neo4j``, ``self.embedding_generator`` and
        ``self.processor`` from the configuration loaded in ``__init__``.
        """
        # Initialize Qdrant client
        self.qdrant = QdrantClient(
            url=self.qdrant_url,
            api_key=self.qdrant_api_key,
        )

        # Initialize Neo4j driver
        self.neo4j = AsyncGraphDatabase.driver(
            self.neo4j_url,
            auth=(self.neo4j_user, self.neo4j_password),
        )

        # Initialize embedding model
        self.embedding_generator = EmbeddingGenerator(
            model_name=self.embedding_model,
        )

        # The processor is handed all three backends and does the actual work
        # behind every tool.
        self.processor = ConversationProcessor(
            qdrant=self.qdrant,
            neo4j=self.neo4j,
            embedding_generator=self.embedding_generator,
        )

    def _setup_tools(self) -> None:
        """Register the list-tools and call-tool handlers on the MCP server."""
        self.server.set_request_handler(
            ListToolsRequestSchema,
            self._handle_list_tools,
        )
        self.server.set_request_handler(
            CallToolRequestSchema,
            self._handle_call_tool,
        )

    @staticmethod
    def _text_response(title: str, payload: Any) -> Dict[str, Any]:
        """Wrap *payload* in a single-item text content response.

        The text is ``"<title>:"`` followed by a newline and ``str(payload)``
        (via f-string formatting).
        """
        return {
            "content": [{
                "type": "text",
                "text": f"{title}:\n{payload}",
            }]
        }

    async def _handle_list_tools(self, _) -> Dict[str, Any]:
        """Handle tool listing request.

        Returns a dict with a ``"tools"`` list; each entry carries the tool's
        name, description and the input schema produced by its request model's
        ``schema()`` method. The request argument is ignored.
        """
        tool_specs = (
            (
                "import_conversations",
                "Import and analyze chat conversations",
                ConversationData,
            ),
            (
                "semantic_search",
                "Search conversations by semantic similarity",
                SearchQuery,
            ),
            (
                "analyze_metrics",
                "Analyze conversation metrics",
                MetricsRequest,
            ),
            (
                "extract_concepts",
                "Extract and analyze concepts from conversations",
                ConceptRequest,
            ),
        )
        return {
            "tools": [
                {
                    "name": name,
                    "description": description,
                    "inputSchema": model.schema(),
                }
                for name, description, model in tool_specs
            ]
        }

    async def _handle_call_tool(self, request):
        """Handle tool execution request.

        Dispatches on ``request.params.name`` to the matching tool method and
        passes it ``request.params.arguments`` (an empty dict when the
        arguments are missing/``None``).

        Raises:
            McpError: ``ErrorCode.MethodNotFound`` for an unknown tool name;
                any ``McpError`` raised by a tool is propagated unchanged;
                every other exception is logged and re-raised as
                ``ErrorCode.InternalError`` with the original as its cause.
        """
        tool_name = request.params.name
        handlers = {
            "import_conversations": self._import_conversations,
            "semantic_search": self._semantic_search,
            "analyze_metrics": self._analyze_metrics,
            "extract_concepts": self._extract_concepts,
        }

        handler = handlers.get(tool_name)
        if handler is None:
            # Raised outside the try block so the MethodNotFound code is not
            # rewritten into InternalError by the generic handler below.
            raise McpError(
                ErrorCode.MethodNotFound,
                f"Unknown tool: {tool_name}",
            )

        # A call without arguments would otherwise fail on ``Model(**None)``
        # with an opaque TypeError; an empty dict lets the request model
        # report which fields are missing instead.
        arguments = request.params.arguments or {}

        try:
            return await handler(arguments)
        except McpError:
            # Already a protocol-level error: keep its code and message.
            raise
        except Exception as e:
            logger.exception("Error handling tool %s: %s", tool_name, e)
            raise McpError(ErrorCode.InternalError, str(e)) from e

    async def _import_conversations(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Import and process conversations.

        Builds a ``ConversationData`` from *args*, runs it through the
        processor and returns the resulting stats as a text response.
        """
        data = ConversationData(**args)
        stats = await self.processor.process_conversations(data)
        return self._text_response("Imported conversations", stats)

    async def _semantic_search(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Perform semantic search.

        Builds a ``SearchQuery`` from *args*, runs the processor's search and
        returns the results as a text response.
        """
        query = SearchQuery(**args)
        results = await self.processor.search(query)
        return self._text_response("Search results", results)

    async def _analyze_metrics(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze conversation metrics.

        Builds a ``MetricsRequest`` from *args*, runs the processor's metrics
        analysis and returns the metrics as a text response.
        """
        request = MetricsRequest(**args)
        metrics = await self.processor.analyze_metrics(request)
        return self._text_response("Metrics analysis", metrics)

    async def _extract_concepts(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Extract and analyze concepts.

        Builds a ``ConceptRequest`` from *args*, runs the processor's concept
        extraction and returns the concepts as a text response.
        """
        request = ConceptRequest(**args)
        concepts = await self.processor.extract_concepts(request)
        return self._text_response("Extracted concepts", concepts)

    async def run(self) -> None:
        """Run the MCP server by connecting it to a stdio transport."""
        transport = StdioServerTransport()
        # NOTE(review): the Neo4j driver created in _setup_clients is never
        # closed; confirm whether connect() blocks until shutdown before
        # adding cleanup here.
        await self.server.connect(transport)
        logger.info("Chat Analysis MCP server running on stdio")
def main():
    """Entry point for the MCP server.

    Configures logging from the ``LOG_LEVEL`` environment variable (``INFO``
    when unset or empty), constructs a ``ChatAnalysisServer`` and runs its
    ``run()`` coroutine with ``asyncio.run``.

    Raises:
        ValueError: if ``LOG_LEVEL`` is not a level name known to ``logging``.
    """
    # logging only recognises upper-case level names, so "debug" or " info "
    # from the environment would otherwise abort start-up with ValueError.
    level_name = (os.getenv("LOG_LEVEL") or "INFO").strip().upper()
    logging.basicConfig(
        level=level_name,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    # Run server
    server = ChatAnalysisServer()
    asyncio.run(server.run())
# Start the server only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()