#!/usr/bin/env python3
"""
MCP Server for Claude Code Integration
Implements the Model Context Protocol for Gemini collaboration
"""
import asyncio
import json
import sys
from typing import Any, Dict, List, Optional
import aiohttp
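# A registration sketch for Claude Code (assumptions: this file is saved as
# mcp_server.py, the server name "gemini-collab" is arbitrary, and your Claude Code
# version reads a project-level .mcp.json; check your installation's docs for the
# exact mechanism):
#
#   {
#     "mcpServers": {
#       "gemini-collab": {
#         "command": "python3",
#         "args": ["/absolute/path/to/mcp_server.py"]
#       }
#     }
#   }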
class MCPServer:
"""Model Context Protocol server for Gemini collaboration"""
def __init__(self, collaborative_server_url: str = "http://localhost:8080"):
self.collaborative_server_url = collaborative_server_url
self.conversation_id = None
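    # The collaborative server (default http://localhost:8080) is expected to expose
    # the endpoints used below: POST /start_conversation, POST /add_message,
    # POST /consult_gemini, and GET /conversation/<conversation_id>.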
async def list_tools(self) -> List[Dict[str, Any]]:
"""List available MCP tools"""
return [
{
"name": "start_gemini_collaboration",
"description": "Start a new collaborative conversation with Gemini",
"inputSchema": {
"type": "object",
"properties": {
"topic": {
"type": "string",
"description": "Topic for the collaboration"
}
},
"required": ["topic"]
}
},
{
"name": "consult_gemini",
"description": "Ask Gemini a question in the current collaboration context",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Question or statement to discuss with Gemini"
},
"include_context": {
"type": "boolean",
"description": "Include previous conversation context",
"default": True
}
},
"required": ["query"]
}
},
{
"name": "get_collaboration_history",
"description": "Get the full conversation history with Gemini",
"inputSchema": {
"type": "object",
"properties": {},
"required": []
}
}
]
async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Call a specific tool"""
try:
if name == "start_gemini_collaboration":
return await self._start_collaboration(arguments.get("topic", "General Discussion"))
elif name == "consult_gemini":
return await self._consult_gemini(
arguments.get("query", ""),
arguments.get("include_context", True)
)
elif name == "get_collaboration_history":
return await self._get_collaboration_history()
else:
return {
"isError": True,
"content": [{"type": "text", "text": f"Unknown tool: {name}"}]
}
except Exception as e:
return {
"isError": True,
"content": [{"type": "text", "text": f"Error executing {name}: {str(e)}"}]
}
async def _start_collaboration(self, topic: str) -> Dict[str, Any]:
"""Start a new collaboration"""
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.collaborative_server_url}/start_conversation",
json={"topic": topic}
) as response:
if response.status == 200:
data = await response.json()
self.conversation_id = data.get('conversation_id')
return {
"content": [{
"type": "text",
"text": f"✅ Started collaboration with Gemini on '{topic}'\nConversation ID: {self.conversation_id}"
}]
}
else:
error_data = await response.json()
return {
"isError": True,
"content": [{
"type": "text",
"text": f"Failed to start collaboration: {error_data.get('error', 'Unknown error')}"
}]
}
    async def _consult_gemini(self, query: str, include_context: bool = True) -> Dict[str, Any]:
        """Consult Gemini with a query"""
        if not self.conversation_id:
            # Auto-start a collaboration; stop here if that fails so we never
            # post messages without a valid conversation_id
            start_result = await self._start_collaboration("General Discussion")
            if start_result.get("isError"):
                return start_result
        async with aiohttp.ClientSession() as session:
            # Add Claude's message to the conversation; the response body is not
            # needed, but the context manager releases the connection promptly
            async with session.post(
                f"{self.collaborative_server_url}/add_message",
                json={
                    "conversation_id": self.conversation_id,
                    "role": "claude",
                    "content": query
                }
            ):
                pass
# Get Gemini's response
async with session.post(
f"{self.collaborative_server_url}/consult_gemini",
json={
"conversation_id": self.conversation_id,
"query": query,
"include_context": include_context
}
) as response:
if response.status == 200:
data = await response.json()
gemini_response = data.get('gemini_response', {})
if gemini_response.get('error'):
return {
"isError": True,
"content": [{
"type": "text",
"text": f"Gemini error: {gemini_response.get('message', 'Unknown error')}"
}]
}
else:
return {
"content": [{
"type": "text",
"text": f"🤖 Gemini responds:\n\n{gemini_response.get('text', 'No response')}"
}]
}
elif response.status == 429:
error_data = await response.json()
return {
"isError": True,
"content": [{
"type": "text",
"text": f"Rate limit exceeded. Please wait {error_data.get('retry_after', 60)} seconds."
}]
}
else:
error_data = await response.json()
return {
"isError": True,
"content": [{
"type": "text",
"text": f"Failed to consult Gemini: {error_data.get('error', 'Unknown error')}"
}]
}
async def _get_collaboration_history(self) -> Dict[str, Any]:
"""Get collaboration history"""
if not self.conversation_id:
return {
"content": [{
"type": "text",
"text": "No active collaboration. Start one with start_gemini_collaboration first."
}]
}
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.collaborative_server_url}/conversation/{self.conversation_id}"
) as response:
if response.status == 200:
data = await response.json()
messages = data.get('messages', [])
if not messages:
return {
"content": [{
"type": "text",
"text": "No messages in conversation yet."
}]
}
                    history_lines = ["📜 **Collaboration History:**\n"]
                    for msg in messages:
                        role = msg.get('role', 'unknown').upper()
                        content = msg.get('content', '')
                        timestamp = msg.get('timestamp', '')
                        # Include the timestamp when the server provides one
                        header = f"**{role}** ({timestamp})" if timestamp else f"**{role}**"
                        history_lines.append(f"{header}: {content}\n")
return {
"content": [{
"type": "text",
"text": "\n".join(history_lines)
}]
}
else:
return {
"isError": True,
"content": [{
"type": "text",
"text": "Failed to get conversation history"
}]
}
async def main():
"""Main MCP server loop"""
server = MCPServer()
# Read from stdin and write to stdout (MCP protocol)
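    # One JSON-RPC object per line in, one JSON line out. An illustrative
    # tools/call request (field values are examples only):
    #   {"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "consult_gemini", "arguments": {"query": "..."}}}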
    while True:
        request_id = None
        try:
            # run_in_executor keeps the blocking stdin read off the event loop
            line = await asyncio.get_running_loop().run_in_executor(None, sys.stdin.readline)
            if not line:
                break
            request = json.loads(line.strip())
            method = request.get('method')
            params = request.get('params', {})
            request_id = request.get('id')
if method == "tools/list":
tools = await server.list_tools()
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": {"tools": tools}
}
elif method == "tools/call":
tool_name = params.get('name')
arguments = params.get('arguments', {})
result = await server.call_tool(tool_name, arguments)
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": result
}
else:
response = {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": -32601,
"message": f"Method not found: {method}"
}
}
print(json.dumps(response), flush=True)
except json.JSONDecodeError:
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {
"code": -32700,
"message": "Parse error"
}
}
print(json.dumps(error_response), flush=True)
except Exception as e:
error_response = {
"jsonrpc": "2.0",
"id": request_id if 'request_id' in locals() else None,
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
print(json.dumps(error_response), flush=True)
if __name__ == "__main__":
asyncio.run(main())
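# Manual smoke test (a sketch; assumes this file is saved as mcp_server.py):
#   echo '{"jsonrpc": "2.0", "id": 1, "method": "tools/list"}' | python3 mcp_server.py
# The reply should be a single JSON line of the form
#   {"jsonrpc": "2.0", "id": 1, "result": {"tools": [...]}}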