#!/usr/bin/env python3
"""
MCP Server for Simple Data Manager with AI
"""
import asyncio
import json
import sys
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
CallToolRequest,
CallToolResult,
ListToolsRequest,
ListToolsResult,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
)
from app.models.data_model import DataModel
from app.ai.local_model import ai_model
# Initialize the data model
data_model = DataModel()
# Create MCP server
server = Server("simple-mcp-server-with-ai")
@server.list_tools()
async def handle_list_tools() -> ListToolsResult:
"""List available tools"""
return ListToolsResult(
tools=[
# Data management tools
Tool(
name="get_all_items",
description="Get all items from the data store",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="get_item_by_id",
description="Get a specific item by its ID",
inputSchema={
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "The ID of the item to retrieve"
}
},
"required": ["id"]
}
),
Tool(
name="create_item",
description="Create a new item in the data store",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the item"
},
"description": {
"type": "string",
"description": "Description of the item"
},
"category": {
"type": "string",
"description": "Category of the item"
}
},
"required": ["name"]
}
),
Tool(
name="update_item",
description="Update an existing item by ID",
inputSchema={
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "The ID of the item to update"
},
"name": {
"type": "string",
"description": "New name for the item"
},
"description": {
"type": "string",
"description": "New description for the item"
},
"category": {
"type": "string",
"description": "New category for the item"
}
},
"required": ["id"]
}
),
Tool(
name="delete_item",
description="Delete an item by ID",
inputSchema={
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "The ID of the item to delete"
}
},
"required": ["id"]
}
),
Tool(
name="search_items",
description="Search items by query string",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query to find matching items"
}
},
"required": ["query"]
}
),
# AI model tools
Tool(
name="get_ai_model_info",
description="Get information about the loaded AI model",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="change_ai_model",
description="Change the AI model type and name",
inputSchema={
"type": "object",
"properties": {
"model_type": {
"type": "string",
"description": "Type of model (sentence_transformer, text_generation, text_classification, sentiment_analysis, tfidf)"
},
"model_name": {
"type": "string",
"description": "Name of the model to load"
}
},
"required": ["model_type", "model_name"]
}
),
Tool(
name="analyze_text",
description="Analyze text using the AI model",
inputSchema={
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "Text to analyze"
}
},
"required": ["text"]
}
),
Tool(
name="analyze_all_items",
description="Analyze all items using the AI model",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="find_similar_items",
description="Find items similar to a query using AI embeddings",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"top_k": {
"type": "integer",
"description": "Number of similar items to return",
"default": 5
}
},
"required": ["query"]
}
),
Tool(
name="analyze_single_item",
description="Analyze a specific item using the AI model",
inputSchema={
"type": "object",
"properties": {
"item_id": {
"type": "string",
"description": "ID of the item to analyze"
}
},
"required": ["item_id"]
}
),
Tool(
name="smart_search",
description="Smart search combining traditional search with AI similarity",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"top_k": {
"type": "integer",
"description": "Number of results to return",
"default": 10
}
},
"required": ["query"]
}
)
]
)
@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
"""Handle tool calls"""
try:
# Data management tools
if name == "get_all_items":
items = await data_model.get_all_items()
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Found {len(items)} items:\n{json.dumps(items, indent=2)}"
)
]
)
elif name == "get_item_by_id":
item_id = arguments.get("id")
if not item_id:
return CallToolResult(
content=[
TextContent(
type="text",
text="Error: Item ID is required"
)
]
)
item = await data_model.get_item_by_id(item_id)
if not item:
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Item with ID {item_id} not found"
)
]
)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Item found:\n{json.dumps(item, indent=2)}"
)
]
)
elif name == "create_item":
name_val = arguments.get("name")
if not name_val:
return CallToolResult(
content=[
TextContent(
type="text",
text="Error: Item name is required"
)
]
)
item_data = {
"name": name_val,
"description": arguments.get("description", ""),
"category": arguments.get("category", "")
}
new_item = await data_model.create_item(item_data)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Item created successfully:\n{json.dumps(new_item, indent=2)}"
)
]
)
elif name == "update_item":
item_id = arguments.get("id")
if not item_id:
return CallToolResult(
content=[
TextContent(
type="text",
text="Error: Item ID is required"
)
]
)
update_data = {}
if "name" in arguments:
update_data["name"] = arguments["name"]
if "description" in arguments:
update_data["description"] = arguments["description"]
if "category" in arguments:
update_data["category"] = arguments["category"]
updated_item = await data_model.update_item(item_id, update_data)
if not updated_item:
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Item with ID {item_id} not found"
)
]
)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Item updated successfully:\n{json.dumps(updated_item, indent=2)}"
)
]
)
elif name == "delete_item":
item_id = arguments.get("id")
if not item_id:
return CallToolResult(
content=[
TextContent(
type="text",
text="Error: Item ID is required"
)
]
)
deleted_item = await data_model.delete_item(item_id)
if not deleted_item:
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Item with ID {item_id} not found"
)
]
)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Item deleted successfully:\n{json.dumps(deleted_item, indent=2)}"
)
]
)
elif name == "search_items":
query = arguments.get("query")
if not query:
return CallToolResult(
content=[
TextContent(
type="text",
text="Error: Search query is required"
)
]
)
results = await data_model.search_items(query)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Search results for '{query}':\n{json.dumps(results, indent=2)}"
)
]
)
# AI model tools
elif name == "get_ai_model_info":
info = ai_model.get_model_info()
return CallToolResult(
content=[
TextContent(
type="text",
text=f"AI Model Information:\n{json.dumps(info, indent=2)}"
)
]
)
elif name == "change_ai_model":
model_type = arguments.get("model_type")
model_name = arguments.get("model_name")
if not model_type or not model_name:
return CallToolResult(
content=[
TextContent(
type="text",
text="Error: Both model_type and model_name are required"
)
]
)
result = await ai_model.change_model(model_type, model_name)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Model changed successfully:\n{json.dumps(result, indent=2)}"
)
]
)
elif name == "analyze_text":
text = arguments.get("text")
if not text:
return CallToolResult(
content=[
TextContent(
type="text",
text="Error: Text is required"
)
]
)
result = await ai_model.analyze_text(text)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Text analysis results:\n{json.dumps(result, indent=2)}"
)
]
)
elif name == "analyze_all_items":
items = await data_model.get_all_items()
if not items:
return CallToolResult(
content=[
TextContent(
type="text",
text="No items to analyze"
)
]
)
result = await ai_model.analyze_items(items)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Items analysis results:\n{json.dumps(result, indent=2)}"
)
]
)
elif name == "find_similar_items":
query = arguments.get("query")
top_k = arguments.get("top_k", 5)
if not query:
return CallToolResult(
content=[
TextContent(
type="text",
text="Error: Query is required"
)
]
)
items = await data_model.get_all_items()
if not items:
return CallToolResult(
content=[
TextContent(
type="text",
text="No items to search"
)
]
)
result = await ai_model.find_similar_items(query, items, top_k)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Similar items for '{query}':\n{json.dumps(result, indent=2)}"
)
]
)
elif name == "analyze_single_item":
item_id = arguments.get("item_id")
if not item_id:
return CallToolResult(
content=[
TextContent(
type="text",
text="Error: Item ID is required"
)
]
)
item = await data_model.get_item_by_id(item_id)
if not item:
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Item with ID {item_id} not found"
)
]
)
# Extract text from item
text_parts = []
if item.get('name'):
text_parts.append(item['name'])
if item.get('description'):
text_parts.append(item['description'])
if item.get('category'):
text_parts.append(item['category'])
text = ' '.join(text_parts)
if not text.strip():
return CallToolResult(
content=[
TextContent(
type="text",
text="Item has no text content to analyze"
)
]
)
result = await ai_model.analyze_text(text)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Item analysis results:\n{json.dumps(result, indent=2)}"
)
]
)
elif name == "smart_search":
query = arguments.get("query")
top_k = arguments.get("top_k", 10)
if not query:
return CallToolResult(
content=[
TextContent(
type="text",
text="Error: Query is required"
)
]
)
# Get traditional search results
traditional_results = await data_model.search_items(query)
# Get AI similarity results
all_items = await data_model.get_all_items()
ai_results = await ai_model.find_similar_items(query, all_items, top_k)
# Combine results
combined_results = []
# Add traditional search results
for item in traditional_results:
combined_results.append({
"item": item,
"score": 1.0,
"source": "traditional_search"
})
# Add AI similarity results
if "similar_items" in ai_results:
for ai_item in ai_results["similar_items"]:
# Check if item already exists
exists = any(r["item"]["id"] == ai_item["item"]["id"] for r in combined_results)
if not exists:
combined_results.append({
"item": ai_item["item"],
"score": ai_item["similarity_score"],
"source": "ai_similarity"
})
# Sort by score (descending)
combined_results.sort(key=lambda x: x["score"], reverse=True)
result = {
"query": query,
"results": combined_results[:top_k],
"traditional_count": len(traditional_results),
"total_results": len(combined_results)
}
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Smart search results for '{query}':\n{json.dumps(result, indent=2)}"
)
]
)
else:
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Unknown tool: {name}"
)
]
)
except Exception as e:
return CallToolResult(
content=[
TextContent(
type="text",
text=f"Error: {str(e)}"
)
]
)
async def main():
"""Main function to run the MCP server"""
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())