Skip to main content
Glama
xumingjun5208

Gemini MCP Server

server.py (20.7 kB)
#!/usr/bin/env python3
"""
Gemini MCP Server - MCP interface for Google AI Studio via AIStudioProxyAPI.

Supports multi-turn conversations, file/image uploads, web search, and thinking modes.
"""

import base64
import glob
import json
import os
import sys
import uuid
from collections import OrderedDict
from enum import Enum
from pathlib import Path
from typing import Any, Optional

import httpx
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, ConfigDict, Field

mcp = FastMCP("gemini_mcp")

API_BASE_URL = os.getenv("GEMINI_API_BASE_URL", "http://127.0.0.1:2048")
API_KEY = os.getenv("GEMINI_API_KEY", "")
REQUEST_TIMEOUT = 600.0
PROJECT_ROOT = os.getenv("GEMINI_PROJECT_ROOT", os.getcwd())
MAX_RETRIES = 3
LONG_TEXT_THRESHOLD = 8000
MODEL_PRIMARY = "gemini-3-pro-preview"
MODEL_LONG_TEXT = "gemini-2.5-pro"
MODEL_FALLBACK = "gemini-2.5-flash"
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp"}


class SessionManager:
    """Manage conversation sessions with LRU eviction."""

    def __init__(self, max_sessions: int = 50):
        # Insertion/access order doubles as the LRU order: oldest entry first.
        self._sessions: OrderedDict[str, list[dict[str, Any]]] = OrderedDict()
        self._max_sessions = max_sessions
        self._last_session_id: Optional[str] = None

    def create(self, system_prompt: Optional[str] = None) -> str:
        """Create a new session and return its 12-character hex ID.

        Args:
            system_prompt: Optional system message stored as the first history
                entry so that it is replayed on every later turn.
        """
        session_id = uuid.uuid4().hex[:12]
        history: list[dict[str, Any]] = []
        if system_prompt:
            history.append({"role": "system", "content": system_prompt})
        self._sessions[session_id] = history
        self._last_session_id = session_id
        self._evict_if_needed()
        return session_id

    def get(self, session_id: str) -> Optional[list[dict[str, Any]]]:
        """Return the history for ``session_id`` ('last' = most recent), or None.

        A successful lookup marks the session as most recently used.
        """
        if session_id == "last" and self._last_session_id:
            session_id = self._last_session_id
        if session_id in self._sessions:
            self._sessions.move_to_end(session_id)
            self._last_session_id = session_id
            return self._sessions[session_id]
        return None

    def get_actual_id(self, session_id: str) -> Optional[str]:
        """Translate 'last' to the real ID; return None for unknown IDs."""
        if session_id == "last":
            return self._last_session_id
        return session_id if session_id in self._sessions else None

    def append(self, session_id: str, user_text: str, assistant_text: str) -> None:
        """Record one user/assistant exchange; silently ignore unknown sessions."""
        actual_id = self.get_actual_id(session_id)
        if actual_id and actual_id in self._sessions:
            history = self._sessions[actual_id]
            history.append({"role": "user", "content": user_text})
            history.append({"role": "assistant", "content": assistant_text})
            self._last_session_id = actual_id

    def get_history_length(self, session_id: str) -> int:
        """Return the total character count of all string message contents."""
        history = self.get(session_id)
        if not history:
            return 0
        return sum(
            len(m.get("content", ""))
            for m in history
            if isinstance(m.get("content"), str)
        )

    def _evict_if_needed(self) -> None:
        """Drop least recently used sessions until the size limit is respected."""
        while len(self._sessions) > self._max_sessions:
            self._sessions.popitem(last=False)


sessions = SessionManager()


class ResponseFormat(str, Enum):
    MARKDOWN = "markdown"
    JSON = "json"


class GeminiChatInput(BaseModel):
    """Input model for Gemini chat completion."""

    model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True)

    prompt: str = Field(
        ...,
        description="The prompt to send to Gemini.",
        min_length=1,
        max_length=100000,
    )
    file: Optional[list[str]] = Field(
        default=None,
        description="Optional file paths (text, code, images). Supports glob patterns.",
    )
    session_id: Optional[str] = Field(
        default=None,
        description="Session ID for multi-turn conversation. Use 'last' for most recent session.",
    )
    model: Optional[str] = Field(
        default=None,
        description="Override model selection. Auto-selects based on content length if not specified.",
    )
    system_prompt: Optional[str] = Field(
        default=None,
        description="System prompt to set context.",
        max_length=50000,
    )
    temperature: Optional[float] = Field(
        default=None,
        description="Sampling temperature (0.0-2.0).",
        ge=0.0,
        le=2.0,
    )
    max_tokens: Optional[int] = Field(
        default=None,
        description="Maximum tokens in response.",
        ge=1,
        le=65536,
    )
    response_format: ResponseFormat = Field(
        default=ResponseFormat.MARKDOWN,
        description="Output format: 'markdown' or 'json'.",
    )


class GeminiListModelsInput(BaseModel):
    """Input model for listing available Gemini models."""

    response_format: ResponseFormat = Field(
        default=ResponseFormat.MARKDOWN,
        description="Output format: 'markdown' or 'json'.",
    )
    filter_text: Optional[str] = Field(
        default=None,
        description="Filter models by name.",
        max_length=50,
    )


async def _make_api_request(
    endpoint: str,
    method: str = "GET",
    json_data: Optional[dict[str, Any]] = None,
    stream: bool = False,
) -> dict[str, Any]:
    """Make HTTP request to AIStudioProxyAPI.

    Args:
        endpoint: Path appended to ``API_BASE_URL`` (e.g. ``/v1/models``).
        method: HTTP method.
        json_data: Optional JSON request body.
        stream: When True, consume the response as server-sent events and
            accumulate the ``content`` / ``reasoning_content`` deltas.

    Returns:
        For streaming requests ``{"content": str, "reasoning": str | None}``;
        otherwise the decoded JSON response body.

    Raises:
        httpx.HTTPStatusError: On a 4xx/5xx response.
        httpx.HTTPError: On transport-level failures.
    """
    headers = {"Content-Type": "application/json"}
    if API_KEY:
        headers["Authorization"] = f"Bearer {API_KEY}"

    # Tolerate a trailing slash in the configured base URL ("host/" + "/v1/...").
    url = f"{API_BASE_URL.rstrip('/')}{endpoint}"

    async with httpx.AsyncClient(
        timeout=REQUEST_TIMEOUT, proxy=None, trust_env=False
    ) as client:
        if not stream:
            response = await client.request(method, url, headers=headers, json=json_data)
            response.raise_for_status()
            return response.json()

        chunks: list[str] = []
        reasoning_chunks: list[str] = []
        async with client.stream(method, url, headers=headers, json=json_data) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                # The space after the "data:" field name is optional in SSE.
                if not line.startswith("data:"):
                    continue
                data_str = line[5:].strip()
                if data_str == "[DONE]":
                    break
                try:
                    data = json.loads(data_str)
                except json.JSONDecodeError:
                    continue  # Skip malformed or keep-alive payloads.
                if not isinstance(data, dict):
                    continue
                choices = data.get("choices") or []
                if not choices:
                    continue
                # "delta" may be missing or null in a chunk; treat both as empty.
                delta = choices[0].get("delta") or {}
                if content := delta.get("content"):
                    chunks.append(content)
                if reasoning := delta.get("reasoning_content"):
                    reasoning_chunks.append(reasoning)

        return {
            "content": "".join(chunks),
            "reasoning": "".join(reasoning_chunks) if reasoning_chunks else None,
        }


def _resolve_file_path(file_path: str) -> Optional[str]:
    """Resolve a file path to an absolute path.

    Tries, in order: an existing absolute path, a path relative to the current
    directory or ``PROJECT_ROOT``, a glob match under those directories, and
    finally (for bare file names only) a recursive search by name.

    Returns:
        The resolved path, or None when nothing matches.
    """
    if os.path.isabs(file_path) and os.path.isfile(file_path):
        return file_path

    # The two bases are frequently identical; search each directory only once.
    bases = list(dict.fromkeys([os.getcwd(), PROJECT_ROOT]))

    for base in bases:
        full_path = os.path.join(base, file_path)
        if os.path.isfile(full_path):
            return os.path.abspath(full_path)

    for base in bases:
        for match in glob.glob(os.path.join(base, file_path), recursive=True):
            if os.path.isfile(match):
                return os.path.abspath(match)

    if not any(c in file_path for c in ["*", "?", "/", "\\"]):
        # Bare file name: walk the trees, project root first.
        for base in dict.fromkeys([PROJECT_ROOT, os.getcwd()]):
            for root, _, files in os.walk(base):
                if file_path in files:
                    return os.path.abspath(os.path.join(root, file_path))

    return None


def _resolve_glob_pattern(pattern: str) -> list[str]:
    """Resolve a glob pattern to a list of unique absolute file paths.

    The pattern is expanded relative to the current directory and to
    ``PROJECT_ROOT``; the order of first appearance is preserved.
    """
    results: list[str] = []
    seen: set[str] = set()
    for base in [os.getcwd(), PROJECT_ROOT]:
        for match in glob.glob(os.path.join(base, pattern), recursive=True):
            if not os.path.isfile(match):
                continue
            abs_path = os.path.abspath(match)
            if abs_path not in seen:
                seen.add(abs_path)
                results.append(abs_path)
    return results


def _is_image_file(file_path: str) -> bool:
    """Return True when the file extension is one of ``IMAGE_EXTENSIONS``."""
    return Path(file_path).suffix.lower() in IMAGE_EXTENSIONS


def _get_mime_type(file_path: str) -> str:
    """Map an image file extension to its MIME type (octet-stream if unknown)."""
    suffix = Path(file_path).suffix.lower()
    mime_map = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".gif": "image/gif",
        ".webp": "image/webp",
        ".bmp": "image/bmp",
    }
    return mime_map.get(suffix, "application/octet-stream")


def _read_file_as_base64(file_path: str) -> str:
    """Return the file's bytes encoded as an ASCII base64 string."""
    with open(file_path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")


def _read_text_file(file_path: str) -> str:
    """Read a text file, trying UTF-8, then cp1252, then latin-1.

    latin-1 maps every byte to a code point and therefore never fails, so it
    must come last; placing it earlier would make later encodings unreachable.
    """
    for encoding in ["utf-8", "cp1252"]:
        try:
            with open(file_path, "r", encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError:
            continue
    with open(file_path, "r", encoding="latin-1") as f:
        return f.read()


def _build_content_with_files(
    prompt: str, file_paths: list[str]
) -> tuple[list[dict[str, Any]], list[str], list[str], int, str]:
    """
    Build content array.

    Returns (content_parts, text_files, image_files, total_chars, text_only_prompt).
    text_only_prompt is used for session storage (without base64 data).
    """
    content_parts: list[dict[str, Any]] = []
    text_files: list[str] = []
    image_files: list[str] = []
    total_chars = len(prompt)
    text_parts_for_session: list[str] = []

    for file_path in file_paths:
        file_name = Path(file_path).name
        if _is_image_file(file_path):
            mime_type = _get_mime_type(file_path)
            b64_data = _read_file_as_base64(file_path)
            content_parts.append({
                "type": "image_url",
                "image_url": {"url": f"data:{mime_type};base64,{b64_data}"},
            })
            image_files.append(file_name)
            # Sessions keep a short placeholder instead of the base64 payload.
            text_parts_for_session.append(f"[Image: {file_name}]")
        else:
            text_content = _read_text_file(file_path)
            total_chars += len(text_content)
            file_text = f"=== File: {file_name} ===\n{text_content}"
            content_parts.append({"type": "text", "text": file_text})
            text_files.append(file_name)
            text_parts_for_session.append(file_text)

    # The prompt goes last so the model reads the attachments first.
    content_parts.append({"type": "text", "text": prompt})
    text_parts_for_session.append(prompt)

    session_text = "\n\n".join(text_parts_for_session)
    return content_parts, text_files, image_files, total_chars, session_text


def _select_model(total_chars: int, history_chars: int, override: Optional[str]) -> str:
    """Select model based on total content length."""
    if override:
        return override
    combined = total_chars + history_chars
    return MODEL_LONG_TEXT if combined > LONG_TEXT_THRESHOLD else MODEL_PRIMARY


def _format_error(e: Exception) -> str:
    """Turn an exception into a short, user-facing message."""
    if isinstance(e, httpx.HTTPStatusError):
        status = e.response.status_code
        error_map = {
            401: "Authentication failed.",
            404: "Model not found.",
            429: "Rate limit exceeded.",
            502: "Backend temporarily unavailable.",
            503: "Service unavailable.",
        }
        return error_map.get(status, f"HTTP {status}")
    if isinstance(e, httpx.TimeoutException):
        return "Request timed out."
    if isinstance(e, httpx.ConnectError):
        return "Cannot connect to API."
    # Some exceptions carry no message; fall back to the class name.
    return str(e) or type(e).__name__


async def _request_with_retry(
    payload: dict[str, Any],
    models_to_try: list[str],
) -> tuple[dict[str, Any], str, list[str]]:
    """Try request with retries and model fallback. Returns (result, used_model, errors).

    Each model is attempted up to ``MAX_RETRIES`` times; the first non-empty
    response wins. When every attempt fails, an empty result is returned
    together with the last model tried and the collected error messages.
    The caller's ``payload`` is not modified.
    """
    errors: list[str] = []

    for model in models_to_try:
        request_payload = {**payload, "model": model}
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                result = await _make_api_request(
                    "/v1/chat/completions",
                    method="POST",
                    json_data=request_payload,
                    stream=True,
                )
            except Exception as e:
                errors.append(f"{model}: {_format_error(e)} (attempt {attempt})")
                continue
            if result.get("content"):
                return result, model, errors
            errors.append(f"{model}: Empty response (attempt {attempt})")

    last_model = models_to_try[-1] if models_to_try else ""
    return {"content": "", "reasoning": None}, last_model, errors


def _collect_files(file_specs: list[str]) -> tuple[list[str], list[str]]:
    """Resolve user-supplied paths/patterns into (unique_files, not_found)."""
    resolved_files: list[str] = []
    not_found: list[str] = []

    for file_spec in file_specs:
        if any(c in file_spec for c in ["*", "?"]):
            matches = _resolve_glob_pattern(file_spec)
            if matches:
                resolved_files.extend(matches)
            else:
                not_found.append(file_spec)
        else:
            resolved = _resolve_file_path(file_spec)
            if resolved:
                resolved_files.append(resolved)
            else:
                not_found.append(file_spec)

    # Drop duplicates while keeping the order of first appearance.
    return list(dict.fromkeys(resolved_files)), not_found


def _render_chat_response(
    *,
    response_format: ResponseFormat,
    session_id: str,
    used_model: str,
    content: str,
    reasoning: Optional[str],
    text_files: list[str],
    image_files: list[str],
    not_found: list[str],
    errors: list[str],
) -> str:
    """Format a successful chat result as JSON or Markdown."""
    if response_format == ResponseFormat.JSON:
        response_data: dict[str, Any] = {
            "session_id": session_id,
            "model": used_model,
            "content": content,
        }
        if reasoning:
            response_data["reasoning"] = reasoning
        if text_files or image_files:
            response_data["files"] = {"text": text_files, "images": image_files}
        if not_found:
            response_data["files_not_found"] = not_found
        if errors:
            response_data["retry_errors"] = errors
        return json.dumps(response_data, ensure_ascii=False, indent=2)

    lines = ["## Gemini Response\n"]
    lines.append(f"**Session**: `{session_id}` | **Model**: `{used_model}`\n")

    if text_files or image_files:
        file_info = []
        if text_files:
            file_info.append(f"{len(text_files)} text")
        if image_files:
            file_info.append(f"{len(image_files)} image")
        lines.append(f"**Files**: {', '.join(file_info)}\n")

    if not_found:
        lines.append(f"**Not found**: {', '.join(not_found)}\n")
    if errors:
        lines.append(f"**Retries**: {len(errors)}\n")

    if reasoning:
        lines.append("\n### Thinking\n")
        # Long reasoning traces are truncated to keep the reply readable.
        lines.append(f"{reasoning[:2000]}{'...' if len(reasoning) > 2000 else ''}\n")

    lines.append("\n### Response\n")
    lines.append(content)
    return "\n".join(lines)


@mcp.tool(
    name="gemini_chat",
    annotations={
        "title": "Chat with Gemini",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True,
    },
)
async def gemini_chat(params: GeminiChatInput) -> str:
    """
    Send a message to Google Gemini and get a response.

    Args:
        params (GeminiChatInput): Chat parameters including:
            - prompt (str): The prompt to send
            - file (Optional[list[str]]): Files to include (text, code, images)
            - session_id (Optional[str]): Session ID for multi-turn chat, use 'last' for recent
            - model (Optional[str]): Override model selection
            - system_prompt (Optional[str]): System context
            - temperature (Optional[float]): Creativity (0.0-2.0)
            - max_tokens (Optional[int]): Max response length
            - response_format: Output format - 'markdown' or 'json'

    Returns:
        str: Response with SESSION_ID for continuation.

    Examples:
        - Simple: prompt="What is AI?"
        - With file: prompt="Review", file=["main.py"]
        - With image: prompt="Describe", file=["photo.jpg"]
        - Continue: prompt="Tell me more", session_id="last"
    """
    try:
        messages: list[dict[str, Any]] = []
        text_files: list[str] = []
        image_files: list[str] = []
        not_found: list[str] = []
        total_chars = len(params.prompt)
        session_user_text = params.prompt
        history_chars = 0

        # Handle session continuation
        session_id = params.session_id
        is_continuation = False
        if session_id:
            history = sessions.get(session_id)
            # An existing session with an empty history is still a valid session.
            if history is None:
                return f"Error: Session '{session_id}' not found."
            messages.extend(history)
            history_chars = sessions.get_history_length(session_id)
            is_continuation = True
            session_id = sessions.get_actual_id(session_id) or session_id

        # Add system prompt (new conversations only; a continued session
        # replays whatever system message was stored when it was created).
        if params.system_prompt and not is_continuation:
            messages.append({"role": "system", "content": params.system_prompt})

        # Process files
        user_message: dict[str, Any] = {"role": "user", "content": params.prompt}
        if params.file:
            unique_files, not_found = _collect_files(params.file)
            if unique_files:
                content_parts, text_files, image_files, total_chars, session_user_text = (
                    _build_content_with_files(params.prompt, unique_files)
                )
                user_message = {"role": "user", "content": content_parts}
            elif not_found:
                return f"Error: Files not found: {', '.join(not_found)}"

        messages.append(user_message)

        # Select model based on total content
        selected_model = _select_model(total_chars, history_chars, params.model)
        models_to_try = [selected_model]
        if selected_model != MODEL_FALLBACK:
            models_to_try.append(MODEL_FALLBACK)

        # Build payload (stream=True, thinking=high, web_search=True are fixed)
        payload: dict[str, Any] = {
            "model": selected_model,
            "messages": messages,
            "stream": True,
            "reasoning_effort": "high",
            "tools": [{"type": "google_search"}],
        }
        if params.temperature is not None:
            payload["temperature"] = params.temperature
        if params.max_tokens is not None:
            payload["max_tokens"] = params.max_tokens

        # Make request with retry
        result, used_model, errors = await _request_with_retry(payload, models_to_try)
        content = result.get("content", "")
        reasoning = result.get("reasoning")

        if not content:
            error_detail = "; ".join(errors) if errors else "Unknown error"
            return f"Error: All models failed. Details: {error_detail}"

        # Create the session only after a successful reply so that failed calls
        # neither leave empty sessions behind nor evict real ones. The system
        # prompt is stored with the session so later turns keep their context.
        if not is_continuation:
            session_id = sessions.create(params.system_prompt)

        # Save to session (text only, no base64)
        sessions.append(session_id, session_user_text, content)

        # Format response
        return _render_chat_response(
            response_format=params.response_format,
            session_id=session_id,
            used_model=used_model,
            content=content,
            reasoning=reasoning,
            text_files=text_files,
            image_files=image_files,
            not_found=not_found,
            errors=errors,
        )

    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        return f"Error: {type(e).__name__}: {e}"


@mcp.tool(
    name="gemini_list_models",
    annotations={
        "title": "List Gemini Models",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def gemini_list_models(params: GeminiListModelsInput) -> str:
    """List available Gemini models."""
    try:
        result = await _make_api_request("/v1/models")

        if not isinstance(result, dict):
            return "Error: Unexpected response format"

        models = result.get("data") or []

        if params.filter_text:
            filter_lower = params.filter_text.lower()
            # "id" / "display_name" may be absent or null; treat both as empty.
            models = [
                m for m in models
                if filter_lower in (m.get("id") or "").lower()
                or filter_lower in (m.get("display_name") or "").lower()
            ]

        if not models:
            suffix = f" matching {params.filter_text}" if params.filter_text else ""
            return f"No models found{suffix}"

        if params.response_format == ResponseFormat.JSON:
            return json.dumps(
                {"count": len(models), "models": models}, ensure_ascii=False, indent=2
            )

        lines = ["# Available Models\n", f"**Total**: {len(models)}\n"]
        lines.extend(
            f"- `{m.get('id')}` - {m.get('display_name', 'N/A')}" for m in models
        )
        return "\n".join(lines)

    except Exception as e:
        return f"Error: {_format_error(e)}"


def main() -> None:
    """Log the startup configuration to stderr and run the MCP server."""
    print("Gemini MCP Server starting...", file=sys.stderr)
    print(
        f"API: {API_BASE_URL} | Primary: {MODEL_PRIMARY} | Long: {MODEL_LONG_TEXT}",
        file=sys.stderr,
    )
    mcp.run()


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/xumingjun5208/aistudio-gemini-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.