Skip to main content
Glama

Bedrock MCP Agent

glue_mcp_server.py18.8 kB
#!/usr/bin/env python3 """ MCP Server para AWS Glue Catalog integrado con Bedrock MCP Agent Proporciona herramientas para consultar el catálogo de datos de AWS Glue """ import json import boto3 from botocore.exceptions import ClientError, NoCredentialsError from typing import Any, Dict, List, Optional import os from dotenv import load_dotenv # Cargar variables de entorno load_dotenv() try: from mcp.server.fastmcp import FastMCP mcp_available = True except ImportError: print("⚠️ Módulo MCP no disponible. Instala con: pip install mcp") mcp_available = False class GlueCatalogMCP: """ Clase principal para el servidor MCP de AWS Glue Catalog """ def __init__(self): self.aws_region = os.getenv('AWS_DEFAULT_REGION', 'us-east-1') self.mcp = FastMCP("AWS Glue Catalog Server") if mcp_available else None self._setup_tools() def get_glue_client(self): """ Crea y retorna un cliente de AWS Glue con las credenciales configuradas. """ try: # Usar credenciales desde variables de entorno o AWS CLI client = boto3.client('glue', region_name=self.aws_region) return client except Exception as e: print(f"❌ Error creando cliente de Glue: {e}") return None def _setup_tools(self): """Configura las herramientas MCP""" if not self.mcp: return @self.mcp.tool() def list_glue_databases(): """ Lista todas las bases de datos en el catálogo de AWS Glue. """ return self.list_databases() @self.mcp.tool() def get_database_details(database_name: str): """ Obtiene los detalles de una base de datos específica en el catálogo de Glue. Args: database_name: Nombre de la base de datos """ return self.get_database_info(database_name) @self.mcp.tool() def list_tables_in_database(database_name: str): """ Lista todas las tablas en una base de datos específica del catálogo de Glue. Args: database_name: Nombre de la base de datos """ return self.list_tables(database_name) @self.mcp.tool() def get_table_details(database_name: str, table_name: str): """ Obtiene los detalles completos de una tabla específica. Args: database_name: Nombre de la base de datos table_name: Nombre de la tabla """ return self.get_table_info(database_name, table_name) @self.mcp.tool() def search_tables_by_name(search_term: str): """ Busca tablas en todas las bases de datos que contengan el término de búsqueda. Args: search_term: Término a buscar en los nombres de las tablas """ return self.search_tables(search_term) @self.mcp.tool() def get_glue_catalog_stats(): """ Obtiene estadísticas generales del catálogo de Glue. """ return self.get_catalog_statistics() def list_databases(self) -> str: """Lista todas las bases de datos en el catálogo de AWS Glue""" try: glue_client = self.get_glue_client() if not glue_client: return json.dumps({ "error": "No se pudo crear el cliente de AWS Glue. Verifica las credenciales.", "status": "failed" }, indent=2) response = glue_client.get_databases() databases = [] for db in response.get('DatabaseList', []): databases.append({ "name": db.get('Name', ''), "description": db.get('Description', 'Sin descripción'), "location_uri": db.get('LocationUri', ''), "create_time": str(db.get('CreateTime', '')), "parameters": db.get('Parameters', {}) }) return json.dumps({ "status": "success", "region": self.aws_region, "total_databases": len(databases), "databases": databases }, indent=2, default=str) except ClientError as e: error_code = e.response['Error']['Code'] error_message = e.response['Error']['Message'] return json.dumps({ "error": f"Error de AWS Glue [{error_code}]: {error_message}", "status": "failed" }, indent=2) except NoCredentialsError: return json.dumps({ "error": "Credenciales de AWS no encontradas o inválidas", "status": "failed" }, indent=2) except Exception as e: return json.dumps({ "error": f"Error inesperado: {str(e)}", "status": "failed" }, indent=2) def get_database_info(self, database_name: str) -> str: """Obtiene información detallada de una base de datos""" try: glue_client = self.get_glue_client() if not glue_client: return json.dumps({ "error": "No se pudo crear el cliente de AWS Glue.", "status": "failed" }, indent=2) response = glue_client.get_database(Name=database_name) db = response.get('Database', {}) database_info = { "name": db.get('Name', ''), "description": db.get('Description', 'Sin descripción'), "location_uri": db.get('LocationUri', ''), "create_time": str(db.get('CreateTime', '')), "parameters": db.get('Parameters', {}), "catalog_id": db.get('CatalogId', '') } return json.dumps({ "status": "success", "database": database_info }, indent=2, default=str) except ClientError as e: error_code = e.response['Error']['Code'] error_message = e.response['Error']['Message'] return json.dumps({ "error": f"Error de AWS Glue [{error_code}]: {error_message}", "status": "failed" }, indent=2) except Exception as e: return json.dumps({ "error": f"Error inesperado: {str(e)}", "status": "failed" }, indent=2) def list_tables(self, database_name: str) -> str: """Lista todas las tablas en una base de datos específica""" try: glue_client = self.get_glue_client() if not glue_client: return json.dumps({ "error": "No se pudo crear el cliente de AWS Glue.", "status": "failed" }, indent=2) # Usar paginación para obtener todas las tablas paginator = glue_client.get_paginator('get_tables') page_iterator = paginator.paginate(DatabaseName=database_name) tables = [] for page in page_iterator: for table in page.get('TableList', []): tables.append({ "name": table.get('Name', ''), "database_name": table.get('DatabaseName', ''), "owner": table.get('Owner', ''), "create_time": str(table.get('CreateTime', '')), "update_time": str(table.get('UpdateTime', '')), "table_type": table.get('TableType', ''), "storage_descriptor": { "location": table.get('StorageDescriptor', {}).get('Location', ''), "input_format": table.get('StorageDescriptor', {}).get('InputFormat', ''), "output_format": table.get('StorageDescriptor', {}).get('OutputFormat', ''), "serde_info": table.get('StorageDescriptor', {}).get('SerdeInfo', {}) }, "parameters": table.get('Parameters', {}), "partition_keys": [ { "name": key.get('Name', ''), "type": key.get('Type', ''), "comment": key.get('Comment', '') } for key in table.get('PartitionKeys', []) ] }) return json.dumps({ "status": "success", "database_name": database_name, "total_tables": len(tables), "tables": tables }, indent=2, default=str) except ClientError as e: error_code = e.response['Error']['Code'] error_message = e.response['Error']['Message'] return json.dumps({ "error": f"Error de AWS Glue [{error_code}]: {error_message}", "status": "failed" }, indent=2) except Exception as e: return json.dumps({ "error": f"Error inesperado: {str(e)}", "status": "failed" }, indent=2) def get_table_info(self, database_name: str, table_name: str) -> str: """Obtiene información detallada de una tabla específica""" try: glue_client = self.get_glue_client() if not glue_client: return json.dumps({ "error": "No se pudo crear el cliente de AWS Glue.", "status": "failed" }, indent=2) response = glue_client.get_table(DatabaseName=database_name, Name=table_name) table = response.get('Table', {}) storage_desc = table.get('StorageDescriptor', {}) # Obtener información de las columnas columns = [] for col in storage_desc.get('Columns', []): columns.append({ "name": col.get('Name', ''), "type": col.get('Type', ''), "comment": col.get('Comment', '') }) table_details = { "name": table.get('Name', ''), "database_name": table.get('DatabaseName', ''), "owner": table.get('Owner', ''), "create_time": str(table.get('CreateTime', '')), "update_time": str(table.get('UpdateTime', '')), "last_access_time": str(table.get('LastAccessTime', '')), "table_type": table.get('TableType', ''), "storage_descriptor": { "columns": columns, "location": storage_desc.get('Location', ''), "input_format": storage_desc.get('InputFormat', ''), "output_format": storage_desc.get('OutputFormat', ''), "compressed": storage_desc.get('Compressed', False), "number_of_buckets": storage_desc.get('NumberOfBuckets', 0), "serde_info": storage_desc.get('SerdeInfo', {}), "bucket_columns": storage_desc.get('BucketColumns', []), "sort_columns": storage_desc.get('SortColumns', []) }, "partition_keys": [ { "name": key.get('Name', ''), "type": key.get('Type', ''), "comment": key.get('Comment', '') } for key in table.get('PartitionKeys', []) ], "parameters": table.get('Parameters', {}), "retention": table.get('Retention', 0) } return json.dumps({ "status": "success", "table": table_details }, indent=2, default=str) except ClientError as e: error_code = e.response['Error']['Code'] error_message = e.response['Error']['Message'] return json.dumps({ "error": f"Error de AWS Glue [{error_code}]: {error_message}", "status": "failed" }, indent=2) except Exception as e: return json.dumps({ "error": f"Error inesperado: {str(e)}", "status": "failed" }, indent=2) def search_tables(self, search_term: str) -> str: """Busca tablas por nombre en todas las bases de datos""" try: glue_client = self.get_glue_client() if not glue_client: return json.dumps({ "error": "No se pudo crear el cliente de AWS Glue.", "status": "failed" }, indent=2) # Primero obtener todas las bases de datos db_response = glue_client.get_databases() databases = [db['Name'] for db in db_response.get('DatabaseList', [])] found_tables = [] # Buscar en cada base de datos for db_name in databases: try: paginator = glue_client.get_paginator('get_tables') page_iterator = paginator.paginate(DatabaseName=db_name) for page in page_iterator: for table in page.get('TableList', []): table_name = table.get('Name', '') if search_term.lower() in table_name.lower(): found_tables.append({ "database_name": db_name, "table_name": table_name, "table_type": table.get('TableType', ''), "create_time": str(table.get('CreateTime', '')), "location": table.get('StorageDescriptor', {}).get('Location', '') }) except ClientError: # Si no se puede acceder a una base de datos, continuar continue return json.dumps({ "status": "success", "search_term": search_term, "total_found": len(found_tables), "tables": found_tables }, indent=2, default=str) except Exception as e: return json.dumps({ "error": f"Error inesperado: {str(e)}", "status": "failed" }, indent=2) def get_catalog_statistics(self) -> str: """Obtiene estadísticas generales del catálogo de Glue""" try: glue_client = self.get_glue_client() if not glue_client: return json.dumps({ "error": "No se pudo crear el cliente de AWS Glue.", "status": "failed" }, indent=2) # Obtener bases de datos db_response = glue_client.get_databases() databases = db_response.get('DatabaseList', []) total_tables = 0 database_stats = [] # Contar tablas por base de datos for db in databases: db_name = db['Name'] try: table_response = glue_client.get_tables(DatabaseName=db_name) table_count = len(table_response.get('TableList', [])) total_tables += table_count database_stats.append({ "database_name": db_name, "table_count": table_count, "description": db.get('Description', 'Sin descripción') }) except ClientError: database_stats.append({ "database_name": db_name, "table_count": "Error al acceder", "description": db.get('Description', 'Sin descripción') }) return json.dumps({ "status": "success", "region": self.aws_region, "catalog_stats": { "total_databases": len(databases), "total_tables": total_tables, "database_breakdown": database_stats } }, indent=2, default=str) except Exception as e: return json.dumps({ "error": f"Error inesperado: {str(e)}", "status": "failed" }, indent=2) def run_server(self): """Ejecuta el servidor MCP""" if not self.mcp: print("❌ Servidor MCP no disponible. Instala el módulo MCP.") return print("🚀 Iniciando MCP Server para AWS Glue Catalog...") print(f"🌍 Región configurada: {self.aws_region}") self.mcp.run() # Integración con el agente principal def integrate_glue_mcp_with_bedrock(bedrock_agent): """ Integra las funcionalidades de Glue MCP con el agente de Bedrock """ glue_mcp = GlueCatalogMCP() # Añadir métodos de Glue al agente de Bedrock bedrock_agent.glue_list_databases = glue_mcp.list_databases bedrock_agent.glue_get_database_info = glue_mcp.get_database_info bedrock_agent.glue_list_tables = glue_mcp.list_tables bedrock_agent.glue_get_table_info = glue_mcp.get_table_info bedrock_agent.glue_search_tables = glue_mcp.search_tables bedrock_agent.glue_get_catalog_stats = glue_mcp.get_catalog_statistics return glue_mcp def main(): """Función principal para ejecutar el servidor MCP standalone""" print("🧠 AWS Glue Catalog MCP Server") print("=" * 40) if not mcp_available: print("❌ Para usar el servidor MCP, instala:") print(" pip install mcp") return glue_mcp = GlueCatalogMCP() # Ejecutar servidor MCP glue_mcp.run_server() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/harold-moncaleano/bedrock-mcp-agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server