import os
import asyncio
import sys
from dotenv import load_dotenv
import google.generativeai as genai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# Load environment variables
load_dotenv()
# Configure Gemini
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
    print("Error: GEMINI_API_KEY not found in .env. Please add it before running the agent.")
    sys.exit(1)
genai.configure(api_key=api_key)
async def get_user_input(prompt: str) -> str:
"""Non-blocking input using to_thread."""
return await asyncio.to_thread(input, prompt)
async def run_agent():
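    """Connect to the local MCP server over stdio and run a Gemini tool-calling chat loop."""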
# Use the current Python executable (from the venv) to run the server
# This is more robust on Windows than relying on 'uv'
python_exe = sys.executable
server_params = StdioServerParameters(
command=python_exe,
args=["server.py"],
env=os.environ.copy()
)
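    # Note: this assumes server.py is in the current working directory; use an absolute path if the agent is launched from elsewhere.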
print("\n--- MCP Agent Initialized (Windows/VENV Mode) ---")
print(f"Using Python: {python_exe}")
print("Connecting to MCP server...")
try:
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# Fetch tools from MCP server
mcp_tools = await session.list_tools()
print(f"Connected. Tools (MCP): {[t.name for t in mcp_tools.tools]}")
# Dynamic model selection
                print("\n🔍 Looking up available models...")
                # genai.list_models() is sync; run it in a thread to avoid blocking the MCP session
models = await asyncio.to_thread(genai.list_models)
available_models = [m.name for m in models if 'generateContent' in m.supported_generation_methods]
                print("\nAvailable models:")
for i, m_name in enumerate(available_models[:10]):
print(f" [{i}] {m_name}")
                default_model = "models/gemini-2.0-flash-exp"  # preferred default if available
if default_model not in available_models:
                    # fall back to any "flash" model, else the first available
available_flash = [m for m in available_models if "flash" in m]
default_model = available_flash[0] if available_flash else available_models[0]
                choice = await get_user_input(f"\nChoose a model (index) or press Enter for '{default_model}': ")
selected_model = default_model
if choice.strip().isdigit():
idx = int(choice.strip())
if 0 <= idx < len(available_models):
selected_model = available_models[idx]
                print(f"✅ Using model: {selected_model}")
                # 1. Map MCP tools to Gemini's function-declaration format
tools_for_gemini = [
{
"function_declarations": [
{
"name": t.name,
"description": t.description,
"parameters": t.inputSchema
} for t in mcp_tools.tools
]
}
]
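                # Note: inputSchema is forwarded as-is; some MCP servers emit JSON Schema keys
                # (e.g. "$schema", "additionalProperties") that Gemini's declarations may reject.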
                # 2. Configure the model with the MCP tools
model = genai.GenerativeModel(
model_name=selected_model,
tools=tools_for_gemini,
                    system_instruction=(
                        "You are a Senior Fullstack Engineer. You have direct access to the filesystem and terminal via MCP. "
                        "Golden rule: before using a tool, state what you are about to do. "
                        "IMPORTANT: analyze the results of your tools before giving a coherent answer."
                    )
)
                # Start the chat session
chat = model.start_chat()
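                # ChatSession keeps the conversation history, so earlier tool outputs stay in context.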
                print("\n✅ Autonomous agent active. Type 'exit' to quit.")
while True:
user_input = await get_user_input("\nUser: ")
if user_input.lower() in ["exit", "quit"]:
break
if not user_input.strip():
continue
                    print("Agent thinking...")
try:
                        # Interaction loop: keep exchanging messages until Gemini answers without a tool call
message = user_input
while True:
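                            # First pass sends the user's text; later passes send the tool results back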
response = await chat.send_message_async(message)
                            # Check whether the response contains function calls
has_function_call = False
if response.candidates and response.candidates[0].content.parts:
for part in response.candidates[0].content.parts:
if part.function_call:
has_function_call = True
break
if not has_function_call:
print(f"\nAgent: {response.text}")
break
                            # Process each function call
tool_responses = []
for part in response.candidates[0].content.parts:
if fn := part.function_call:
                                    print(f" [Auto-Action] Running {fn.name}...")
try:
                                        # Actual call to the MCP server
tool_result = await session.call_tool(fn.name, dict(fn.args))
                                        # Flatten the tool result into plain text
result_text = ""
if hasattr(tool_result, 'content'):
result_text = "\n".join([c.text for c in tool_result.content if hasattr(c, 'text')])
else:
result_text = str(tool_result)
                                        # Show a preview of the output
preview = (result_text[:100] + '...') if len(result_text) > 100 else result_text
print(f" [Output] {preview}")
tool_responses.append({
"function_response": {
"name": fn.name,
"response": {"result": result_text}
}
})
except Exception as e:
print(f" [Error] {e}")
tool_responses.append({
"function_response": {
"name": fn.name,
"response": {"error": str(e)}
}
})
                            # Send the tool results back to Gemini
message = tool_responses
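                            # Loop again: Gemini sees the tool output and either answers or requests another call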
except Exception as e:
print(f"Agent Error: {e}")
except Exception as e:
print(f"Connection Error: {e}")
if __name__ == "__main__":
try:
asyncio.run(run_agent())
except KeyboardInterrupt:
print("\nAgent stopped.")
except Exception as e:
print(f"Critical Error: {e}")