# MCP Gemini Server
# by amitsh06
# Verified
from flask import Flask, request, jsonify
import google.generativeai as genai
import os
from dotenv import load_dotenv
import json
# Merge any settings from a local .env file into the process environment.
load_dotenv()

# The Gemini client is unusable without a key, so refuse to start without one.
api_key = os.environ.get('GEMINI_API_KEY')
if not api_key:
    raise ValueError("No Gemini API key found. Make sure to add it to your .env file.")
genai.configure(api_key=api_key)

# Model identifiers used by the request handlers.
TEXT_MODEL = 'models/gemini-1.5-pro'
VISION_MODEL = 'models/gemini-1.5-pro-vision-latest'

# The Flask application object that the route decorators attach to.
app = Flask(__name__)
@app.route('/health', methods=['GET'])
def health_check():
    """Return a fixed JSON payload signalling that the server is up."""
    payload = {"status": "ok", "message": "MCP server is running"}
    return jsonify(payload)
@app.route('/list-models', methods=['GET'])
def list_models():
    """Return the names of the models reported by ``genai.list_models()``.

    Responds with ``{"available_models": [...]}`` on success, or with
    ``{"error": <message>}`` and HTTP 500 if anything in the lookup raises.
    """
    try:
        names = []
        for entry in genai.list_models():
            names.append(entry.name)
        return jsonify({"available_models": names})
    except Exception as exc:
        failure = {"error": str(exc)}
        return jsonify(failure), 500
def _mcp_generate_text(parameters):
    """Handle the ``generate_text`` action.

    Args:
        parameters: dict from the request. Reads ``prompt`` (required,
            non-empty string), ``max_tokens`` (default 1024) and
            ``temperature`` (default 0.7).

    Returns:
        A Flask JSON response, or a ``(response, status)`` tuple on error:
        400 for a missing/invalid prompt, 500 if the Gemini call raises.
    """
    prompt = parameters.get('prompt')
    if not prompt:
        return jsonify({'error': 'No prompt provided'}), 400
    if not isinstance(prompt, str):
        # Reject before calling the API: the usage estimate below needs a
        # string, so a non-string prompt could only end in a 500 afterwards.
        return jsonify({'error': 'Prompt must be a string'}), 400

    max_tokens = parameters.get('max_tokens', 1024)
    temperature = parameters.get('temperature', 0.7)

    try:
        model = genai.GenerativeModel(
            TEXT_MODEL,
            generation_config={
                "temperature": temperature,
                "max_output_tokens": max_tokens,
            },
        )
        response = model.generate_content(prompt)
        # Accessing .text is kept inside the try so that a failure while
        # reading the response is reported as a JSON 500 as well.
        text = response.text
    except Exception as e:
        app.logger.exception("generate_text failed")
        return jsonify({'error': str(e)}), 500

    # NOTE: these are whitespace-separated word counts, not real model
    # token counts; they are only a rough estimate.
    prompt_tokens = len(prompt.split())
    completion_tokens = len(text.split())
    return jsonify({
        'result': {
            'text': text,
            'model': TEXT_MODEL,
            'usage': {
                'prompt_tokens': prompt_tokens,
                'completion_tokens': completion_tokens,
                'total_tokens': prompt_tokens + completion_tokens,
            },
        }
    })


def _mcp_analyze_text(parameters):
    """Handle the ``analyze_text`` action.

    Args:
        parameters: dict from the request. Reads ``text`` (required) and
            ``analysis_type`` (``sentiment``, ``summary``, ``keywords``;
            anything else falls back to a general analysis prompt).

    Returns:
        A Flask JSON response, or a ``(response, status)`` tuple on error:
        400 when no text is given, 500 if the Gemini call raises.
    """
    text = parameters.get('text')
    analysis_type = parameters.get('analysis_type', 'general')
    if not text:
        return jsonify({'error': 'No text provided'}), 400

    # Pick the instruction that matches the requested analysis type.
    if analysis_type == 'sentiment':
        prompt = f"Analyze the sentiment of the following text and classify it as positive, negative, or neutral. Provide a brief explanation: {text}"
    elif analysis_type == 'summary':
        prompt = f"Provide a concise summary of the following text in 2-3 sentences: {text}"
    elif analysis_type == 'keywords':
        prompt = f"Extract and list the 5-7 most important keywords from this text: {text}"
    else:
        prompt = f"Analyze the following text and provide insights: {text}"

    try:
        model = genai.GenerativeModel(TEXT_MODEL)
        analysis = model.generate_content(prompt).text
    except Exception as e:
        app.logger.exception("analyze_text failed")
        return jsonify({'error': str(e)}), 500

    return jsonify({
        'result': {
            'analysis': analysis,
            'analysis_type': analysis_type,
            'model': TEXT_MODEL,
        }
    })


def _mcp_chat(parameters):
    """Handle the ``chat`` action.

    The last message with role ``user`` is sent to the model; every
    ``user``/``assistant`` message before it is passed as chat history
    (``assistant`` is mapped to Gemini's ``model`` role). Messages with any
    other role are ignored.

    Args:
        parameters: dict from the request. Reads ``messages`` (required,
            non-empty list of ``{"role": ..., "content": ...}`` dicts) and
            ``temperature`` (default 0.7).

    Returns:
        A Flask JSON response, or a ``(response, status)`` tuple on error:
        400 for invalid messages or no usable user message, 500 if the
        Gemini call raises.
    """
    messages = parameters.get('messages', [])
    if not messages or not isinstance(messages, list):
        return jsonify({'error': 'No valid messages provided'}), 400
    if not all(isinstance(msg, dict) for msg in messages):
        # Non-dict entries would otherwise fail on .get() and be reported
        # as a server error instead of a client error.
        return jsonify({'error': 'No valid messages provided'}), 400

    temperature = parameters.get('temperature', 0.7)

    user_positions = [
        index for index, msg in enumerate(messages)
        if msg.get('role') == 'user'
    ]
    last_user_message = (
        messages[user_positions[-1]].get('content', '') if user_positions else None
    )
    if not last_user_message:
        return jsonify({'error': 'No user message found'}), 400

    # Convert the turns that precede the final user message into the
    # role/parts dicts passed to start_chat() as prior context.
    role_map = {'user': 'user', 'assistant': 'model'}
    history = []
    for msg in messages[:user_positions[-1]]:
        role = msg.get('role')
        gemini_role = role_map.get(role) if isinstance(role, str) else None
        if gemini_role is not None:
            history.append({"role": gemini_role, "parts": [msg.get('content', '')]})

    try:
        model = genai.GenerativeModel(
            TEXT_MODEL,
            generation_config={"temperature": temperature},
        )
        chat = model.start_chat(history=history)
        reply = chat.send_message(last_user_message).text
    except Exception as e:
        app.logger.exception("chat failed")
        return jsonify({'error': str(e)}), 500

    return jsonify({
        'result': {
            'response': reply,
            'model': TEXT_MODEL,
        }
    })


# Maps the request's ``action`` value to the helper that serves it.
_MCP_ACTION_HANDLERS = {
    'generate_text': _mcp_generate_text,
    'analyze_text': _mcp_analyze_text,
    'chat': _mcp_chat,
}


@app.route('/mcp', methods=['POST'])
def handle_mcp_request():
    """Handle MCP protocol requests.

    Expects a JSON object with an ``action`` (``generate_text``,
    ``analyze_text`` or ``chat``) and an optional ``parameters`` object,
    and dispatches to the matching helper.

    Returns:
        The helper's JSON response, or ``{"error": ...}`` with HTTP 400
        when the body is missing/not valid JSON, is not a JSON object, has
        non-object ``parameters``, or names an unknown action.
    """
    # silent=True makes a malformed body or wrong content type yield None,
    # so the client gets this endpoint's JSON error format.
    data = request.get_json(silent=True)
    if not data:
        return jsonify({"error": "No data provided"}), 400
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    action = data.get('action')
    # Treat an explicit null the same as a missing "parameters" key.
    parameters = data.get('parameters') or {}
    if not isinstance(parameters, dict):
        return jsonify({'error': 'Parameters must be a JSON object'}), 400

    # Only strings can name an action; the isinstance guard also avoids
    # hashing an unhashable JSON value (list/object) in the lookup.
    handler = _MCP_ACTION_HANDLERS.get(action) if isinstance(action, str) else None
    if handler is None:
        return jsonify({'error': f'Unknown action: {action}'}), 400
    return handler(parameters)
if __name__ == '__main__':
    # Debug mode turns on the Werkzeug interactive debugger, which lets
    # anyone who can reach the server run arbitrary Python code. Keep it
    # off unless FLASK_DEBUG is explicitly set to a truthy value.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in {'1', 'true', 'yes', 'on'}
    app.run(debug=debug_enabled, port=5000)