#!/usr/bin/env python3
"""
Gemini MCP Server - MCP interface for Google AI Studio via AIStudioProxyAPI.
Supports multi-turn conversations, file/image uploads, web search, and thinking modes.
"""
import base64
import glob
import json
import os
import sys
import uuid
from collections import OrderedDict
from enum import Enum
from pathlib import Path
from typing import Any, Optional
import httpx
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, ConfigDict, Field
mcp = FastMCP("gemini_mcp")
API_BASE_URL = os.getenv("GEMINI_API_BASE_URL", "http://127.0.0.1:2048")
API_KEY = os.getenv("GEMINI_API_KEY", "")
REQUEST_TIMEOUT = 600.0
PROJECT_ROOT = os.getenv("GEMINI_PROJECT_ROOT", os.getcwd())
MAX_RETRIES = 3
LONG_TEXT_THRESHOLD = 8000
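# Model routing: when prompt + attached files + session history exceed
# LONG_TEXT_THRESHOLD characters, _select_model() picks MODEL_LONG_TEXT;
# _request_with_retry() appends MODEL_FALLBACK as a last resort.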
MODEL_PRIMARY = "gemini-3-pro-preview"
MODEL_LONG_TEXT = "gemini-2.5-pro"
MODEL_FALLBACK = "gemini-2.5-flash"
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp"}
class SessionManager:
"""Manage conversation sessions with LRU eviction."""
def __init__(self, max_sessions: int = 50):
self._sessions: OrderedDict[str, list[dict[str, Any]]] = OrderedDict()
self._max_sessions = max_sessions
self._last_session_id: Optional[str] = None
def create(self) -> str:
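        # 12 hex chars (~48 random bits) keep IDs short; collisions are negligible at this scale.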
session_id = uuid.uuid4().hex[:12]
self._sessions[session_id] = []
self._last_session_id = session_id
self._evict_if_needed()
return session_id
def get(self, session_id: str) -> Optional[list[dict[str, Any]]]:
if session_id == "last" and self._last_session_id:
session_id = self._last_session_id
if session_id in self._sessions:
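            # Touch the entry so LRU eviction treats it as most recently used.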
self._sessions.move_to_end(session_id)
self._last_session_id = session_id
return self._sessions[session_id]
return None
def get_actual_id(self, session_id: str) -> Optional[str]:
if session_id == "last":
return self._last_session_id
return session_id if session_id in self._sessions else None
def append(self, session_id: str, user_text: str, assistant_text: str) -> None:
actual_id = self.get_actual_id(session_id)
if actual_id and actual_id in self._sessions:
self._sessions[actual_id].append({"role": "user", "content": user_text})
self._sessions[actual_id].append({"role": "assistant", "content": assistant_text})
self._last_session_id = actual_id
    def get_history_length(self, session_id: str) -> int:
        """Return total characters of stored string content (used for model routing)."""
        history = self.get(session_id)
        if not history:
            return 0
        return sum(len(m.get("content", "")) for m in history if isinstance(m.get("content"), str))
def _evict_if_needed(self) -> None:
while len(self._sessions) > self._max_sessions:
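            # popitem(last=False) drops the least recently used session.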
self._sessions.popitem(last=False)
sessions = SessionManager()
class ResponseFormat(str, Enum):
MARKDOWN = "markdown"
JSON = "json"
class GeminiChatInput(BaseModel):
"""Input model for Gemini chat completion."""
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True)
prompt: str = Field(
...,
description="The prompt to send to Gemini.",
min_length=1,
max_length=100000,
)
file: Optional[list[str]] = Field(
default=None,
description="Optional file paths (text, code, images). Supports glob patterns.",
)
session_id: Optional[str] = Field(
default=None,
description="Session ID for multi-turn conversation. Use 'last' for most recent session.",
)
model: Optional[str] = Field(
default=None,
description="Override model selection. Auto-selects based on content length if not specified.",
)
system_prompt: Optional[str] = Field(
default=None,
description="System prompt to set context.",
max_length=50000,
)
temperature: Optional[float] = Field(
default=None,
description="Sampling temperature (0.0-2.0).",
ge=0.0,
le=2.0,
)
max_tokens: Optional[int] = Field(
default=None,
description="Maximum tokens in response.",
ge=1,
le=65536,
)
response_format: ResponseFormat = Field(
default=ResponseFormat.MARKDOWN,
description="Output format: 'markdown' or 'json'.",
)
class GeminiListModelsInput(BaseModel):
"""Input model for listing available Gemini models."""
response_format: ResponseFormat = Field(
default=ResponseFormat.MARKDOWN,
description="Output format: 'markdown' or 'json'.",
)
filter_text: Optional[str] = Field(
default=None,
description="Filter models by name.",
max_length=50,
)
async def _make_api_request(
endpoint: str,
method: str = "GET",
json_data: Optional[dict[str, Any]] = None,
stream: bool = False,
) -> dict[str, Any]:
"""Make HTTP request to AIStudioProxyAPI."""
headers = {"Content-Type": "application/json"}
if API_KEY:
headers["Authorization"] = f"Bearer {API_KEY}"
url = f"{API_BASE_URL}{endpoint}"
async with httpx.AsyncClient(
timeout=REQUEST_TIMEOUT, proxy=None, trust_env=False
) as client:
if stream:
chunks: list[str] = []
reasoning_chunks: list[str] = []
async with client.stream(
method, url, headers=headers, json=json_data
) as response:
response.raise_for_status()
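                # Parse the SSE stream: each chunk is an OpenAI-style JSON object on a
                # "data: " line, e.g. {"choices": [{"delta": {"content": "...",
                # "reasoning_content": "..."}}]}; a literal "[DONE]" line ends the stream.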
async for line in response.aiter_lines():
if line.startswith("data: "):
data_str = line[6:]
if data_str == "[DONE]":
break
try:
data = json.loads(data_str)
choices = data.get("choices", [])
if choices:
delta = choices[0].get("delta", {})
if content := delta.get("content"):
chunks.append(content)
if reasoning := delta.get("reasoning_content"):
reasoning_chunks.append(reasoning)
except json.JSONDecodeError:
continue
return {
"content": "".join(chunks),
"reasoning": "".join(reasoning_chunks) if reasoning_chunks else None,
}
else:
response = await client.request(method, url, headers=headers, json=json_data)
response.raise_for_status()
return response.json()
def _resolve_file_path(file_path: str) -> Optional[str]:
    """Resolve a path to an absolute file path.

    Tries, in order: the path as given (if absolute), relative to cwd and
    PROJECT_ROOT, as a glob pattern, and finally a filename search.
    """
if os.path.isabs(file_path) and os.path.isfile(file_path):
return file_path
for base in [os.getcwd(), PROJECT_ROOT]:
full_path = os.path.join(base, file_path)
if os.path.isfile(full_path):
return os.path.abspath(full_path)
for base in [os.getcwd(), PROJECT_ROOT]:
matches = glob.glob(os.path.join(base, file_path), recursive=True)
for match in matches:
if os.path.isfile(match):
return os.path.abspath(match)
if not any(c in file_path for c in ["*", "?", "/", "\\"]):
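        # Last resort for bare filenames: walk each base directory (can be slow on large trees).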
for base in [PROJECT_ROOT, os.getcwd()]:
for root, _, files in os.walk(base):
if file_path in files:
return os.path.abspath(os.path.join(root, file_path))
return None
def _resolve_glob_pattern(pattern: str) -> list[str]:
"""Resolve a glob pattern to a list of file paths."""
results: list[str] = []
for base in [os.getcwd(), PROJECT_ROOT]:
for match in glob.glob(os.path.join(base, pattern), recursive=True):
if os.path.isfile(match):
abs_path = os.path.abspath(match)
if abs_path not in results:
results.append(abs_path)
return results
def _is_image_file(file_path: str) -> bool:
return Path(file_path).suffix.lower() in IMAGE_EXTENSIONS
def _get_mime_type(file_path: str) -> str:
suffix = Path(file_path).suffix.lower()
mime_map = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".gif": "image/gif",
".webp": "image/webp",
".bmp": "image/bmp",
}
return mime_map.get(suffix, "application/octet-stream")
def _read_file_as_base64(file_path: str) -> str:
with open(file_path, "rb") as f:
return base64.b64encode(f.read()).decode("ascii")
def _read_text_file(file_path: str) -> str:
    """Read a text file, trying common encodings; fall back to base64 for undecodable data."""
    for encoding in ["utf-8", "latin-1", "cp1252"]:
        try:
            with open(file_path, "r", encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError:
            continue
    # Note: latin-1 decodes any byte sequence, so this fallback is rarely reached.
    return _read_file_as_base64(file_path)
def _build_content_with_files(
prompt: str, file_paths: list[str]
) -> tuple[list[dict[str, Any]], list[str], list[str], int, str]:
"""
Build content array.
Returns (content_parts, text_files, image_files, total_chars, text_only_prompt).
text_only_prompt is used for session storage (without base64 data).
"""
content_parts: list[dict[str, Any]] = []
text_files: list[str] = []
image_files: list[str] = []
total_chars = len(prompt)
text_parts_for_session: list[str] = []
for file_path in file_paths:
file_name = Path(file_path).name
if _is_image_file(file_path):
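            # Inline images as data: URLs inside OpenAI-style image_url content parts.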
mime_type = _get_mime_type(file_path)
b64_data = _read_file_as_base64(file_path)
content_parts.append({
"type": "image_url",
"image_url": {"url": f"data:{mime_type};base64,{b64_data}"}
})
image_files.append(file_name)
text_parts_for_session.append(f"[Image: {file_name}]")
else:
text_content = _read_text_file(file_path)
total_chars += len(text_content)
file_text = f"=== File: {file_name} ===\n{text_content}"
content_parts.append({"type": "text", "text": file_text})
text_files.append(file_name)
text_parts_for_session.append(file_text)
content_parts.append({"type": "text", "text": prompt})
text_parts_for_session.append(prompt)
session_text = "\n\n".join(text_parts_for_session)
return content_parts, text_files, image_files, total_chars, session_text
def _select_model(total_chars: int, history_chars: int, override: Optional[str]) -> str:
"""Select model based on total content length."""
if override:
return override
combined = total_chars + history_chars
return MODEL_LONG_TEXT if combined > LONG_TEXT_THRESHOLD else MODEL_PRIMARY
def _format_error(e: Exception) -> str:
if isinstance(e, httpx.HTTPStatusError):
status = e.response.status_code
error_map = {
401: "Authentication failed.",
404: "Model not found.",
429: "Rate limit exceeded.",
502: "Backend temporarily unavailable.",
503: "Service unavailable.",
}
return error_map.get(status, f"HTTP {status}")
elif isinstance(e, httpx.TimeoutException):
return "Request timed out."
elif isinstance(e, httpx.ConnectError):
return "Cannot connect to API."
return str(e)
async def _request_with_retry(
payload: dict[str, Any],
models_to_try: list[str],
) -> tuple[dict[str, Any], str, list[str]]:
"""Try request with retries and model fallback. Returns (result, used_model, errors)."""
errors: list[str] = []
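    # Each candidate model gets up to MAX_RETRIES attempts; an empty body counts as a failure.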
for model in models_to_try:
payload["model"] = model
for attempt in range(MAX_RETRIES):
try:
result = await _make_api_request(
"/v1/chat/completions", method="POST", json_data=payload, stream=True
)
if result.get("content"):
return result, model, errors
errors.append(f"{model}: Empty response (attempt {attempt + 1})")
except Exception as e:
errors.append(f"{model}: {_format_error(e)} (attempt {attempt + 1})")
return {"content": "", "reasoning": None}, models_to_try[-1], errors
@mcp.tool(
name="gemini_chat",
annotations={
"title": "Chat with Gemini",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": False,
"openWorldHint": True,
},
)
async def gemini_chat(params: GeminiChatInput) -> str:
"""
Send a message to Google Gemini and get a response.
Args:
params (GeminiChatInput): Chat parameters including:
- prompt (str): The prompt to send
- file (Optional[list[str]]): Files to include (text, code, images)
- session_id (Optional[str]): Session ID for multi-turn chat, use 'last' for recent
- model (Optional[str]): Override model selection
- system_prompt (Optional[str]): System context
- temperature (Optional[float]): Creativity (0.0-2.0)
- max_tokens (Optional[int]): Max response length
- response_format: Output format - 'markdown' or 'json'
Returns:
str: Response with SESSION_ID for continuation.
Examples:
- Simple: prompt="What is AI?"
- With file: prompt="Review", file=["main.py"]
- With image: prompt="Describe", file=["photo.jpg"]
- Continue: prompt="Tell me more", session_id="last"
"""
try:
messages: list[dict[str, Any]] = []
text_files: list[str] = []
image_files: list[str] = []
not_found: list[str] = []
total_chars = len(params.prompt)
session_user_text = params.prompt
history_chars = 0
# Handle session continuation
session_id = params.session_id
is_continuation = False
if session_id:
            history = sessions.get(session_id)
            # A just-created session has an empty (but valid) history, so test against None.
            if history is not None:
                messages.extend(history)
                history_chars = sessions.get_history_length(session_id)
                is_continuation = True
                session_id = sessions.get_actual_id(session_id) or session_id
            else:
                return f"Error: Session '{session_id}' not found."
else:
session_id = sessions.create()
        # Add system prompt (first turn only; it is not persisted in session history)
if params.system_prompt and not is_continuation:
messages.append({"role": "system", "content": params.system_prompt})
# Process files
if params.file:
resolved_files: list[str] = []
for file_path in params.file:
                if any(c in file_path for c in ["*", "?"]):
                    matches = _resolve_glob_pattern(file_path)
                    if matches:
                        resolved_files.extend(matches)
                    else:
                        not_found.append(file_path)
                else:
                    resolved = _resolve_file_path(file_path)
                    if resolved:
                        resolved_files.append(resolved)
                    else:
                        not_found.append(file_path)
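            # Order-preserving de-dup: set.add() returns None, so the first occurrence is kept.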
seen: set[str] = set()
unique_files = [f for f in resolved_files if not (f in seen or seen.add(f))]
if unique_files:
content_parts, text_files, image_files, total_chars, session_user_text = (
_build_content_with_files(params.prompt, unique_files)
)
user_message: dict[str, Any] = {"role": "user", "content": content_parts}
elif not_found and not resolved_files:
return f"Error: Files not found: {', '.join(not_found)}"
else:
user_message = {"role": "user", "content": params.prompt}
else:
user_message = {"role": "user", "content": params.prompt}
messages.append(user_message)
# Select model based on total content
selected_model = _select_model(total_chars, history_chars, params.model)
models_to_try = [selected_model]
if selected_model != MODEL_FALLBACK:
models_to_try.append(MODEL_FALLBACK)
        # Build payload (streaming, reasoning_effort="high", and google_search are always on)
payload: dict[str, Any] = {
"model": selected_model,
"messages": messages,
"stream": True,
"reasoning_effort": "high",
"tools": [{"type": "google_search"}],
}
if params.temperature is not None:
payload["temperature"] = params.temperature
if params.max_tokens is not None:
payload["max_tokens"] = params.max_tokens
# Make request with retry
result, used_model, errors = await _request_with_retry(payload, models_to_try)
content = result.get("content", "")
reasoning = result.get("reasoning")
if not content:
error_detail = "; ".join(errors) if errors else "Unknown error"
return f"Error: All models failed. Details: {error_detail}"
# Save to session (text only, no base64)
sessions.append(session_id, session_user_text, content)
# Format response
if params.response_format == ResponseFormat.JSON:
response_data: dict[str, Any] = {
"session_id": session_id,
"model": used_model,
"content": content,
}
if reasoning:
response_data["reasoning"] = reasoning
if text_files or image_files:
response_data["files"] = {"text": text_files, "images": image_files}
if not_found:
response_data["files_not_found"] = not_found
if errors:
response_data["retry_errors"] = errors
return json.dumps(response_data, ensure_ascii=False, indent=2)
else:
lines = [f"## Gemini Response\n"]
lines.append(f"**Session**: `{session_id}` | **Model**: `{used_model}`\n")
if text_files or image_files:
file_info = []
if text_files:
file_info.append(f"{len(text_files)} text")
if image_files:
file_info.append(f"{len(image_files)} image")
lines.append(f"**Files**: {', '.join(file_info)}\n")
if not_found:
lines.append(f"**Not found**: {', '.join(not_found)}\n")
if errors:
lines.append(f"**Retries**: {len(errors)}\n")
if reasoning:
lines.append("\n### Thinking\n")
lines.append(f"{reasoning[:2000]}{'...' if len(reasoning) > 2000 else ''}\n")
lines.append("\n### Response\n")
lines.append(content)
return "\n".join(lines)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@mcp.tool(
name="gemini_list_models",
annotations={
"title": "List Gemini Models",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
},
)
async def gemini_list_models(params: GeminiListModelsInput) -> str:
"""List available Gemini models."""
try:
result = await _make_api_request("/v1/models")
if not isinstance(result, dict):
return "Error: Unexpected response format"
models = result.get("data", [])
if params.filter_text:
filter_lower = params.filter_text.lower()
models = [
m for m in models
if filter_lower in m.get("id", "").lower()
or filter_lower in m.get("display_name", "").lower()
]
        if not models:
            suffix = f" matching '{params.filter_text}'" if params.filter_text else ""
            return f"No models found{suffix}"
if params.response_format == ResponseFormat.JSON:
return json.dumps({"count": len(models), "models": models}, ensure_ascii=False, indent=2)
else:
lines = ["# Available Models\n", f"**Total**: {len(models)}\n"]
for m in models:
lines.append(f"- `{m.get('id')}` - {m.get('display_name', 'N/A')}")
return "\n".join(lines)
except Exception as e:
return f"Error: {_format_error(e)}"
def main() -> None:
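    # mcp.run() defaults to the stdio transport expected by MCP clients.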
print("Gemini MCP Server starting...", file=sys.stderr)
print(f"API: {API_BASE_URL} | Primary: {MODEL_PRIMARY} | Long: {MODEL_LONG_TEXT}", file=sys.stderr)
mcp.run()
if __name__ == "__main__":
main()