import json
import os
from typing import List, Optional, Dict, Any

import pyodbc
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
# Load environment variables (python-dotenv) before the settings below are read.
load_dotenv()
# Initialize the MCP server instance that the tool/resource decorators attach to.
mcp = FastMCP("SQLServer-RetailPOS")
# SQL Server connection settings. Each value is read from a LEGACY_DB_*
# environment variable and falls back to the literal default given here.
# NOTE(review): the fallbacks embed the "sa" login and a plaintext password in
# source code — consider requiring these values from the environment instead.
DB_SERVER = os.getenv("LEGACY_DB_SERVER", "DESKTOP-P1HCOEO\\SQLEXPRESS")
DB_NAME = os.getenv("LEGACY_DB_NAME", "RetailPOS_DB")
DB_USER = os.getenv("LEGACY_DB_USER", "sa")
DB_PASSWORD = os.getenv("LEGACY_DB_PASSWORD", "melvi1234")
DB_DRIVER = os.getenv("LEGACY_DB_DRIVER", "{ODBC Driver 17 for SQL Server}")
def get_connection():
    """Open a new pyodbc connection to the configured SQL Server database.

    The connection string is assembled from the module-level ``DB_*``
    settings, and a 10-second timeout is passed to ``pyodbc.connect``.

    Returns:
        The connection object returned by ``pyodbc.connect``. The caller is
        responsible for closing it.

    Raises:
        Exception: If the connection cannot be established. The message is
            tailored for timeouts, failed logins and databases that cannot
            be opened; the underlying error is chained as ``__cause__``.
    """
    try:
        # Build a compact "KEY=value;" string so no newlines or source
        # indentation end up inside the ODBC connection string.
        settings = (
            ("DRIVER", DB_DRIVER),
            ("SERVER", DB_SERVER),
            ("DATABASE", DB_NAME),
            ("UID", DB_USER),
            ("PWD", DB_PASSWORD),
            ("TrustServerCertificate", "yes"),
            ("Connection Timeout", "10"),
            ("Login Timeout", "10"),
        )
        connection_string = "".join(f"{key}={value};" for key, value in settings)
        return pyodbc.connect(connection_string, timeout=10)
    except pyodbc.Error as e:
        error_msg = str(e)
        lowered = error_msg.lower()
        # Map the most common driver failures to actionable messages while
        # keeping the original pyodbc error attached as the cause.
        if "timeout" in lowered:
            raise Exception(f"Timeout al conectar a SQL Server {DB_SERVER}. Verifica que SQL Server esté corriendo y sea accesible.") from e
        if "login failed" in lowered:
            raise Exception(f"Error de autenticación. Verifica usuario '{DB_USER}' y contraseña.") from e
        if "cannot open database" in lowered:
            raise Exception(f"No se puede abrir la base de datos '{DB_NAME}'. Verifica que exista.") from e
        raise Exception(f"Error conectando a SQL Server: {error_msg}") from e
    except Exception as e:
        raise Exception(f"Error inesperado al conectar: {str(e)}") from e
@mcp.tool()
async def execute_sql_query(query: str) -> Dict[str, Any]:
    """Run a SQL statement against the configured database and return its rows.

    Examples:
        - "SELECT TOP 10 * FROM Products"
        - "SELECT COUNT(*) FROM Sales WHERE SaleDate >= GETDATE()-30"
        - "SELECT * FROM Customers WHERE City = 'Lima'"

    Args:
        query: SQL text handed to ``cursor.execute`` unchanged; no validation
            or parameter binding is applied here.

    Returns:
        On success, a dict with ``success=True``, the original ``query``,
        the ``columns`` list, ``results`` (one dict per row keyed by column
        name) and ``row_count``. On any failure, a dict with
        ``success=False``, the ``error`` message and the ``query``.
    """

    def to_json_safe(value: Any) -> Any:
        # bytes are stringified and date/time-like values become ISO strings
        # so the row can be serialized; everything else passes through.
        if isinstance(value, bytes):
            return str(value)
        if hasattr(value, 'strftime'):
            return value.isoformat()
        return value

    try:
        conn = get_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            # description is None when the statement produced no result set
            # (e.g. INSERT/UPDATE/DDL); report that instead of failing with
            # an opaque "'NoneType' object is not iterable".
            if cursor.description is None:
                return {
                    "success": False,
                    "error": "La consulta no devolvió un conjunto de resultados (solo se admiten sentencias que retornan filas).",
                    "query": query
                }
            columns = [column[0] for column in cursor.description]
            results = [
                {name: to_json_safe(value) for name, value in zip(columns, row)}
                for row in cursor.fetchall()
            ]
            cursor.close()
            return {
                "success": True,
                "query": query,
                "columns": columns,
                "results": results,
                "row_count": len(results)
            }
        finally:
            # Always release the connection, including on errors and on the
            # early return above.
            conn.close()
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "query": query
        }
@mcp.tool()
async def get_database_schema() -> Dict[str, Any]:
    """Return the column-level schema of every base table in the database.

    For each base table listed in INFORMATION_SCHEMA, the result contains
    its columns with name, data type, nullability, default and maximum
    character length. Relationships (foreign keys) are not included.

    Returns:
        On success, a dict with ``success=True``, ``database``, ``server``,
        ``schema`` (table name -> ``{"columns": [...]}``) and
        ``table_count``. On failure, a dict with ``success=False`` and the
        ``error`` message.
    """
    try:
        conn = get_connection()
        try:
            cursor = conn.cursor()
            # One joined query instead of one query per table; it also avoids
            # interpolating table names into SQL text. Joining on the schema
            # as well keeps view columns and unrelated same-named objects out.
            cursor.execute("""
                SELECT
                    c.TABLE_NAME,
                    c.COLUMN_NAME,
                    c.DATA_TYPE,
                    c.IS_NULLABLE,
                    c.COLUMN_DEFAULT,
                    c.CHARACTER_MAXIMUM_LENGTH
                FROM INFORMATION_SCHEMA.COLUMNS AS c
                INNER JOIN INFORMATION_SCHEMA.TABLES AS t
                    ON t.TABLE_SCHEMA = c.TABLE_SCHEMA
                   AND t.TABLE_NAME = c.TABLE_NAME
                WHERE t.TABLE_TYPE = 'BASE TABLE'
                ORDER BY c.TABLE_NAME, c.TABLE_SCHEMA, c.ORDINAL_POSITION
            """)
            schema: Dict[str, Any] = {}
            for row in cursor.fetchall():
                # Rows arrive ordered by table, so insertion order of the
                # dict follows the table-name ordering.
                table_entry = schema.setdefault(row[0], {"columns": []})
                table_entry["columns"].append(
                    {
                        "name": row[1],
                        "type": row[2],
                        "nullable": row[3] == "YES",
                        "default": row[4],
                        "max_length": row[5]
                    }
                )
            cursor.close()
            return {
                "success": True,
                "database": DB_NAME,
                "server": DB_SERVER,
                "schema": schema,
                "table_count": len(schema)
            }
        finally:
            # Release the connection on both the success and error paths.
            conn.close()
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
@mcp.tool()
async def get_tables_list() -> Dict[str, Any]:
    """List every base table in the database with its column count.

    Returns:
        On success, a dict with ``success=True``, ``database``, ``tables``
        (a list of ``{"name": ..., "columns": <column count>}``) and
        ``total_tables``. On failure, a dict with ``success=False`` and the
        ``error`` message.
    """
    try:
        conn = get_connection()
        try:
            cursor = conn.cursor()
            # The column count is correlated on schema as well as name so
            # same-named tables in different schemas are not counted together.
            cursor.execute("""
                SELECT TABLE_NAME,
                       (SELECT COUNT(*)
                        FROM INFORMATION_SCHEMA.COLUMNS c
                        WHERE c.TABLE_SCHEMA = t.TABLE_SCHEMA
                          AND c.TABLE_NAME = t.TABLE_NAME) AS column_count
                FROM INFORMATION_SCHEMA.TABLES t
                WHERE t.TABLE_TYPE = 'BASE TABLE'
                ORDER BY t.TABLE_NAME
            """)
            tables = [{"name": row[0], "columns": row[1]} for row in cursor.fetchall()]
            cursor.close()
            return {
                "success": True,
                "database": DB_NAME,
                "tables": tables,
                "total_tables": len(tables)
            }
        finally:
            # Release the connection on both the success and error paths.
            conn.close()
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
@mcp.tool()
async def get_table_data(table_name: str, limit: int = 10) -> Dict[str, Any]:
    """Return up to ``limit`` rows from a single base table.

    Args:
        table_name: Name of the table, as listed in
            INFORMATION_SCHEMA.TABLES (without schema prefix).
        limit: Maximum number of rows to return (default: 10). Must be a
            non-negative integer.

    Returns:
        On success, a dict with ``success=True``, ``table``, ``columns``,
        ``results`` (one dict per row keyed by column name), ``row_count``
        and the executed ``query``. If the table does not exist, a dict
        with ``success=False`` and ``error``. On any other failure, a dict
        with ``success=False``, ``error`` and ``table``.
    """

    def to_json_safe(value: Any) -> Any:
        # bytes are stringified and date/time-like values become ISO strings
        # so the row can be serialized; everything else passes through.
        if isinstance(value, bytes):
            return str(value)
        if hasattr(value, 'strftime'):
            return value.isoformat()
        return value

    try:
        # Coerce before touching SQL text so only a plain integer is ever
        # interpolated into the TOP clause.
        row_limit = int(limit)
        if row_limit < 0:
            raise ValueError("El límite debe ser un entero no negativo")

        conn = get_connection()
        try:
            cursor = conn.cursor()
            # Confirm the table exists (parameterized) before using its name
            # as an identifier.
            cursor.execute("""
                SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES
                WHERE TABLE_NAME = ? AND TABLE_TYPE = 'BASE TABLE'
            """, table_name)
            if cursor.fetchone()[0] == 0:
                return {
                    "success": False,
                    "error": f"La tabla '{table_name}' no existe"
                }
            # Bracket-quote the identifier (doubling any "]") so names with
            # spaces, reserved words or special characters are handled.
            quoted_name = "[" + table_name.replace("]", "]]") + "]"
            query = f"SELECT TOP {row_limit} * FROM {quoted_name}"
            cursor.execute(query)
            columns = [column[0] for column in cursor.description]
            results = [
                {name: to_json_safe(value) for name, value in zip(columns, row)}
                for row in cursor.fetchall()
            ]
            cursor.close()
            return {
                "success": True,
                "table": table_name,
                "columns": columns,
                "results": results,
                "row_count": len(results),
                "query": query
            }
        finally:
            # Always release the connection, including on the early
            # "table does not exist" return and on errors.
            conn.close()
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "table": table_name
        }
@mcp.resource("sqlserver://tables")
async def get_tables_resource() -> str:
    """Resource that returns the table list as a JSON document.

    Returns:
        The dict produced by ``get_tables_list`` serialized with
        ``json.dumps`` (non-ASCII characters are kept as-is).
    """
    result = await get_tables_list()
    # str(dict) would yield a Python repr (single quotes, True/None), which
    # is not valid JSON; serialize properly instead.
    return json.dumps(result, ensure_ascii=False)
@mcp.resource("sqlserver://schema")
async def get_schema_resource() -> str:
    """Resource that returns the database schema as a JSON document.

    Returns:
        The dict produced by ``get_database_schema`` serialized with
        ``json.dumps`` (non-ASCII characters are kept as-is).
    """
    result = await get_database_schema()
    # str(dict) would yield a Python repr (single quotes, True/None), which
    # is not valid JSON; serialize properly instead.
    return json.dumps(result, ensure_ascii=False)
# Start the MCP server only when this file is executed as a script.
if __name__ == "__main__":
    mcp.run()