HANA Cloud MCP Server

""" HANA Cloud MCP Server --------------------- This implementation provides a Model Context Protocol (MCP) server for HANA Cloud DB that can be integrated with Cursor IDE. """ import os import json import logging from flask import Flask, request, jsonify from hdbcli import dbapi from contextlib import contextmanager import pandas as pd from typing import Dict, List, Any, Optional, Tuple from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) app = Flask(__name__) # HANA Cloud connection configuration HANA_CONFIG = { 'address': os.environ.get('HANA_HOST'), 'port': '443', 'user': os.environ.get('HANA_USER'), 'password': os.environ.get('HANA_PASSWORD'), 'encrypt': True, # Always use encryption for cloud connections 'sslValidateCertificate': True } # MCP Server Configuration MCP_CONFIG = { 'MODEL_SCHEMA': os.environ.get('MODEL_SCHEMA', 'MCP_MODELS'), 'CONTEXT_SCHEMA': os.environ.get('CONTEXT_SCHEMA', 'MCP_CONTEXTS'), 'PROTOCOL_SCHEMA': os.environ.get('PROTOCOL_SCHEMA', 'MCP_PROTOCOLS'), 'max_connections': int(os.environ.get('MAX_CONNECTIONS', '10')), } # --------- Connection Management --------- @contextmanager def get_hana_connection(): """Context manager for HANA Cloud DB connections""" conn = None try: conn = dbapi.connect( address=HANA_CONFIG['address'], port=HANA_CONFIG['port'], user=HANA_CONFIG['user'], password=HANA_CONFIG['password'], encrypt=HANA_CONFIG['encrypt'], sslValidateCertificate=HANA_CONFIG['sslValidateCertificate'] ) logger.info(f"Connected to HANA Cloud at {HANA_CONFIG['address']}:{HANA_CONFIG['port']}") yield conn except dbapi.Error as e: logger.error(f"HANA connection error: {e}") raise finally: if conn: conn.close() logger.info("HANA connection closed") # --------- Model Layer --------- class ModelManager: """Manages predictive models and their metadata""" @staticmethod def register_model(model_name: str, model_metadata: Dict) -> bool: """ Register a new model in the model registry Args: model_name: Name of the model model_metadata: Dictionary containing model structure and metadata Returns: bool: Success status """ try: with get_hana_connection() as conn: cursor = conn.cursor() # Check if model already exists cursor.execute(f""" SELECT COUNT(*) FROM {MCP_CONFIG['MODEL_SCHEMA']}.MODEL_REGISTRY WHERE MODEL_NAME = ? """, (model_name,)) if cursor.fetchone()[0] > 0: logger.warning(f"Model {model_name} already exists") return False # Create model entry cursor.execute(f""" INSERT INTO {MCP_CONFIG['MODEL_SCHEMA']}.MODEL_REGISTRY (MODEL_NAME, MODEL_VERSION, MODEL_TYPE, FRAMEWORK, CREATION_DATE, METADATA, ACTIVE) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, ?, ?) """, ( model_name, model_metadata.get('version', '1.0.0'), model_metadata.get('type', 'classification'), model_metadata.get('framework', 'sklearn'), json.dumps(model_metadata), model_metadata.get('active', True) )) conn.commit() logger.info(f"Model {model_name} registered successfully") return True except Exception as e: logger.error(f"Error registering model {model_name}: {e}") return False @staticmethod def get_model(model_name: str) -> Optional[Dict]: """Retrieve a model by name""" try: with get_hana_connection() as conn: cursor = conn.cursor() cursor.execute(f""" SELECT MODEL_NAME, MODEL_VERSION, MODEL_TYPE, FRAMEWORK, CREATION_DATE, METADATA, ACTIVE FROM {MCP_CONFIG['MODEL_SCHEMA']}.MODEL_REGISTRY WHERE MODEL_NAME = ? """, (model_name,)) result = cursor.fetchone() if not result: return None return { 'model_name': result[0], 'version': result[1], 'type': result[2], 'framework': result[3], 'creation_date': result[4].isoformat() if result[4] else None, 'metadata': json.loads(result[5]) if result[5] else {}, 'active': bool(result[6]) } except Exception as e: logger.error(f"Error retrieving model {model_name}: {e}") return None @staticmethod def list_models(active_only: bool = False) -> List[Dict]: """List all registered models""" try: with get_hana_connection() as conn: cursor = conn.cursor() query = f""" SELECT MODEL_NAME, MODEL_VERSION, MODEL_TYPE, FRAMEWORK, CREATION_DATE, ACTIVE FROM {MCP_CONFIG['MODEL_SCHEMA']}.MODEL_REGISTRY """ if active_only: query += " WHERE ACTIVE = TRUE" query += " ORDER BY MODEL_NAME" cursor.execute(query) models = [] for row in cursor.fetchall(): models.append({ 'model_name': row[0], 'version': row[1], 'type': row[2], 'framework': row[3], 'creation_date': row[4].isoformat() if row[4] else None, 'active': bool(row[5]) }) return models except Exception as e: logger.error(f"Error listing models: {e}") return [] @staticmethod def update_model_status(model_name: str, active: bool) -> bool: """Update a model's active status""" try: with get_hana_connection() as conn: cursor = conn.cursor() cursor.execute(f""" UPDATE {MCP_CONFIG['MODEL_SCHEMA']}.MODEL_REGISTRY SET ACTIVE = ? WHERE MODEL_NAME = ? """, (active, model_name)) if cursor.rowcount > 0: conn.commit() logger.info(f"Model {model_name} status updated to {'active' if active else 'inactive'}") return True logger.warning(f"Model {model_name} not found for status update") return False except Exception as e: logger.error(f"Error updating model {model_name} status: {e}") return False @staticmethod def delete_model(model_name: str) -> bool: """Delete a model by name""" try: with get_hana_connection() as conn: cursor = conn.cursor() cursor.execute(f""" DELETE FROM {MCP_CONFIG['MODEL_SCHEMA']}.MODEL_REGISTRY WHERE MODEL_NAME = ? """, (model_name,)) if cursor.rowcount > 0: conn.commit() logger.info(f"Model {model_name} deleted successfully") return True logger.warning(f"Model {model_name} not found for deletion") return False except Exception as e: logger.error(f"Error deleting model {model_name}: {e}") return False # --------- Context Layer --------- class ContextManager: """Manages execution contexts for models""" @staticmethod def create_context(context_name: str, model_name: str, context_config: Dict) -> bool: """ Create a new execution context for a model Args: context_name: Name of the context model_name: Reference to the registered model context_config: Configuration for context behavior Returns: bool: Success status """ try: with get_hana_connection() as conn: cursor = conn.cursor() # Check if context already exists cursor.execute(f""" SELECT COUNT(*) FROM {MCP_CONFIG['CONTEXT_SCHEMA']}.CONTEXT_REGISTRY WHERE CONTEXT_NAME = ? """, (context_name,)) if cursor.fetchone()[0] > 0: logger.warning(f"Context {context_name} already exists") return False # Check if model exists model = ModelManager.get_model(model_name) if not model: logger.error(f"Model {model_name} not found") return False # Create context entry cursor.execute(f""" INSERT INTO {MCP_CONFIG['CONTEXT_SCHEMA']}.CONTEXT_REGISTRY (CONTEXT_NAME, MODEL_NAME, CONTEXT_TYPE, CONFIG, CREATED_AT, ACTIVE) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, ?) """, ( context_name, model_name, context_config.get('type', 'inference'), json.dumps(context_config), context_config.get('active', True) )) conn.commit() logger.info(f"Context {context_name} created successfully") return True except Exception as e: logger.error(f"Error creating context {context_name}: {e}") return False @staticmethod def get_context(context_name: str) -> Optional[Dict]: """Retrieve context configuration by name""" try: with get_hana_connection() as conn: cursor = conn.cursor() cursor.execute(f""" SELECT c.CONTEXT_NAME, c.MODEL_NAME, c.CONTEXT_TYPE, c.CONFIG, c.CREATED_AT, c.ACTIVE FROM {MCP_CONFIG['CONTEXT_SCHEMA']}.CONTEXT_REGISTRY c WHERE c.CONTEXT_NAME = ? """, (context_name,)) result = cursor.fetchone() if not result: return None return { 'context_name': result[0], 'model_name': result[1], 'context_type': result[2], 'config': json.loads(result[3]) if result[3] else {}, 'created_at': result[4].isoformat() if result[4] else None, 'active': bool(result[5]) } except Exception as e: logger.error(f"Error retrieving context {context_name}: {e}") return None @staticmethod def list_contexts(active_only: bool = False) -> List[Dict]: """List all available contexts with basic info""" try: with get_hana_connection() as conn: cursor = conn.cursor() query = f""" SELECT CONTEXT_NAME, MODEL_NAME, CONTEXT_TYPE, CREATED_AT, ACTIVE FROM {MCP_CONFIG['CONTEXT_SCHEMA']}.CONTEXT_REGISTRY """ if active_only: query += " WHERE ACTIVE = TRUE" query += " ORDER BY CONTEXT_NAME" cursor.execute(query) contexts = [] for row in cursor.fetchall(): contexts.append({ 'context_name': row[0], 'model_name': row[1], 'context_type': row[2], 'created_at': row[3].isoformat() if row[3] else None, 'active': bool(row[4]) }) return contexts except Exception as e: logger.error(f"Error listing contexts: {e}") return [] @staticmethod def update_context_status(context_name: str, active: bool) -> bool: """Update a context's active status""" try: with get_hana_connection() as conn: cursor = conn.cursor() cursor.execute(f""" UPDATE {MCP_CONFIG['CONTEXT_SCHEMA']}.CONTEXT_REGISTRY SET ACTIVE = ? WHERE CONTEXT_NAME = ? """, (active, context_name)) if cursor.rowcount > 0: conn.commit() logger.info(f"Context {context_name} status updated to {'active' if active else 'inactive'}") return True logger.warning(f"Context {context_name} not found for status update") return False except Exception as e: logger.error(f"Error updating context {context_name} status: {e}") return False @staticmethod def delete_context(context_name: str) -> bool: """Delete a context by name""" try: with get_hana_connection() as conn: cursor = conn.cursor() cursor.execute(f""" DELETE FROM {MCP_CONFIG['CONTEXT_SCHEMA']}.CONTEXT_REGISTRY WHERE CONTEXT_NAME = ? """, (context_name,)) if cursor.rowcount > 0: conn.commit() logger.info(f"Context {context_name} deleted successfully") return True logger.warning(f"Context {context_name} not found for deletion") return False except Exception as e: logger.error(f"Error deleting context {context_name}: {e}") return False @staticmethod def execute_in_context(context_name: str, input_data: Dict) -> Optional[Dict]: """ Execute a request in the specified context Args: context_name: Name of the context to use input_data: Input data for the execution Returns: Optional[Dict]: Result of the execution or None on error """ try: context = ContextManager.get_context(context_name) if not context: logger.error(f"Context {context_name} not found") return None # Get associated model model = ModelManager.get_model(context['model_name']) if not model: logger.error(f"Model {context['model_name']} not found") return None # Determine execution type based on context context_type = context['context_type'] if context_type == 'inference': # Execute inference using Protocol Manager return ProtocolManager.execute_protocol( model=model, context=context, input_data=input_data, operation='inference' ) elif context_type == 'training': # Execute training using Protocol Manager return ProtocolManager.execute_protocol( model=model, context=context, input_data=input_data, operation='training' ) else: logger.error(f"Unsupported context type: {context_type}") return None except Exception as e: logger.error(f"Error executing in context {context_name}: {e}") return None # --------- Protocol Layer --------- class ProtocolManager: """Manages protocol execution for models and contexts""" @staticmethod def register_protocol(protocol_name: str, protocol_type: str, implementation: Dict) -> bool: """ Register a new protocol implementation Args: protocol_name: Name of the protocol protocol_type: Type of the protocol (e.g., 'inference', 'training') implementation: Protocol implementation details Returns: bool: Success status """ try: with get_hana_connection() as conn: cursor = conn.cursor() # Check if protocol already exists cursor.execute(f""" SELECT COUNT(*) FROM {MCP_CONFIG['PROTOCOL_SCHEMA']}.PROTOCOL_REGISTRY WHERE PROTOCOL_NAME = ? """, (protocol_name,)) if cursor.fetchone()[0] > 0: logger.warning(f"Protocol {protocol_name} already exists") return False # Create protocol entry cursor.execute(f""" INSERT INTO {MCP_CONFIG['PROTOCOL_SCHEMA']}.PROTOCOL_REGISTRY (PROTOCOL_NAME, PROTOCOL_TYPE, IMPLEMENTATION, CREATED_AT, ACTIVE) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?) """, ( protocol_name, protocol_type, json.dumps(implementation), implementation.get('active', True) )) conn.commit() logger.info(f"Protocol {protocol_name} registered successfully") return True except Exception as e: logger.error(f"Error registering protocol {protocol_name}: {e}") return False @staticmethod def get_protocol(protocol_name: str) -> Optional[Dict]: """Retrieve protocol by name""" try: with get_hana_connection() as conn: cursor = conn.cursor() cursor.execute(f""" SELECT PROTOCOL_NAME, PROTOCOL_TYPE, IMPLEMENTATION, CREATED_AT, ACTIVE FROM {MCP_CONFIG['PROTOCOL_SCHEMA']}.PROTOCOL_REGISTRY WHERE PROTOCOL_NAME = ? """, (protocol_name,)) result = cursor.fetchone() if not result: return None return { 'protocol_name': result[0], 'protocol_type': result[1], 'implementation': json.loads(result[2]) if result[2] else {}, 'created_at': result[3].isoformat() if result[3] else None, 'active': bool(result[4]) } except Exception as e: logger.error(f"Error retrieving protocol {protocol_name}: {e}") return None @staticmethod def list_protocols(protocol_type: Optional[str] = None) -> List[Dict]: """List all registered protocols""" try: with get_hana_connection() as conn: cursor = conn.cursor() query = f""" SELECT PROTOCOL_NAME, PROTOCOL_TYPE, CREATED_AT, ACTIVE FROM {MCP_CONFIG['PROTOCOL_SCHEMA']}.PROTOCOL_REGISTRY """ params = [] if protocol_type: query += " WHERE PROTOCOL_TYPE = ?" params.append(protocol_type) query += " ORDER BY PROTOCOL_NAME" cursor.execute(query, params) protocols = [] for row in cursor.fetchall(): protocols.append({ 'protocol_name': row[0], 'protocol_type': row[1], 'created_at': row[2].isoformat() if row[2] else None, 'active': bool(row[3]) }) return protocols except Exception as e: logger.error(f"Error listing protocols: {e}") return [] @staticmethod def delete_protocol(protocol_name: str) -> bool: """Delete a protocol by name""" try: with get_hana_connection() as conn: cursor = conn.cursor() cursor.execute(f""" DELETE FROM {MCP_CONFIG['PROTOCOL_SCHEMA']}.PROTOCOL_REGISTRY WHERE PROTOCOL_NAME = ? """, (protocol_name,)) if cursor.rowcount > 0: conn.commit() logger.info(f"Protocol {protocol_name} deleted successfully") return True logger.warning(f"Protocol {protocol_name} not found for deletion") return False except Exception as e: logger.error(f"Error deleting protocol {protocol_name}: {e}") return False @staticmethod def execute_protocol(model: Dict, context: Dict, input_data: Dict, operation: str) -> Dict: """ Execute a protocol based on model, context, and operation Args: model: Model configuration context: Context configuration input_data: Input data for the execution operation: Type of operation ('inference', 'training', etc.) Returns: Dict: Result of the protocol execution """ try: # Determine protocol handler based on model framework and operation framework = model.get('framework', 'sklearn') protocol_type = f"{framework}_{operation}" # Find applicable protocol with get_hana_connection() as conn: cursor = conn.cursor() cursor.execute(f""" SELECT PROTOCOL_NAME, IMPLEMENTATION FROM {MCP_CONFIG['PROTOCOL_SCHEMA']}.PROTOCOL_REGISTRY WHERE PROTOCOL_TYPE = ? AND ACTIVE = TRUE ORDER BY CREATED_AT DESC LIMIT 1 """, (protocol_type,)) result = cursor.fetchone() if not result: logger.error(f"No active protocol found for {protocol_type}") return {"error": f"No protocol available for {framework} {operation}"} protocol_name = result[0] implementation = json.loads(result[1]) if result[1] else {} # Extract relevant config from context and model context_config = context.get('config', {}) model_metadata = model.get('metadata', {}) # Prepare execution parameters execution_params = { 'protocol_name': protocol_name, 'model_name': model['model_name'], 'model_version': model.get('version', '1.0.0'), 'context_name': context['context_name'], 'input_data': input_data, 'model_metadata': model_metadata, 'context_config': context_config, 'protocol_implementation': implementation } # Execute the appropriate protocol handler if operation == 'inference': return ProtocolManager._handle_inference(execution_params) elif operation == 'training': return ProtocolManager._handle_training(execution_params) else: logger.error(f"Unsupported operation: {operation}") return {"error": f"Unsupported operation: {operation}"} except Exception as e: error_msg = str(e) logger.error(f"Error executing protocol: {error_msg}") return {"error": error_msg} @staticmethod def _handle_inference(params: Dict) -> Dict: """Handle inference protocol execution""" try: implementation = params.get('protocol_implementation', {}) handler_type = implementation.get('handler_type', 'sql') if handler_type == 'sql': # SQL-based inference (using HANA PAL or stored procedures) sql_template = implementation.get('sql_template', '') if not sql_template: return {"error": "Missing SQL template in protocol implementation"} # Replace template variables sql = ProtocolManager._render_sql_template( template=sql_template, model_name=params.get('model_name', ''), input_data=params.get('input_data', {}) ) # Execute SQL with get_hana_connection() as conn: result_df = pd.read_sql(sql, conn) return result_df.to_dict(orient='records') elif handler_type == 'python': # Python-based inference # For this example, we'll just return a mock result # In a real system, this would load and execute the model # Mock inference result return { "predictions": [ {"id": 1, "prediction": 0.95, "label": "positive"}, {"id": 2, "prediction": 0.30, "label": "negative"} ], "model_name": params.get('model_name', ''), "model_version": params.get('model_version', ''), "execution_time_ms": 125 } else: return {"error": f"Unsupported inference handler type: {handler_type}"} except Exception as e: error_msg = str(e) logger.error(f"Error in inference protocol: {error_msg}") return {"error": error_msg} @staticmethod def _handle_training(params: Dict) -> Dict: """Handle training protocol execution""" try: implementation = params.get('protocol_implementation', {}) handler_type = implementation.get('handler_type', 'sql') if handler_type == 'sql': # SQL-based training (using HANA PAL or stored procedures) sql_template = implementation.get('sql_template', '') if not sql_template: return {"error": "Missing SQL template in protocol implementation"} # Replace template variables sql = ProtocolManager._render_sql_template( template=sql_template, model_name=params.get('model_name', ''), input_data=params.get('input_data', {}) ) # Execute SQL with get_hana_connection() as conn: cursor = conn.cursor() cursor.execute(sql) conn.commit() return { "status": "success", "message": f"Training completed for model {params.get('model_name', '')}" } elif handler_type == 'python': # Python-based training # For this example, we'll just return a mock result # In a real system, this would train and save the model # Mock training result return { "status": "success", "model_name": params.get('model_name', ''), "model_version": params.get('model_version', ''), "training_metrics": { "accuracy": 0.92, "f1_score": 0.91, "auc": 0.95 }, "execution_time_seconds": 75 } else: return {"error": f"Unsupported training handler type: {handler_type}"} except Exception as e: error_msg = str(e) logger.error(f"Error in training protocol: {error_msg}") return {"error": error_msg} @staticmethod def _render_sql_template(template: str, model_name: str, input_data: Dict) -> str: """Render SQL template with variables""" sql = template # Replace model name sql = sql.replace('{{model_name}}', model_name) # Replace input data variables for key, value in input_data.items(): placeholder = f'{{{{input.{key}}}}}' if isinstance(value, str): # Escape single quotes for string values value = value.replace("'", "''") sql = sql.replace(placeholder, f"'{value}'") elif isinstance(value, (int, float, bool)): sql = sql.replace(placeholder, str(value)) elif value is None: sql = sql.replace(placeholder, 'NULL') return sql # --------- API Routes --------- @app.route('/health', methods=['GET']) def health_check(): """Health check endpoint""" try: with get_hana_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT 1 FROM DUMMY") result = cursor.fetchone() if result and result[0] == 1: return jsonify({ 'status': 'healthy', 'database': 'connected', 'version': '1.0.0' }) return jsonify({ 'status': 'unhealthy', 'database': 'error', 'message': 'Database check failed' }), 500 except Exception as e: return jsonify({ 'status': 'unhealthy', 'database': 'error', 'message': str(e) }), 500 # ----- Model API Endpoints ----- @app.route('/api/models', methods=['GET']) def list_models(): """List all models""" active_only = request.args.get('active_only', 'false').lower() == 'true' models = ModelManager.list_models(active_only) return jsonify(models) @app.route('/api/models/<model_name>', methods=['GET']) def get_model(model_name): """Get model by name""" model = ModelManager.get_model(model_name) if model: return jsonify(model) return jsonify({'error': f'Model {model_name} not found'}), 404 @app.route('/api/models', methods=['POST']) def register_model(): """Register a new model""" data = request.json if not data or 'model_name' not in data or 'model_metadata' not in data: return jsonify({'error': 'Missing required parameters'}), 400 result = ModelManager.register_model(data['model_name'], data['model_metadata']) if result: return jsonify({ 'success': True, 'message': f"Model {data['model_name']} registered successfully" }), 201 return jsonify({'error': f"Failed to register model {data['model_name']}"}), 400 @app.route('/api/models/<model_name>/status', methods=['PUT']) def update_model_status(model_name): """Update a model's active status""" data = request.json if not data or 'active' not in data: return jsonify({'error': 'Missing active parameter'}), 400 active = data['active'] result = ModelManager.update_model_status(model_name, active) if result: return jsonify({ 'success': True, 'message': f"Model {model_name} status updated to {'active' if active else 'inactive'}" }) return jsonify({'error': f"Model {model_name} not found or couldn't be updated"}), 404 @app.route('/api/models/<model_name>', methods=['DELETE']) def delete_model(model_name): """Delete a model""" result = ModelManager.delete_model(model_name) if result: return jsonify({'success': True, 'message': f"Model {model_name} deleted successfully"}) return jsonify({'error': f"Model {model_name} not found or couldn't be deleted"}), 404 # ----- Context API Endpoints ----- @app.route('/api/contexts', methods=['GET']) def list_contexts(): """List all contexts""" active_only = request.args.get('active_only', 'false').lower() == 'true' contexts = ContextManager.list_contexts(active_only) return jsonify(contexts) @app.route('/api/contexts/<context_name>', methods=['GET']) def get_context(context_name): """Get context by name""" context = ContextManager.get_context(context_name) if context: return jsonify(context) return jsonify({'error': f'Context {context_name} not found'}), 404 @app.route('/api/contexts', methods=['POST']) def create_context(): """Create a new context""" data = request.json if not data or 'context_name' not in data or 'model_name' not in data or 'context_config' not in data: return jsonify({'error': 'Missing required parameters'}), 400 result = ContextManager.create_context(data['context_name'], data['model_name'], data['context_config']) if result: return jsonify({ 'success': True, 'message': f"Context {data['context_name']} created successfully" }), 201 return jsonify({'error': f"Failed to create context {data['context_name']}"}), 400 @app.route('/api/contexts/<context_name>/status', methods=['PUT']) def update_context_status(context_name): """Update a context's active status""" data = request.json if not data or 'active' not in data: return jsonify({'error': 'Missing active parameter'}), 400 active = data['active'] result = ContextManager.update_context_status(context_name, active) if result: return jsonify({ 'success': True, 'message': f"Context {context_name} status updated to {'active' if active else 'inactive'}" }) return jsonify({'error': f"Context {context_name} not found or couldn't be updated"}), 404 @app.route('/api/contexts/<context_name>', methods=['DELETE']) def delete_context(context_name): """Delete a context""" result = ContextManager.delete_context(context_name) if result: return jsonify({'success': True, 'message': f"Context {context_name} deleted successfully"}) return jsonify({'error': f"Context {context_name} not found or couldn't be deleted"}), 404 @app.route('/api/contexts/<context_name>/execute', methods=['POST']) def execute_in_context(context_name): """Execute a request in a context""" data = request.json if not data: return jsonify({'error': 'Missing request data'}), 400 result = ContextManager.execute_in_context(context_name, data) if result is None: return jsonify({'error': f"Failed to execute in context {context_name}"}), 400 return jsonify(result) # ----- Protocol API Endpoints ----- @app.route('/api/protocols', methods=['GET']) def list_protocols(): """List all protocols""" protocol_type = request.args.get('type') protocols = ProtocolManager.list_protocols(protocol_type) return jsonify(protocols) @app.route('/api/protocols/<protocol_name>', methods=['GET']) def get_protocol(protocol_name): """Get protocol by name""" protocol = ProtocolManager.get_protocol(protocol_name) if protocol: return jsonify(protocol) return jsonify({'error': f'Protocol {protocol_name} not found'}), 404 @app.route('/api/protocols', methods=['POST']) def register_protocol(): """Register a new protocol""" data = request.json if not data or 'protocol_name' not in data or 'protocol_type' not in data or 'implementation' not in data: return jsonify({'error': 'Missing required parameters'}), 400 result = ProtocolManager.register_protocol( data['protocol_name'], data['protocol_type'], data['implementation']) if result: return jsonify({ 'success': True, 'message': f"Protocol {data['protocol_name']} registered successfully" }), 201 return jsonify({'error': f"Failed to register protocol {data['protocol_name']}"}), 400 @app.route('/api/protocols/<protocol_name>', methods=['DELETE']) def delete_protocol(protocol_name): """Delete a protocol""" result = ProtocolManager.delete_protocol(protocol_name) if result: return jsonify({'success': True, 'message': f"Protocol {protocol_name} deleted successfully"}) return jsonify({'error': f"Protocol {protocol_name} not found or couldn't be deleted"}), 404 # ----- Schema Management ----- def initialize_schemas(): """Initialize required schemas and tables if they don't exist""" try: with get_hana_connection() as conn: cursor = conn.cursor() # First check if schemas exist existing_schemas = {} for schema in ['MODEL_SCHEMA', 'CONTEXT_SCHEMA', 'PROTOCOL_SCHEMA']: schema_name = MCP_CONFIG[schema] cursor.execute(""" SELECT COUNT(*) FROM SYS.SCHEMAS WHERE SCHEMA_NAME = ? """, (schema_name,)) count = cursor.fetchone()[0] existing_schemas[schema_name] = count > 0 # Create schemas if they don't exist and user has privileges for schema in ['MODEL_SCHEMA', 'CONTEXT_SCHEMA', 'PROTOCOL_SCHEMA']: schema_name = MCP_CONFIG[schema] if not existing_schemas[schema_name]: try: cursor.execute(f""" CREATE SCHEMA {schema_name} """) logger.info(f"Created schema {schema_name}") except dbapi.Error as e: logger.warning(f"Could not create schema {schema_name}. Using existing schema. Error: {e}") # If we can't create the schema, we'll try to use it if it exists pass # Attempt to create tables in each schema try: # Check if tables exist first def table_exists(schema, table): cursor.execute(""" SELECT COUNT(*) FROM SYS.TABLES WHERE SCHEMA_NAME = ? AND TABLE_NAME = ? """, (schema, table)) return cursor.fetchone()[0] > 0 # Create Model Registry table if it doesn't exist if not table_exists(MCP_CONFIG['MODEL_SCHEMA'], 'MODEL_REGISTRY'): cursor.execute(f""" CREATE TABLE {MCP_CONFIG['MODEL_SCHEMA']}.MODEL_REGISTRY ( MODEL_ID INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, MODEL_NAME VARCHAR(255) NOT NULL UNIQUE, MODEL_VERSION VARCHAR(50) NOT NULL, MODEL_TYPE VARCHAR(50) NOT NULL, FRAMEWORK VARCHAR(50) NOT NULL, CREATION_DATE TIMESTAMP NOT NULL, METADATA NCLOB, ACTIVE BOOLEAN DEFAULT TRUE ) """) logger.info(f"Created table {MCP_CONFIG['MODEL_SCHEMA']}.MODEL_REGISTRY") # Create Context Registry table if it doesn't exist if not table_exists(MCP_CONFIG['CONTEXT_SCHEMA'], 'CONTEXT_REGISTRY'): cursor.execute(f""" CREATE TABLE {MCP_CONFIG['CONTEXT_SCHEMA']}.CONTEXT_REGISTRY ( CONTEXT_ID INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, CONTEXT_NAME VARCHAR(255) NOT NULL UNIQUE, MODEL_NAME VARCHAR(255) NOT NULL, CONTEXT_TYPE VARCHAR(50) NOT NULL, CONFIG NCLOB, CREATED_AT TIMESTAMP NOT NULL, ACTIVE BOOLEAN DEFAULT TRUE, FOREIGN KEY (MODEL_NAME) REFERENCES {MCP_CONFIG['MODEL_SCHEMA']}.MODEL_REGISTRY(MODEL_NAME) ON DELETE CASCADE ) """) logger.info(f"Created table {MCP_CONFIG['CONTEXT_SCHEMA']}.CONTEXT_REGISTRY") # Create Protocol Registry table if it doesn't exist if not table_exists(MCP_CONFIG['PROTOCOL_SCHEMA'], 'PROTOCOL_REGISTRY'): cursor.execute(f""" CREATE TABLE {MCP_CONFIG['PROTOCOL_SCHEMA']}.PROTOCOL_REGISTRY ( PROTOCOL_ID INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, PROTOCOL_NAME VARCHAR(255) NOT NULL UNIQUE, PROTOCOL_TYPE VARCHAR(50) NOT NULL, IMPLEMENTATION NCLOB, CREATED_AT TIMESTAMP NOT NULL, ACTIVE BOOLEAN DEFAULT TRUE ) """) logger.info(f"Created table {MCP_CONFIG['PROTOCOL_SCHEMA']}.PROTOCOL_REGISTRY") conn.commit() logger.info("Tables initialized successfully") return True except dbapi.Error as e: logger.error(f"Error creating tables: {e}") logger.error("Please ensure you have the necessary privileges and the schemas exist") return False except Exception as e: logger.error(f"Error initializing schemas: {e}") return False # ----- MCP Server Configuration API ----- @app.route('/api/config', methods=['GET']) def get_config(): """Get current MCP server configuration""" config = { 'schemas': { 'model_schema': MCP_CONFIG['MODEL_SCHEMA'], 'context_schema': MCP_CONFIG['CONTEXT_SCHEMA'], 'protocol_schema': MCP_CONFIG['PROTOCOL_SCHEMA'] }, 'connection': { 'host': HANA_CONFIG['address'], 'port': HANA_CONFIG['port'], 'encrypt': HANA_CONFIG['encrypt'] }, 'version': '1.0.0', 'max_connections': MCP_CONFIG['max_connections'] } return jsonify(config) @app.route('/api/config/test-connection', methods=['GET']) def test_connection(): """Test database connection""" try: with get_hana_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT @@version AS version") result = cursor.fetchone() version = result[0] if result else "Unknown" return jsonify({ 'status': 'success', 'message': 'Successfully connected to HANA Cloud', 'version': version }) except Exception as e: return jsonify({ 'status': 'error', 'message': f'Connection failed: {str(e)}' }), 500 # ----- Main Application Entry Point ----- if __name__ == '__main__': # Initialize schemas and tables if not initialize_schemas(): logger.warning("Schema initialization failed. Some features may not work correctly.") # Start the Flask application port = int(os.environ.get('PORT', 5001)) app.run(host='0.0.0.0', port=port, debug=False)