Skip to main content
Glama
hingaibm

Data Intelligence MCP Server

by hingaibm
tool_helper_service.py19.9 kB
# Copyright [2025] [IBM] # Licensed under the Apache License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0) # See the LICENSE file in the project root for license information. # This file has been modified with the assistance of IBM Bob AI Tool import json import re from enum import Enum from typing import Any, Dict, List, Optional, Union from app.core.auth import get_access_token from app.core.settings import settings from app.services.constants import JSON_CONTENT_TYPE, JSON_PATCH_CONTENT_TYPE from app.shared.exceptions.base import ExternalAPIError, ServiceError from app.shared.logging.utils import LOGGER from app.shared.utils.http_client import get_http_client def create_default_headers( content_type: str = JSON_CONTENT_TYPE, accept_type: str = JSON_CONTENT_TYPE, additional_headers: Optional[Dict[str, str]] = None, ) -> Dict[str, str]: """ Create headers for API requests. Args: content_type: Content type for the request accept_type: Accept type for the request include_auth: Whether to include authorization header additional_headers: Additional headers to include Returns: Dict[str, str]: Headers for the request """ headers = {"Content-Type": content_type, "accept": accept_type} if additional_headers: headers.update(additional_headers) return headers class HTTPMethod(Enum): GET = "GET" POST = "POST" PATCH = "PATCH" PUT = "PUT" DELETE = "DELETE" class ToolHelperService: """ Base class for tool service that provides common functionality for making API calls, handling errors. """ def __init__(self): self.base_url = settings.di_service_url self.resource_controller_url = settings.resource_controller_url self.ui_base_url = settings.ui_url self.http_client = get_http_client() async def execute_get_request( self, url: str, headers: Dict[str, str] = create_default_headers(), params: Optional[Dict[str, Any]] = None, tool_name: Optional[str] = None, ) -> Dict[str, Any]: """ Execute a GET request with authorization header and handle common error patterns. Args: url: URL for the request headers: Headers for the request params: Query parameters timeout: Request timeout in seconds tool_name: Name of the tool making the request (for error messages) Returns: Dict[str, Any]: JSON response Raises: ExternalAPIError: If the request fails ServiceError: If the response status code is 404 """ headers["Authorization"] = await get_access_token() try: response_json = await self.http_client.get( url=url, headers=headers, params=params ) return response_json except ExternalAPIError as e: LOGGER.error( f"{tool_name or 'Request'} to {url} failed with error: {str(e)}" ) raise self._format_exception(e, HTTPMethod.GET, tool_name) async def execute_post_request( self, url: str, headers: Dict[str, str] = create_default_headers(), json: Optional[Dict[str, Any]] = None, params: Optional[Dict[str, Any]] = None, tool_name: Optional[str] = None, ) -> Dict[str, Any]: """ Execute a POST request with authorization header and handle common error patterns. Args: url: URL for the request headers: Headers for the request payload: JSON data for the request body params: Query parameters tool_name: Name of the tool making the request (for error messages) Returns: Dict[str, Any]: JSON response Raises: ExternalAPIError: If the request fails """ headers["Authorization"] = await get_access_token() try: response_json = await self.http_client.post( url=url, data=json, headers=headers, params=params ) return response_json except ExternalAPIError as e: LOGGER.error( f"{tool_name or 'Request'} to {url} failed with error: {str(e)}" ) raise self._format_exception(e, HTTPMethod.POST, tool_name) async def execute_patch_request( self, url: str, headers: Dict[str, str] = create_default_headers(content_type=JSON_PATCH_CONTENT_TYPE), json: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None, params: Optional[Dict[str, Any]] = None, tool_name: Optional[str] = None, ) -> Dict[str, Any]: """ Execute a PATCH request with authorization header and handle common error patterns. Args: url: URL for the request headers: Headers for the request json: JSON data for the request body (can be dict or list for JSON PATCH operations) params: Query parameters tool_name: Name of the tool making the request (for error messages) Returns: Dict[str, Any]: JSON response Raises: ExternalAPIError: If the request fails """ headers["Authorization"] = await get_access_token() try: response_json = await self.http_client.patch( url=url, data=json, headers=headers, params=params ) return response_json except ExternalAPIError as e: LOGGER.error( f"{tool_name or 'Request'} to {url} failed with error: {str(e)}" ) raise self._format_exception(e, HTTPMethod.PATCH, tool_name) def _format_exception( self, exception: ExternalAPIError, method: HTTPMethod, tool_name: Optional[str] = None, ) -> ExternalAPIError | ServiceError: """ Handle API error with smart formatting and truncation. Extracts status code, error type, and sanitizes error messages to provide user-friendly error messages while preventing information overload. Args: exception: ExternalAPIError method: HTTPMethod tool_name: Name of the tool making the request (for error messages) Returns: Dict[str, Any]: JSON response if successful Raises: ServiceError: If the response status code is 404 or 403 ExternalAPIError: For other error status codes """ status_code = self._parse_status_code(exception.message) error_detail = self._extract_error_detail(exception.message) error_category = self._get_error_category(status_code) sanitized_message = self._get_sanitized_message(status_code, error_detail) error_message = self._build_error_message( tool_name, error_category, status_code, sanitized_message ) return self._raise_formatted_error( status_code, method, error_message, exception.message ) def _parse_status_code(self, message: str) -> str | None: """Parse status code from exception message.""" status_code_match = re.search(r"HTTP error (\d+)", message) return status_code_match.group(1) if status_code_match else None def _extract_error_detail(self, message: str) -> str: """Extract error detail from exception message.""" error_detail_match = re.search(r"HTTP error \d+ for .+?: (.+)", message, re.DOTALL) if error_detail_match: return error_detail_match.group(1).strip() LOGGER.debug(f"Could not find error_detail pattern in exception.message: {message[:200]}") return "" def _get_error_category(self, status_code: str | None) -> str: """Get error category from status code.""" if status_code and status_code.isdigit(): return self._categorize_error(int(status_code)) return "Unknown Error" def _get_sanitized_message(self, status_code: str | None, error_detail: str) -> str: """Get sanitized message from error detail, skipping for 500 errors.""" if status_code == "500": return "" LOGGER.debug(f"Extracting message from error_detail (first 200 chars): {error_detail[:200]}") sanitized = self._extract_error_message(error_detail, max_length=250) LOGGER.debug(f"Extracted sanitized_message: {sanitized[:100] if sanitized else 'EMPTY'}") return sanitized def _build_error_message( self, tool_name: Optional[str], error_category: str, status_code: str | None, sanitized_message: str ) -> str: """Build formatted error message.""" tool_display = tool_name or 'request' base_message = f"Tool {tool_display} call failed: {error_category}" if status_code == "500": return f"{base_message} (Status: {status_code}). Internal server error." if status_code == "404": message = f"{base_message} (Status: {status_code}). Resource was not found." return f"{message} {sanitized_message}" if sanitized_message else message if status_code == "403": message = f"{base_message} (Status: {status_code}). Operation is forbidden." return f"{message} {sanitized_message}" if sanitized_message else message if status_code: message = f"{base_message} (Status: {status_code})" return f"{message}. {sanitized_message}" if sanitized_message else message # Fallback if we can't parse status code return f"{base_message}. {sanitized_message}" if sanitized_message else base_message def _raise_formatted_error( self, status_code: str | None, method: HTTPMethod, error_message: str, original_message: str ) -> ExternalAPIError | ServiceError: """Raise appropriate error type based on status code.""" LOGGER.error(f"{error_message}, Full exception: {original_message}") if status_code == "404" and method == HTTPMethod.GET: raise ServiceError(error_message) if status_code == "403": raise ServiceError(error_message) raise ExternalAPIError(error_message) def _categorize_error(self, status_code: int) -> str: """ Categorize HTTP error by status code. Args: status_code: HTTP status code Returns: str: Error category name """ if 400 <= status_code < 500: if status_code == 400: return "Validation Error" elif status_code == 401: return "Authentication Error" elif status_code == 403: return "Authorization Error" elif status_code == 404: return "Not Found Error" elif status_code == 422: return "Validation Error" else: return "Client Error" elif 500 <= status_code < 600: return "Server Error" else: return "Unknown Error" def _extract_error_message(self, error_detail: str, max_length: int = 250) -> str: """ Extract error message from error detail. If error_detail is JSON, tries to extract message from errors array or message field. Otherwise, returns empty string to show only categorized error. Args: error_detail: Original error detail (could be JSON string or plain text) max_length: Maximum length for the extracted message Returns: str: Extracted and sanitized error message, or empty string if no message found """ if not error_detail: return "" json_str = self._find_json_string(error_detail) if not json_str: return "" error_json = self._parse_json_safely(json_str) if error_json is None: return "" # Try to extract message from errors array first message = self._extract_message_from_errors_array(error_json, max_length) if message: return message # Fallback to top-level message field message = self._extract_message_from_top_level(error_json, max_length) if message: return message # No message found LOGGER.debug(f"Could not extract message from error_json. Keys: {list(error_json.keys()) if isinstance(error_json, dict) else 'not a dict'}") return "" def _find_json_string(self, error_detail: str) -> str: """Find and extract JSON string from error detail.""" for i, char in enumerate(error_detail): if char in ['{', '[']: return error_detail[i:] LOGGER.debug(f"No JSON found in error_detail: {error_detail[:100]}") return "" def _parse_json_safely(self, json_str: str) -> dict | list | None: """Parse JSON string safely, returning None on failure.""" try: return json.loads(json_str) except (json.JSONDecodeError, TypeError) as e: LOGGER.error(f"Failed to parse error_detail as JSON: {e}. JSON string: {json_str[:200]}") return None def _extract_message_from_errors_array(self, error_json: dict | list, max_length: int) -> str: """Extract message from errors array if present.""" if not isinstance(error_json, dict) or "errors" not in error_json: return "" errors = error_json.get("errors", []) if not isinstance(errors, list) or len(errors) == 0: LOGGER.debug(f"errors is not a list or is empty: {errors}") return "" first_error = errors[0] if not isinstance(first_error, dict) or "message" not in first_error: LOGGER.debug(f"first_error is not dict or has no message: {first_error}") return "" message = first_error.get("message", "") if message: LOGGER.debug(f"Extracted message from errors[0]: {message[:100]}") return self._sanitize_error_message(message, max_length) return "" def _extract_message_from_top_level(self, error_json: dict | list, max_length: int) -> str: """Extract message from top-level message field if present.""" if not isinstance(error_json, dict) or "message" not in error_json: return "" message = error_json.get("message", "") if message: LOGGER.debug(f"Extracted message from top-level: {message[:100]}") return self._sanitize_error_message(message, max_length) return "" def _sanitize_error_message(self, error_message: str, max_length: int = 250) -> str: """ Sanitize and truncate error message to prevent information overload and leak sensitive data. Args: error_message: Original error message max_length: Maximum length for the sanitized message Returns: str: Sanitized and truncated error message """ if not error_message: return "" sanitized = error_message # IMPORTANT: Order matters - sanitize most specific patterns first # Remove URLs first (but keep domain for context) - do this before token sanitization # to avoid partial matches in URLs sanitized = re.sub(r'https?://[^\s]+', '[URL]', sanitized) # Remove database connection strings (contains credentials, do before generic token matching) sanitized = re.sub(r'\b(?:postgresql|postgres|mysql|mongodb|redis|mssql|sqlserver|oracle|sqlite)://[^\s]+', '[DATABASE_URL]', sanitized, flags=re.IGNORECASE) # Remove potential AWS/Azure/GCP keys or ARNs (specific patterns before generic tokens) # Note: Using [a-z0-9] instead of [a-zA-Z0-9] since IGNORECASE flag makes case redundant # Note: Escaping - to avoid it being interpreted as a range operator (which would create duplicates with 0-9) sanitized = re.sub(r'\b(?:AKIA|ASIA|arn:aws|azure-|gcp-)[a-z0-9/_\-]+', '[CLOUD_CREDENTIAL]', sanitized, flags=re.IGNORECASE) # Remove Bearer tokens (Bearer followed by alphanumeric string) - specific pattern # Note: Using [a-z0-9] instead of [a-zA-Z0-9] since IGNORECASE flag makes case redundant # Note: Escaping - to avoid it being interpreted as a range operator sanitized = re.sub(r'\bBearer\s+[a-z0-9_\-]{20,}\b', 'Bearer [REDACTED]', sanitized, flags=re.IGNORECASE) # Handle Stripe-style keys (sk_live_..., pk_test_...) - specific pattern # Note: Using [a-z0-9] instead of [A-Za-z0-9] since IGNORECASE flag makes case redundant sanitized = re.sub(r'\b(sk|pk|rk)_(live|test|prod)_[a-z0-9]{24,}\b', r'\1_\2_[REDACTED]', sanitized, flags=re.IGNORECASE) # Remove potential API keys with prefixes (e.g., "apikey:abc123...", "token:...") # Note: api[_-]?key already matches "apikey" (when optional [_-]? is empty), so "apikey" is redundant sanitized = re.sub(r'\b(api[_-]?key|token|secret|password|pwd|credential|auth[_-]?token)[=:]\s*[^\s]+', r'\1=[REDACTED]', sanitized, flags=re.IGNORECASE) # Remove potential API keys, tokens, or secrets (long alphanumeric strings) # Catch strings of 32+ characters (common token/secret lengths) - do this after specific patterns sanitized = re.sub(r'\b[a-zA-Z0-9]{32,}\b', '[REDACTED]', sanitized) # Remove potential email addresses (but keep domain context) sanitized = re.sub(r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b', '[EMAIL]', sanitized) # Remove potential IP addresses (IPv4) sanitized = re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}(?::\d+)?\b', '[IP]', sanitized) # Remove IPv6 addresses (simplified pattern) sanitized = re.sub(r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b', '[IP]', sanitized) # Remove potential file paths with sensitive names (e.g., /home/user/.ssh/id_rsa) # Match paths containing sensitive keywords or file extensions sanitized = re.sub(r'[/\\][^\s]*(?:passwd|password|secret|key|credential|token|\.env|\.pem|\.key|\.p12|\.pfx|id_rsa|id_dsa|id_ed25519)[^\s]*', '[FILE_PATH]', sanitized, flags=re.IGNORECASE) # Remove potential UUIDs (which might be sensitive IDs) - do before generic alphanumeric sanitized = re.sub(r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b', '[UUID]', sanitized, flags=re.IGNORECASE) # Remove potential credit card numbers (16 digits, possibly with spaces or dashes) sanitized = re.sub(r'\b(?:\d{4}[-\s]?){3}\d{4}\b', '[CREDIT_CARD]', sanitized) # Remove potential SSN (XXX-XX-XXXX) sanitized = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', sanitized) # Remove potential phone numbers (various formats) sanitized = re.sub(r'\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b', '[PHONE]', sanitized) # Truncate if too long if len(sanitized) > max_length: sanitized = sanitized[:max_length].rsplit(' ', 1)[0] + "... (message truncated)" return sanitized.strip() tool_helper_service = ToolHelperService()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hingaibm/data-intelligence-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server