from typing import Dict, Any, Optional
from pydantic import BaseModel
from datetime import datetime
import re
class MCPMessage(BaseModel):
"""Message format following MCP protocol."""
role: str
content: str
name: Optional[str] = None
timestamp: Optional[str] = None
class MessageConverter:
"""Handles conversion between MCP and Ollama message formats."""
def __init__(self, backend: str = "ollama"):
self.system_prompt = "You are DeepSeek, a helpful AI assistant."
self.conversation_history = []
def clean_think_tags(self, content: str) -> str:
"""Remove <think> tags from Ollama responses."""
content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
content = re.sub(r'<\/?think>', '', content)
return content.strip()
def mcp_to_ollama(self, message: MCPMessage, config: Dict[str, Any]) -> Dict[str, Any]:
"""Convert MCP message to Ollama format."""
prompt = f"System: {self.system_prompt}\nHuman: {message.content}\nAssistant: "
return {
"model": "deepseek-r1",
"prompt": prompt,
"stream": False,
"options": {
"temperature": config.get("temperature", 0.7),
"top_p": config.get("top_p", 0.9)
}
}
def ollama_to_mcp(self, response: Dict[str, Any]) -> MCPMessage:
"""Convert Ollama response to MCP format."""
content = response.get("response", "")
content = self.clean_think_tags(content)
return MCPMessage(
role="assistant",
content=content,
timestamp=datetime.now().isoformat()
)