OpenAI MCP Server
by arthurcolle
#!/usr/bin/env python3
"""
Model Context Protocol (MCP) Server Implementation
This module implements the Model Context Protocol server capabilities,
allowing the assistant to be used as an MCP-compatible context provider.
"""
import os
import json
import time
import uuid
import sys
import logging
import asyncio
import tiktoken
import re
from datetime import datetime
from typing import Dict, List, Any, Optional, Union, AsyncGenerator
from fastapi import FastAPI, HTTPException, Request, Response, Depends, BackgroundTasks, Query
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field
import uvicorn
import openai
from openai import OpenAI
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("mcp_server")
# MCP Protocol Models
class MCPHealthResponse(BaseModel):
"""Health check response for MCP protocol"""
status: str = "healthy"
version: str = "1.0.0"
protocol_version: str = "0.1.0"
provider: str = "OpenAI Code Assistant"
models: List[str] = ["gpt-4o", "gpt-4-turbo", "gpt-3.5-turbo"]
uptime: Optional[float] = None
request_count: Optional[int] = None
cache_hit_ratio: Optional[float] = None
class MCPContextRequest(BaseModel):
"""Request for context generation from a prompt template"""
prompt_id: str
parameters: Dict[str, Any] = Field(default_factory=dict, description="Parameters to fill in the prompt template")
model: Optional[str] = Field(None, description="Model to use for context generation")
stream: bool = Field(False, description="Whether to stream the response")
user: Optional[str] = Field(None, description="User identifier for tracking")
conversation_id: Optional[str] = Field(None, description="Conversation identifier")
message_id: Optional[str] = Field(None, description="Message identifier")
class MCPContextResponse(BaseModel):
"""Response containing generated context"""
context: str = Field(..., description="The generated context")
context_id: str = Field(..., description="Unique identifier for this context")
model: str = Field(..., description="Model used for generation")
usage: Dict[str, int] = Field(default_factory=dict, description="Token usage statistics")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
class MCPErrorResponse(BaseModel):
"""Error response format"""
error: str = Field(..., description="Error message")
error_type: str = Field(..., description="Type of error")
status_code: int = Field(..., description="HTTP status code")
details: Optional[Dict[str, Any]] = Field(None, description="Additional error details")
class MCPPromptTemplate(BaseModel):
"""Prompt template definition"""
id: str = Field(..., description="Unique identifier for the template")
template: str = Field(..., description="The prompt template with parameter placeholders")
description: Optional[str] = Field(None, description="Description of the template")
parameters: Dict[str, Dict[str, Any]] = Field(default_factory=dict, description="Parameter definitions")
default_model: Optional[str] = Field(None, description="Default model to use with this template")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
class MCPPromptLibraryResponse(BaseModel):
"""Response containing a list of prompt templates"""
prompts: List[MCPPromptTemplate] = Field(..., description="List of prompt templates")
count: int = Field(..., description="Number of templates")
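# Illustrative example of the request/response shapes defined above (values are
# made up; context_id is generated per request by the /context endpoint):
#
#   POST /context
#   {"prompt_id": "greeting", "parameters": {"time": "09:00"}, "model": "gpt-4o"}
#
#   200 OK
#   {"context": "Hello! The current time is 09:00. How can I help you today?",
#    "context_id": "3f2b6c1e-...", "model": "gpt-4o",
#    "usage": {"prompt_tokens": 16, "completion_tokens": 0, "total_tokens": 16},
#    "metadata": {"prompt_id": "greeting", "timestamp": 1700000000.0,
#                 "parameters": {"time": "09:00"}}}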
# MCP Server Implementation
# Prometheus metrics
REQUEST_COUNT = Counter('mcp_requests_total', 'Total number of requests processed', ['endpoint', 'status'])
REQUEST_LATENCY = Histogram('mcp_request_latency_seconds', 'Request latency in seconds', ['endpoint'])
CACHE_HIT = Counter('mcp_cache_hits_total', 'Total number of cache hits')
CACHE_MISS = Counter('mcp_cache_misses_total', 'Total number of cache misses')
ACTIVE_CONNECTIONS = Gauge('mcp_active_connections', 'Number of active connections')
TOKEN_USAGE = Counter('mcp_token_usage_total', 'Total number of tokens used', ['model', 'type'])
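# The metrics above are exported in Prometheus text format by the /metrics route
# defined in setup_routes; a quick check (illustrative):
#   curl http://127.0.0.1:8000/metrics | grep mcp_requests_total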
# Cache implementation
class CacheManager:
"""Manages caching for context responses"""
def __init__(self, cache_type="memory", redis_url=None, ttl=3600):
self.cache_type = cache_type
self.redis_url = redis_url
self.ttl = ttl
self.memory_cache = {}
self.redis_client = None
if cache_type == "redis" and redis_url:
try:
import redis
self.redis_client = redis.from_url(redis_url)
logging.info(f"Redis cache initialized with URL: {redis_url}")
except ImportError:
logging.warning("Redis package not installed. Falling back to memory cache.")
self.cache_type = "memory"
except Exception as e:
logging.error(f"Failed to connect to Redis: {str(e)}")
self.cache_type = "memory"
async def get(self, key):
"""Get item from cache"""
if self.cache_type == "redis" and self.redis_client:
try:
value = self.redis_client.get(key)
if value:
CACHE_HIT.inc()
return json.loads(value)
CACHE_MISS.inc()
return None
except Exception as e:
logging.error(f"Redis get error: {str(e)}")
CACHE_MISS.inc()
return None
else:
# Memory cache
if key in self.memory_cache:
if time.time() - self.memory_cache[key]["timestamp"] < self.ttl:
CACHE_HIT.inc()
return self.memory_cache[key]["data"]
else:
# Expired
del self.memory_cache[key]
CACHE_MISS.inc()
return None
async def set(self, key, value, ttl=None):
"""Set item in cache"""
if ttl is None:
ttl = self.ttl
if self.cache_type == "redis" and self.redis_client:
try:
self.redis_client.setex(key, ttl, json.dumps(value))
except Exception as e:
logging.error(f"Redis set error: {str(e)}")
else:
# Memory cache
self.memory_cache[key] = {
"data": value,
"timestamp": time.time()
}
async def delete(self, key):
"""Delete item from cache"""
if self.cache_type == "redis" and self.redis_client:
try:
self.redis_client.delete(key)
except Exception as e:
logging.error(f"Redis delete error: {str(e)}")
else:
# Memory cache
if key in self.memory_cache:
del self.memory_cache[key]
async def clear(self):
"""Clear all cache"""
if self.cache_type == "redis" and self.redis_client:
try:
self.redis_client.flushdb()
except Exception as e:
logging.error(f"Redis flush error: {str(e)}")
else:
# Memory cache
self.memory_cache = {}
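# Usage sketch (illustrative): MCPServer below builds cache keys as
# "{prompt_id}:{parameters as sorted JSON}:{model}" and stores serialized
# MCPContextResponse dicts.
#
#   cache = CacheManager(cache_type="memory", ttl=3600)
#   await cache.set("greeting:{}:gpt-4o", {"context": "..."}, ttl=600)
#   result = await cache.get("greeting:{}:gpt-4o")  # None once the TTL has expired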
class MCPServer:
"""Model Context Protocol Server Implementation"""
def __init__(self, cache_type="memory", redis_url=None):
self.app = FastAPI(
title="OpenAI Code Assistant MCP Server",
description="Model Context Protocol server for OpenAI Code Assistant",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
)
# Initialize OpenAI client
self.openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Initialize cache
self.cache = CacheManager(cache_type=cache_type, redis_url=redis_url)
# Initialize tokenizer
self.tokenizer = tiktoken.get_encoding("cl100k_base")
# Setup routes and middleware
self.setup_routes()
self.setup_middleware()
# Load templates and static files
self.templates_dir = os.path.join(os.path.dirname(__file__), "templates")
os.makedirs(self.templates_dir, exist_ok=True)
self.static_dir = os.path.join(os.path.dirname(__file__), "static")
os.makedirs(self.static_dir, exist_ok=True)
# Create default template if it doesn't exist
self._create_default_template()
# Initialize templates
self.templates = Jinja2Templates(directory=self.templates_dir)
# Mount static files
self.app.mount("/static", StaticFiles(directory=self.static_dir), name="static")
# Load prompt templates
self.prompt_templates = self._load_prompt_templates()
# Initialize metrics
self.request_count = 0
self.start_time = time.time()
def setup_middleware(self):
"""Configure middleware for the FastAPI app"""
# Add CORS middleware
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Add request tracking middleware
@self.app.middleware("http")
async def track_requests(request: Request, call_next):
# Increment active connections
ACTIVE_CONNECTIONS.inc()
# Track request start time
start_time = time.time()
# Process request
try:
response = await call_next(request)
# Record metrics
endpoint = request.url.path
status = response.status_code
REQUEST_COUNT.labels(endpoint=endpoint, status=status).inc()
REQUEST_LATENCY.labels(endpoint=endpoint).observe(time.time() - start_time)
# Increment total request count
self.request_count += 1
return response
finally:
# Decrement active connections
ACTIVE_CONNECTIONS.dec()
def _create_default_template(self):
"""Create default dashboard template if it doesn't exist"""
index_path = os.path.join(self.templates_dir, "index.html")
if not os.path.exists(index_path):
with open(index_path, "w") as f:
f.write("""
<!DOCTYPE html>
<html>
<head>
<title>OpenAI Code Assistant MCP Server</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css">
<style>
body { padding: 20px; }
.card { margin-bottom: 20px; }
</style>
</head>
<body>
<div class="container">
<h1>OpenAI Code Assistant MCP Server</h1>
<div class="row">
<div class="col-md-6">
<div class="card">
<div class="card-header">Server Status</div>
<div class="card-body">
<p><strong>Status:</strong> {{ status }}</p>
<p><strong>Uptime:</strong> {{ uptime }}</p>
<p><strong>Requests Served:</strong> {{ request_count }}</p>
<p><strong>Cache Hit Ratio:</strong> {{ cache_hit_ratio }}%</p>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card">
<div class="card-header">Available Models</div>
<div class="card-body">
<ul>
{% for model in models %}
<li>{{ model }}</li>
{% endfor %}
</ul>
</div>
</div>
</div>
</div>
<h2>Available Prompt Templates</h2>
<div class="row">
{% for template in templates %}
<div class="col-md-6">
<div class="card">
<div class="card-header">{{ template.id }}</div>
<div class="card-body">
<p><strong>Description:</strong> {{ template.description }}</p>
<p><strong>Parameters:</strong> {{ template.parameters|join(", ") }}</p>
<p><strong>Default Model:</strong> {{ template.default_model }}</p>
</div>
</div>
</div>
{% endfor %}
</div>
<h2>API Documentation</h2>
<p>
<a href="/docs" class="btn btn-primary">Interactive API Docs</a>
<a href="/redoc" class="btn btn-secondary">ReDoc API Docs</a>
<a href="/metrics" class="btn btn-info">Prometheus Metrics</a>
</p>
</div>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js"></script>
</body>
</html>
""")
def setup_routes(self):
"""Configure API routes for MCP protocol"""
# MCP Protocol Routes
# Dashboard route
@self.app.get("/", tags=["Dashboard"])
async def dashboard(request: Request):
"""Dashboard showing server status and available templates"""
# Calculate cache hit ratio
cache_hits = prometheus_client.REGISTRY.get_sample_value('mcp_cache_hits_total') or 0
cache_misses = prometheus_client.REGISTRY.get_sample_value('mcp_cache_misses_total') or 0
total_cache_requests = cache_hits + cache_misses
cache_hit_ratio = (cache_hits / total_cache_requests * 100) if total_cache_requests > 0 else 0
# Format uptime
uptime_seconds = time.time() - self.start_time
days, remainder = divmod(uptime_seconds, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
uptime_str = f"{int(days)}d {int(hours)}h {int(minutes)}m {int(seconds)}s"
# Get template information
templates = []
for template_id, template in self.prompt_templates.items():
templates.append({
"id": template_id,
"description": template.get("description", ""),
"parameters": list(template.get("parameters", {}).keys()),
"default_model": template.get("default_model", "gpt-4o")
})
return self.templates.TemplateResponse("index.html", {
"request": request,
"status": "Healthy",
"uptime": uptime_str,
"request_count": self.request_count,
"cache_hit_ratio": round(cache_hit_ratio, 2),
"models": ["gpt-4o", "gpt-4-turbo", "gpt-3.5-turbo"],
"templates": templates
})
# Prometheus metrics endpoint
@self.app.get("/metrics", tags=["Monitoring"])
async def metrics():
"""Expose Prometheus metrics"""
return Response(prometheus_client.generate_latest(), media_type="text/plain")
# Health check endpoints
@self.app.get("/health", response_model=MCPHealthResponse, tags=["Health"])
async def health():
"""Health check endpoint"""
# Calculate cache hit ratio
cache_hits = prometheus_client.REGISTRY.get_sample_value('mcp_cache_hits_total') or 0
cache_misses = prometheus_client.REGISTRY.get_sample_value('mcp_cache_misses_total') or 0
total_cache_requests = cache_hits + cache_misses
cache_hit_ratio = (cache_hits / total_cache_requests) if total_cache_requests > 0 else 0
return MCPHealthResponse(
status="healthy",
uptime=time.time() - self.start_time,
request_count=self.request_count,
cache_hit_ratio=cache_hit_ratio
)
@self.app.post("/context", response_model=MCPContextResponse, tags=["Context"])
async def get_context(
request: MCPContextRequest,
background_tasks: BackgroundTasks,
use_cache: bool = Query(True, description="Whether to use cached results if available")
):
"""
Get context for a prompt template with parameters.
This endpoint processes a prompt template with the provided parameters
and returns the generated context. It can optionally use OpenAI models
to enhance the context.
"""
try:
# Check if prompt template exists
if request.prompt_id not in self.prompt_templates:
raise HTTPException(
status_code=404,
detail=f"Prompt template '{request.prompt_id}' not found"
)
# Get prompt template
template = self.prompt_templates[request.prompt_id]
# Use default model if not specified
model = request.model or template.get("default_model", "gpt-4o")
# Generate context ID
context_id = str(uuid.uuid4())
# Generate cache key
cache_key = f"{request.prompt_id}:{json.dumps(request.parameters, sort_keys=True)}:{model}"
# Check cache if enabled
if use_cache:
cached_result = await self.cache.get(cache_key)
if cached_result:
# Update context ID for this request
cached_result["context_id"] = context_id
return MCPContextResponse(**cached_result)
# Process template with parameters
processed_template = self._process_template(template["template"], request.parameters)
# Check if we should use OpenAI to enhance the context
if template.get("use_openai", False):
# Generate context using OpenAI
context, usage = await self._generate_with_openai(
processed_template,
model,
template.get("system_prompt")
)
else:
# Use the processed template directly
context = processed_template
# Calculate token usage
token_count = len(self.tokenizer.encode(context))
usage = {
"prompt_tokens": token_count,
"completion_tokens": 0,
"total_tokens": token_count
}
# Track token usage in Prometheus
TOKEN_USAGE.labels(model=model, type="prompt").inc(usage["prompt_tokens"])
TOKEN_USAGE.labels(model=model, type="completion").inc(usage["completion_tokens"])
# Create response
response = MCPContextResponse(
context=context,
context_id=context_id,
model=model,
usage=usage,
metadata={
"prompt_id": request.prompt_id,
"timestamp": time.time(),
"parameters": request.parameters
}
)
# Store in cache
await self.cache.set(cache_key, response.dict())
return response
            except HTTPException:
                # Preserve the original status code (e.g. 404 for a missing template)
                raise
            except Exception as e:
                logger.error(f"Error processing context request: {str(e)}", exc_info=True)
                raise HTTPException(
                    status_code=500,
                    detail=f"Error processing context: {str(e)}"
                )
@self.app.post("/context/stream", tags=["Context"])
async def stream_context(request: MCPContextRequest):
"""
Stream context generation.
Similar to /context but streams the response as it's generated.
"""
try:
# Check if prompt template exists
if request.prompt_id not in self.prompt_templates:
raise HTTPException(
status_code=404,
detail=f"Prompt template '{request.prompt_id}' not found"
)
# Get prompt template
template = self.prompt_templates[request.prompt_id]
# Use default model if not specified
model = request.model or template.get("default_model", "gpt-4o")
# Generate context ID
context_id = str(uuid.uuid4())
# Process template with parameters
processed_template = self._process_template(template["template"], request.parameters)
# Stream the context generation
return StreamingResponse(
self._stream_context(processed_template, model, context_id, template.get("system_prompt")),
media_type="text/event-stream"
)
            except HTTPException:
                # Preserve the original status code (e.g. 404 for a missing template)
                raise
            except Exception as e:
                logger.error(f"Error streaming context: {str(e)}", exc_info=True)
                raise HTTPException(
                    status_code=500,
                    detail=f"Error streaming context: {str(e)}"
                )
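        # Client-side sketch (illustrative; assumes the third-party httpx package is
        # available): consuming the server-sent events emitted by _stream_context.
        #
        #   async with httpx.AsyncClient(timeout=None) as client:
        #       async with client.stream(
        #           "POST", "http://127.0.0.1:8000/context/stream",
        #           json={"prompt_id": "explain_code",
        #                 "parameters": {"language": "python",
        #                                "code": "print('hi')",
        #                                "detail_level": "basic"}},
        #       ) as resp:
        #           async for line in resp.aiter_lines():
        #               if line.startswith("data: "):
        #                   event = json.loads(line[len("data: "):])
        #                   # event["event"] is "start", "content", "end", or "error"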
@self.app.get("/prompts", response_model=MCPPromptLibraryResponse, tags=["Prompts"])
async def get_prompts():
"""
Get available prompt templates.
Returns a list of all prompt templates available in the system.
"""
prompts = [
MCPPromptTemplate(
id=prompt_id,
template=template["template"],
description=template.get("description", ""),
parameters=template.get("parameters", {}),
default_model=template.get("default_model", "gpt-4o"),
metadata=template.get("metadata", {})
)
for prompt_id, template in self.prompt_templates.items()
]
return MCPPromptLibraryResponse(
prompts=prompts,
count=len(prompts)
)
@self.app.get("/prompts/{prompt_id}", response_model=MCPPromptTemplate, tags=["Prompts"])
async def get_prompt(prompt_id: str):
"""
Get a specific prompt template.
Returns the details of a specific prompt template by ID.
"""
if prompt_id not in self.prompt_templates:
raise HTTPException(
status_code=404,
detail=f"Prompt template '{prompt_id}' not found"
)
template = self.prompt_templates[prompt_id]
return MCPPromptTemplate(
id=prompt_id,
template=template["template"],
description=template.get("description", ""),
parameters=template.get("parameters", {}),
default_model=template.get("default_model", "gpt-4o"),
metadata=template.get("metadata", {})
)
@self.app.post("/prompts", response_model=MCPPromptTemplate, status_code=201, tags=["Prompts"])
async def create_prompt(prompt: MCPPromptTemplate):
"""
Create a new prompt template.
Adds a new prompt template to the system.
"""
if prompt.id in self.prompt_templates:
raise HTTPException(
status_code=409,
detail=f"Prompt template '{prompt.id}' already exists"
)
self.prompt_templates[prompt.id] = {
"template": prompt.template,
"description": prompt.description,
"parameters": prompt.parameters,
"default_model": prompt.default_model,
"metadata": prompt.metadata
}
# Save updated templates
self._save_prompt_templates()
return prompt
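        # Illustrative payload for registering a new template via POST /prompts (the
        # "summarize" id is hypothetical):
        #   {"id": "summarize",
        #    "template": "Summarize the following text:\n\n{text}",
        #    "description": "Summarize arbitrary text",
        #    "parameters": {"text": {"type": "string", "description": "Text to summarize"}},
        #    "default_model": "gpt-4o"}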
@self.app.put("/prompts/{prompt_id}", response_model=MCPPromptTemplate, tags=["Prompts"])
async def update_prompt(prompt_id: str, prompt: MCPPromptTemplate):
"""
Update an existing prompt template.
Updates the details of an existing prompt template.
"""
if prompt_id != prompt.id:
raise HTTPException(
status_code=400,
detail="Prompt ID in path must match prompt ID in body"
)
if prompt_id not in self.prompt_templates:
raise HTTPException(
status_code=404,
detail=f"Prompt template '{prompt_id}' not found"
)
self.prompt_templates[prompt_id] = {
"template": prompt.template,
"description": prompt.description,
"parameters": prompt.parameters,
"default_model": prompt.default_model,
"metadata": prompt.metadata
}
# Save updated templates
self._save_prompt_templates()
return prompt
@self.app.delete("/prompts/{prompt_id}", tags=["Prompts"])
async def delete_prompt(prompt_id: str):
"""
Delete a prompt template.
Removes a prompt template from the system.
"""
if prompt_id not in self.prompt_templates:
raise HTTPException(
status_code=404,
detail=f"Prompt template '{prompt_id}' not found"
)
del self.prompt_templates[prompt_id]
# Save updated templates
self._save_prompt_templates()
return {"status": "deleted", "prompt_id": prompt_id}
# Additional endpoints for a more complete MCP server
@self.app.get("/models", tags=["Models"])
async def get_models():
"""
Get available models.
Returns a list of models that can be used with this MCP server.
"""
return {
"models": [
{
"id": "gpt-4o",
"name": "GPT-4o",
"description": "OpenAI's most advanced model",
"context_length": 128000,
"is_default": True
},
{
"id": "gpt-4-turbo",
"name": "GPT-4 Turbo",
"description": "Optimized version of GPT-4",
"context_length": 128000,
"is_default": False
},
{
"id": "gpt-3.5-turbo",
"name": "GPT-3.5 Turbo",
"description": "Fast and efficient model",
"context_length": 16385,
"is_default": False
}
],
"count": 3
}
@self.app.get("/stats", tags=["System"])
async def get_stats():
"""
Get server statistics.
Returns usage statistics and system information.
"""
            return {
                "uptime": time.time() - self.start_time,
                "prompt_templates_count": len(self.prompt_templates),
                # Only the in-memory cache exposes its size directly; a Redis-backed
                # cache would have to be queried separately.
                "cache_size": len(self.cache.memory_cache) if self.cache.cache_type == "memory" else None,
                "requests_served": {
                    # Per-endpoint counts are available from the Prometheus
                    # REQUEST_COUNT metric; only the aggregate is reported here.
                    "total": self.request_count
                },
"system_info": {
"python_version": sys.version,
"platform": sys.platform
}
}
# Error handlers
@self.app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""Handle HTTP exceptions in MCP format"""
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.detail,
"error_type": "http_error",
"status_code": exc.status_code,
"details": exc.detail if isinstance(exc.detail, dict) else None
}
)
@self.app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Handle general exceptions in MCP format"""
logger.error(f"Unhandled exception: {str(exc)}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"error": str(exc),
"error_type": "server_error",
"status_code": 500,
"details": None
}
)
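        # Example error payload (illustrative) produced by the handlers above, e.g. for
        # a request that references a missing prompt template:
        #   {"error": "Prompt template 'unknown' not found",
        #    "error_type": "http_error", "status_code": 404, "details": null}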
def _load_prompt_templates(self) -> Dict[str, Dict[str, Any]]:
"""Load prompt templates from file or initialize defaults"""
templates_file = os.path.join(os.path.dirname(__file__), "data", "prompt_templates.json")
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(templates_file), exist_ok=True)
# Try to load existing templates
if os.path.exists(templates_file):
try:
with open(templates_file, "r") as f:
templates = json.load(f)
logger.info(f"Loaded {len(templates)} prompt templates from {templates_file}")
return templates
except Exception as e:
logger.error(f"Error loading prompt templates: {str(e)}")
# Initialize with enhanced default templates
default_templates = {
"greeting": {
"template": "Hello! The current time is {time}. How can I help you today?",
"description": "A simple greeting template",
"parameters": {
"time": {
"type": "string",
"description": "The current time"
}
},
"default_model": "gpt-4o",
"metadata": {
"category": "general"
}
},
"code_review": {
"template": "Please review the following code:\n\n```{language}\n{code}\n```\n\nFocus on: {focus_areas}",
"description": "Template for code review requests",
"parameters": {
"language": {
"type": "string",
"description": "Programming language of the code"
},
"code": {
"type": "string",
"description": "The code to review"
},
"focus_areas": {
"type": "string",
"description": "Areas to focus on during review (e.g., 'performance, security')"
}
},
"default_model": "gpt-4o",
"use_openai": True,
"system_prompt": "You are a code review expert. Analyze the provided code and provide constructive feedback focusing on the specified areas.",
"metadata": {
"category": "development"
}
},
"system_prompt": {
"template": "You are OpenAI Code Assistant, a CLI tool that helps users with software engineering tasks and general information.\nUse the available tools to assist the user with their requests.\n\n# Tone and style\nYou should be concise, direct, and to the point. When you run a non-trivial bash command, \nyou should explain what the command does and why you are running it.\nOutput text to communicate with the user; all text you output outside of tool use is displayed to the user.\nRemember that your output will be displayed on a command line interface.\n\n# Tool usage policy\n- When doing file search, remember to search effectively with the available tools.\n- Always use the appropriate tool for the task.\n- Use parallel tool calls when appropriate to improve performance.\n- NEVER commit changes unless the user explicitly asks you to.\n- For weather queries, use the Weather tool to provide real-time information.\n\n# Tasks\nThe user will primarily request you perform software engineering tasks:\n1. Solving bugs\n2. Adding new functionality \n3. Refactoring code\n4. Explaining code\n5. Writing tests\n\nFor these tasks:\n1. Use search tools to understand the codebase\n2. Implement solutions using the available tools\n3. Verify solutions with tests if possible\n4. Run lint and typecheck commands when appropriate\n\nThe user may also ask for general information:\n1. Weather conditions\n2. Simple calculations\n3. General knowledge questions\n\n# Code style\n- Follow the existing code style of the project\n- Maintain consistent naming conventions\n- Use appropriate libraries that are already in the project\n- Add comments when code is complex or non-obvious\n\nIMPORTANT: You should minimize output tokens as much as possible while maintaining helpfulness, \nquality, and accuracy. Answer concisely with short lines of text unless the user asks for detail.",
"description": "System prompt for the assistant",
"parameters": {},
"default_model": "gpt-4o",
"metadata": {
"category": "system"
}
},
"documentation": {
"template": "Generate documentation for the following code:\n\n```{language}\n{code}\n```\n\nFormat: {format}",
"description": "Generate code documentation",
"parameters": {
"language": {
"type": "string",
"description": "Programming language of the code"
},
"code": {
"type": "string",
"description": "The code to document"
},
"format": {
"type": "string",
"description": "Documentation format (e.g., 'markdown', 'docstring', 'jsdoc')",
"default": "markdown"
}
},
"default_model": "gpt-4o",
"use_openai": True,
"system_prompt": "You are a technical documentation expert. Generate clear, concise, and accurate documentation for the provided code.",
"metadata": {
"category": "development"
}
},
"explain_code": {
"template": "Explain how the following code works:\n\n```{language}\n{code}\n```\n\nDetail level: {detail_level}",
"description": "Explain code functionality",
"parameters": {
"language": {
"type": "string",
"description": "Programming language of the code"
},
"code": {
"type": "string",
"description": "The code to explain"
},
"detail_level": {
"type": "string",
"description": "Level of detail in the explanation (e.g., 'basic', 'intermediate', 'advanced')",
"default": "intermediate"
}
},
"default_model": "gpt-4o",
"use_openai": True,
"system_prompt": "You are a programming instructor. Explain the provided code clearly at the requested level of detail.",
"metadata": {
"category": "education"
}
},
"current_time": {
"template": "The current time is {{now:%Y-%m-%d %H:%M:%S}}.",
"description": "Get the current time",
"parameters": {},
"default_model": "gpt-4o",
"metadata": {
"category": "utility"
}
}
}
# Save default templates
try:
with open(templates_file, "w") as f:
json.dump(default_templates, f, indent=2)
except Exception as e:
logger.error(f"Error saving default prompt templates: {str(e)}")
return default_templates
def _save_prompt_templates(self):
"""Save prompt templates to file"""
templates_file = os.path.join(os.path.dirname(__file__), "data", "prompt_templates.json")
try:
with open(templates_file, "w") as f:
json.dump(self.prompt_templates, f, indent=2)
except Exception as e:
logger.error(f"Error saving prompt templates: {str(e)}")
async def _generate_with_openai(self, prompt: str, model: str, system_prompt: Optional[str] = None) -> tuple:
"""Generate context using OpenAI API"""
messages = []
# Add system prompt if provided
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
# Add user prompt
messages.append({"role": "user", "content": prompt})
# Call OpenAI API
try:
response = await asyncio.to_thread(
self.openai_client.chat.completions.create,
model=model,
messages=messages,
temperature=0.0, # Use deterministic output for context generation
max_tokens=4000
)
# Extract content and usage
content = response.choices[0].message.content
usage = {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
}
return content, usage
except Exception as e:
logger.error(f"OpenAI API error: {str(e)}")
raise ValueError(f"Error generating context with OpenAI: {str(e)}")
async def _stream_context(self, prompt: str, model: str, context_id: str, system_prompt: Optional[str] = None) -> AsyncGenerator[str, None]:
"""Stream context generation using OpenAI API"""
messages = []
# Add system prompt if provided
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
# Add user prompt
messages.append({"role": "user", "content": prompt})
# Initial event with context ID
yield f"data: {json.dumps({'context_id': context_id, 'event': 'start'})}\n\n"
try:
# Call OpenAI API with streaming
stream = await asyncio.to_thread(
self.openai_client.chat.completions.create,
model=model,
messages=messages,
temperature=0.0,
max_tokens=4000,
stream=True
)
            full_content = ""
            # Process the stream. Note: the OpenAI stream is a synchronous iterator, so
            # this loop blocks the event loop between chunks; offloading iteration to a
            # thread would be needed for fully non-blocking streaming.
            for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
content_piece = chunk.choices[0].delta.content
full_content += content_piece
# Yield the content piece
yield f"data: {json.dumps({'content': content_piece, 'event': 'content'})}\n\n"
# Calculate token usage
prompt_tokens = len(self.tokenizer.encode(prompt))
completion_tokens = len(self.tokenizer.encode(full_content))
total_tokens = prompt_tokens + completion_tokens
# Track token usage
TOKEN_USAGE.labels(model=model, type="prompt").inc(prompt_tokens)
TOKEN_USAGE.labels(model=model, type="completion").inc(completion_tokens)
            # Final event with complete context and usage
            final_event = {
                "event": "end",
                "context": full_content,
                "usage": {
                    "prompt_tokens": prompt_tokens,
                    "completion_tokens": completion_tokens,
                    "total_tokens": total_tokens
                }
            }
            yield f"data: {json.dumps(final_event)}\n\n"
except Exception as e:
logger.error(f"Error streaming context: {str(e)}")
yield f"data: {json.dumps({'event': 'error', 'error': str(e)})}\n\n"
    def _process_template(self, template: str, parameters: Dict[str, Any]) -> str:
        """Process a template with parameters"""
        try:
            # Handle date/time formatting in parameter values (e.g. "{{now:%H:%M}}")
            processed_params = parameters.copy()
            for key, value in processed_params.items():
                if isinstance(value, str) and value.startswith("{{now") and value.endswith("}}"):
                    # Extract format string if present
                    format_match = re.search(r"{{now:(.+)}}", value)
                    if format_match:
                        processed_params[key] = datetime.now().strftime(format_match.group(1))
                    else:
                        processed_params[key] = datetime.now().isoformat()
            # Also expand "{{now...}}" tokens embedded directly in the template body
            # (str.format would otherwise render them as literal braces, as in the
            # default "current_time" template).
            def _expand_now(match):
                fmt = match.group(1)
                return datetime.now().strftime(fmt) if fmt else datetime.now().isoformat()
            template = re.sub(r"\{\{now(?::([^}]*))?\}\}", _expand_now, template)
            return template.format(**processed_params)
        except KeyError as e:
            raise ValueError(f"Missing required parameter: {e}")
        except Exception as e:
            raise ValueError(f"Error processing template: {str(e)}")
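    # Example (illustrative):
    #   _process_template("Hello! The current time is {time}.", {"time": "{{now:%H:%M}}"})
    #   -> "Hello! The current time is 14:05."  (actual value depends on the clock)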
    def start(self, host: str = "127.0.0.1", port: int = 8000, reload: bool = False):
        """Start the MCP server"""
        # Note: uvicorn only supports reload when the application is passed as an
        # import string (e.g. via the create_mcp_app factory below); it is not
        # supported when an app instance is passed directly.
        uvicorn.run(self.app, host=host, port=port, reload=reload)
def create_mcp_app():
"""Factory function for creating the FastAPI app"""
server = MCPServer()
return server.app
if __name__ == "__main__":
# Create data directory if it doesn't exist
os.makedirs(os.path.join(os.path.dirname(__file__), "data"), exist_ok=True)
# Start server
server = MCPServer()
server.start()
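# Running the server (illustrative; adjust the module path to wherever this file lives):
#
#   python mcp_server.py                                   # uses MCPServer.start() above
#   uvicorn mcp_server:create_mcp_app --factory --port 8000 --reload
#
# Quick checks once the server is up:
#   curl http://127.0.0.1:8000/health
#   curl -X POST http://127.0.0.1:8000/context \
#        -H "Content-Type: application/json" \
#        -d '{"prompt_id": "greeting", "parameters": {"time": "09:00"}}'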