Skip to main content
Glama
intelligent_external_api_tools.py (27.3 kB)
"""External API integration tools with security and monitoring."""

import asyncio
import json
import os
import time
from collections import defaultdict, deque
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import urljoin

import aiohttp
from aiohttp import ClientTimeout

from config import Config
from tools.intelligent_base import IntelligentToolBase, IntelligentToolContext
from utils.security import SecurityManager


class APIMetrics:
    """Sliding-window request metrics for a single API.

    Each recorded request is stored with its timestamp. Entries older than
    ``window_minutes`` are discarded both when a request is recorded and when
    metrics are read, so reported figures never include expired data.
    """

    def __init__(self, window_minutes: int = 60):
        self.window_minutes = window_minutes
        self.requests: deque = deque()  # (timestamp, success) pairs
        self.errors: deque = deque()  # timestamps of failed requests
        self.latencies: deque = deque()  # (timestamp, latency_ms) pairs

    def _prune(self, now: float) -> None:
        """Drop every entry older than the retention window relative to ``now``."""
        cutoff = now - (self.window_minutes * 60)
        while self.requests and self.requests[0][0] < cutoff:
            self.requests.popleft()
        while self.latencies and self.latencies[0][0] < cutoff:
            self.latencies.popleft()
        while self.errors and self.errors[0] < cutoff:
            self.errors.popleft()

    def record_request(self, latency_ms: float, success: bool):
        """Record one API request.

        Args:
            latency_ms: Observed request duration in milliseconds.
            success: Whether the request counted as successful.
        """
        now = time.time()
        self.requests.append((now, success))
        self.latencies.append((now, latency_ms))
        if not success:
            self.errors.append(now)
        self._prune(now)

    def get_metrics(self, window_minutes: Optional[float] = None) -> Dict[str, Any]:
        """Summarize the requests that fall inside the reporting window.

        Args:
            window_minutes: Optional narrower reporting window. It is clamped to
                the retention window, because older entries are already gone.
                ``None`` reports on the full retention window.

        Returns:
            Dict with ``total_requests``, ``successful_requests``, ``error_count``,
            ``success_rate`` (0.0 when there were no requests),
            ``average_latency_ms`` (0.0 when there were no requests) and the
            effective ``window_minutes``.
        """
        now = time.time()
        # Prune on read as well. Otherwise an API that stops receiving traffic
        # would keep reporting its last window of requests forever.
        self._prune(now)

        window = (
            self.window_minutes
            if window_minutes is None
            else min(window_minutes, self.window_minutes)
        )
        cutoff = now - (window * 60)

        outcomes = [success for ts, success in self.requests if ts >= cutoff]
        latencies = [latency for ts, latency in self.latencies if ts >= cutoff]
        total_requests = len(outcomes)
        successful_requests = sum(1 for success in outcomes if success)
        error_count = sum(1 for ts in self.errors if ts >= cutoff)
        avg_latency = sum(latencies) / len(latencies) if latencies else 0.0

        return {
            "total_requests": total_requests,
            "successful_requests": successful_requests,
            "error_count": error_count,
            "success_rate": successful_requests / total_requests if total_requests > 0 else 0.0,
            "average_latency_ms": avg_latency,
            "window_minutes": window,
        }


class IntelligentExternalAPITool(IntelligentToolBase):
    """Secure external API integration with monitoring and compliance."""

    def __init__(self, project_path: str, security_manager: Optional[Any] = None):
        super().__init__(project_path, security_manager)
        # One sliding-window metrics object per api_name, created on first record.
        self.metrics: Dict[str, APIMetrics] = defaultdict(APIMetrics)
        self.session: Optional[aiohttp.ClientSession] = None
        # api_name -> {"failures": int, "last_failure": float}
        self._circuit_breaker_state: Dict[str, Dict[str, float]] = {}

    async def _ensure_session(self):
        """Create the shared aiohttp session if it is missing or has been closed."""
        if self.session is None or self.session.closed:
            timeout = ClientTimeout(total=Config.MCP_API_TIMEOUT_MS / 1000)
            self.session = aiohttp.ClientSession(timeout=timeout)

    async def _execute_core_functionality(
        self, context: IntelligentToolContext, arguments: Dict[str, Any]
    ) -> Any:
        """Dispatch on ``arguments["operation"]``.

        Supported operations are ``call`` (the default), ``monitor`` and
        ``validate_compliance``. Any other value returns an error dict.
        """
        operation = arguments.get("operation", "call")
        if operation == "call":
            return await self._api_call_secure(arguments)
        elif operation == "monitor":
            return await self._api_monitor_metrics(arguments)
        elif operation == "validate_compliance":
            return await self._api_validate_compliance(arguments)
        else:
            return {"success": False, "error": f"Unknown API operation: {operation}"}

    @staticmethod
    def _join_url(base_url: str, endpoint: str) -> str:
        """Append ``endpoint`` to ``base_url`` without losing the base path.

        A plain ``urljoin("http://h/api/v1", "/users")`` yields ``http://h/users``.
        ``urljoin("http://h/api/v1", "users")`` yields ``http://h/api/users``.
        This happens because urljoin treats the last base segment as a file
        name and a leading slash as host-absolute. Normalizing both sides gives
        ``http://h/api/v1/users``. An absolute ``endpoint`` URL is still
        returned unchanged by ``urljoin``.
        """
        return urljoin(base_url.rstrip("/") + "/", endpoint.lstrip("/"))

    async def _api_call_secure(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Make an authenticated API call with retries, metrics and a circuit breaker.

        Reads these ``arguments`` keys:

        - ``api_name`` and ``endpoint``: required.
        - ``method``: defaults to ``"GET"``.
        - ``headers``, ``data`` and ``auth``: optional; see ``_build_auth_headers``
          for ``auth``.

        Returns:
            On an HTTP status below 400: ``{"success": True, "status_code",
            "data", "latency_ms"}``. ``data`` is parsed JSON when the body
            parses, otherwise the raw text, or ``None`` for an empty body.
            Otherwise: ``{"success": False, "error", ...}``.
        """
        api_name = arguments.get("api_name")
        endpoint = arguments.get("endpoint")
        method = arguments.get("method", "GET")
        # Copy so that adding auth headers never mutates the caller's dict.
        # `or {}` also tolerates an explicit None.
        headers = dict(arguments.get("headers") or {})
        data = arguments.get("data")
        auth_config = arguments.get("auth") or {}

        if not api_name or not endpoint:
            return {"success": False, "error": "api_name and endpoint are required"}

        # Check circuit breaker
        if self._is_circuit_open(api_name):
            return {"success": False, "error": f"Circuit breaker open for {api_name}"}

        await self._ensure_session()

        # Build URL and auth
        url = self._join_url(self._get_api_base_url(api_name), endpoint)
        headers.update(self._build_auth_headers(auth_config))

        # Prepare request: dicts are sent as JSON, anything else as a raw body.
        request_kwargs: Dict[str, Any] = {"method": method, "url": url, "headers": headers}
        if data:
            if isinstance(data, dict):
                request_kwargs["json"] = data
            else:
                request_kwargs["data"] = data

        # NOTE(review): every status >= 400 and every exception is retried,
        # including 4xx responses and non-idempotent methods. Confirm this is intended.
        max_retries = Config.MCP_MAX_RETRIES
        for attempt in range(max_retries + 1):
            start_time = time.time()
            try:
                async with self.session.request(**request_kwargs) as response:
                    response_data = await response.text()
                    latency_ms = (time.time() - start_time) * 1000

                    # Record metrics
                    success = response.status < 400
                    self.metrics[api_name].record_request(latency_ms, success)

                    if success:
                        self._reset_circuit_breaker(api_name)
                        try:
                            return {
                                "success": True,
                                "status_code": response.status,
                                "data": json.loads(response_data) if response_data else None,
                                "latency_ms": latency_ms,
                            }
                        except json.JSONDecodeError:
                            return {
                                "success": True,
                                "status_code": response.status,
                                "data": response_data,
                                "latency_ms": latency_ms,
                            }
                    else:
                        if attempt == max_retries:
                            # The breaker counts exhausted calls, not individual attempts.
                            self._record_circuit_failure(api_name)
                            return {
                                "success": False,
                                "status_code": response.status,
                                "error": response_data,
                                "latency_ms": latency_ms,
                            }
            except Exception as e:
                latency_ms = (time.time() - start_time) * 1000
                self.metrics[api_name].record_request(latency_ms, False)
                if attempt == max_retries:
                    self._record_circuit_failure(api_name)
                    return {"success": False, "error": str(e), "latency_ms": latency_ms}

            # Exponential backoff: 0.1 s, 0.2 s, 0.4 s, ...
            if attempt < max_retries:
                await asyncio.sleep(2**attempt * 0.1)

        # Reached only when max_retries is negative, so the loop never runs.
        return {"success": False, "error": "Max retries exceeded"}

    async def _api_monitor_metrics(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Return windowed metrics for one API (``api_name``) or for all known APIs.

        ``window_minutes`` optionally narrows the reporting window. It must be a
        positive number and is clamped to each API's retention window.
        """
        api_name = arguments.get("api_name")
        window_minutes = arguments.get("window_minutes")
        if window_minutes is not None and (
            isinstance(window_minutes, bool)
            or not isinstance(window_minutes, (int, float))
            or window_minutes <= 0
        ):
            return {"success": False, "error": "window_minutes must be a positive number"}

        if api_name:
            # Use .get() rather than indexing the defaultdict. Querying an
            # unknown API must not register an empty entry that would later
            # show up in the all-APIs listing.
            metric_obj = self.metrics.get(api_name)
            if metric_obj is None:
                metric_obj = APIMetrics()
            metrics = metric_obj.get_metrics(window_minutes)
            metrics["api_name"] = api_name
            return {"success": True, "metrics": metrics}

        # All APIs
        all_metrics = {
            name: metric_obj.get_metrics(window_minutes)
            for name, metric_obj in self.metrics.items()
        }
        return {"success": True, "metrics": all_metrics}

    async def _api_validate_compliance(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Validate API compliance with GDPR/HIPAA using real deterministic analysis.

        Combines findings from the OpenAPI spec (if ``openapi_ref``), the
        payload (if ``payload_ref``) and endpoint/method rules. It then
        derives remediation suggestions and validates the result's shape with
        ``validate_compliance_output`` before returning it.
        """
        from utils.input_normalizer import ValidationError, normalize_api_input
        from utils.placeholder_guard import validate_compliance_output

        try:
            # Normalize input to handle various field naming conventions
            normalized = normalize_api_input(arguments)
        except (ValidationError, ValueError) as e:
            # ValidationError is caught explicitly in case it does not subclass
            # ValueError (its definition is not visible from this module).
            return {"ok": False, "error": {"code": "ValidationError", "message": str(e)}}

        endpoint = normalized["endpoint"]
        method = normalized["method"]
        api_name = normalized["api_name"]
        policies = normalized["policies"]
        payload_ref = normalized["payload_ref"]
        openapi_ref = normalized["openapi_ref"]

        violations: List[Dict[str, Any]] = []
        evidence = {
            "endpoint": endpoint,
            "method": method,
            "auth": "None",  # Will be determined from analysis
            "transport": "Unknown",
        }

        # 1. Analyze OpenAPI specification if provided
        if openapi_ref:
            openapi_violations, openapi_evidence = await self._analyze_openapi_compliance(
                openapi_ref, endpoint, method, policies
            )
            violations.extend(openapi_violations)
            evidence.update(openapi_evidence)

        # 2. Analyze payload for PII/PHI if provided
        if payload_ref:
            violations.extend(await self._analyze_payload_compliance(payload_ref, policies))

        # 3. Apply deterministic compliance rules
        violations.extend(await self._apply_compliance_rules(endpoint, method, policies))

        # 4. Generate specific remediation suggestions
        remediation_suggestions = self._generate_remediation_suggestions(violations, policies)

        # 5. Validate output structure
        result = {
            "ok": True,
            "violations": violations,
            "suggestedRemediations": remediation_suggestions,
            "evidence": evidence,
            "policies_checked": policies,
            "api_name": api_name,
        }
        validate_compliance_output(result)
        return result

    async def _analyze_openapi_compliance(
        self, openapi_ref: str, endpoint: str, method: str, policies: List[str]
    ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
        """Analyze OpenAPI specification for compliance violations.

        Returns ``(violations, evidence)``. The spec is not actually fetched or
        parsed yet. Only the source kind (``remote`` for refs starting with
        ``http``, else ``file``) is derived from ``openapi_ref``.
        """
        violations = []
        evidence = {}

        try:
            # For file references, read and parse OpenAPI spec
            if openapi_ref.startswith("http"):
                # Remote OpenAPI spec - would fetch and parse
                evidence["openapi_source"] = "remote"
            else:
                # Local file reference
                evidence["openapi_source"] = "file"

            # Parse spec and find operation
            # Simplified - in real implementation would use openapi-spec-validator
            evidence["auth"] = "Bearer"  # Example - would extract from spec
            evidence["transport"] = "HTTPS"

            # NOTE(review): this violation is appended unconditionally and
            # contradicts the placeholder "Bearer" evidence above. Replace both
            # once the spec is really parsed.
            violations.append(
                {
                    "policy": "Security",
                    "field": "auth",
                    "issue": "No authentication scheme specified in OpenAPI spec",
                    "severity": "high",
                    "where": "specification",
                }
            )

        except Exception:
            evidence["openapi_source"] = "error"
            violations.append(
                {
                    "policy": "Technical",
                    "field": "openapi_ref",
                    "issue": "Could not parse OpenAPI specification",
                    "severity": "medium",
                    "where": "specification",
                }
            )

        return violations, evidence

    async def _analyze_payload_compliance(
        self, payload_ref: Dict[str, Any], policies: List[str]
    ) -> List[Dict[str, Any]]:
        """Analyze payload for PII/PHI violations.

        Only ``{"type": "inline", "value": ...}`` payloads are inspected. A
        string ``value`` is parsed as JSON. Other reference types are treated
        as empty. Any parse error becomes a single "Technical" violation
        instead of propagating.
        """
        violations = []

        try:
            # Extract payload content
            if payload_ref.get("type") == "inline":
                payload_data = payload_ref.get("value", "")
                if isinstance(payload_data, str):
                    payload_json = json.loads(payload_data)
                else:
                    payload_json = payload_data
            else:
                # File reference - would read file
                payload_json = {}

            # Flatten payload fields for analysis
            fields = self._flatten_json_fields(payload_json)

            # Apply policy-specific checks
            for policy in policies:
                if policy.lower() == "gdpr":
                    violations.extend(self._check_gdpr_fields(fields))
                elif policy.lower() == "hipaa":
                    violations.extend(self._check_hipaa_fields(fields))

        except Exception as e:
            violations.append(
                {
                    "policy": "Technical",
                    "field": "payload",
                    "issue": f"Could not parse payload: {str(e)}",
                    "severity": "medium",
                    "where": "payload",
                }
            )

        return violations

    def _flatten_json_fields(
        self, data: Union[Dict[str, Any], List[Any]], prefix: str = ""
    ) -> List[str]:
        """Collect the lower-cased, dot-joined key paths of a JSON document.

        Dict keys become ``prefix.key``. List elements share their parent's
        prefix, with no index. Every element of a list is scanned, not just
        the first, so sensitive keys that appear only in later records are
        still found. Paths contributed by several list elements are reported
        once, in first-seen order. Scalars contribute no fields.
        """
        fields: List[str] = []
        if isinstance(data, dict):
            for key, value in data.items():
                field_name = f"{prefix}.{key}" if prefix else key
                fields.append(field_name.lower())
                if isinstance(value, (dict, list)):
                    fields.extend(self._flatten_json_fields(value, field_name))
        elif isinstance(data, list):
            for item in data:
                if isinstance(item, (dict, list)):
                    fields.extend(self._flatten_json_fields(item, prefix))
            fields = list(dict.fromkeys(fields))
        return fields

    def _check_gdpr_fields(self, fields: List[str]) -> List[Dict[str, Any]]:
        """Flag each field whose path contains a PII substring (one violation per field)."""
        violations = []

        # GDPR PII identifiers (matched as substrings of the field path)
        pii_patterns = [
            "email",
            "phone",
            "address",
            "name",
            "firstname",
            "lastname",
            "ssn",
            "nationalid",
            "passport",
            "dob",
            "birthdate",
            "age",
        ]

        for field in fields:
            for pattern in pii_patterns:
                if pattern in field:
                    violations.append(
                        {
                            "policy": "GDPR",
                            "field": field,
                            "issue": f"PII field '{field}' detected - requires consent management",
                            "severity": "high",
                            "where": "payload",
                        }
                    )
                    break

        return violations

    def _check_hipaa_fields(self, fields: List[str]) -> List[Dict[str, Any]]:
        """Flag PHI fields (substring match) and identifier combinations that form PHI."""
        violations = []

        # HIPAA PHI identifiers (matched as substrings of the field path)
        phi_patterns = [
            "ssn",
            "mrn",
            "medical",
            "health",
            "diagnosis",
            "treatment",
            "insurance",
            "provider",
            "patient",
            "lab",
            "test",
            "result",
        ]

        # Combinations that are PHI when they appear anywhere in the same payload
        sensitive_combos = [["name", "dob"], ["name", "address"], ["name", "phone"]]

        for field in fields:
            for pattern in phi_patterns:
                if pattern in field:
                    violations.append(
                        {
                            "policy": "HIPAA",
                            "field": field,
                            "issue": f"PHI field '{field}' detected - requires encryption and audit logging",
                            "severity": "high",
                            "where": "payload",
                        }
                    )
                    break

        # Check for sensitive combinations
        for combo in sensitive_combos:
            if all(any(pattern in field for field in fields) for pattern in combo):
                violations.append(
                    {
                        "policy": "HIPAA",
                        "field": "+".join(combo),
                        "issue": f"Combination of {', '.join(combo)} fields creates PHI - requires special handling",
                        "severity": "high",
                        "where": "payload",
                    }
                )

        return violations

    async def _apply_compliance_rules(
        self, endpoint: str, method: str, policies: List[str]
    ) -> List[Dict[str, Any]]:
        """Apply deterministic compliance rules based on endpoint and method."""
        violations = []

        # Transport security checks.
        # NOTE(review): any endpoint not starting with "https://" is flagged,
        # including relative paths such as "/users". Confirm that callers pass
        # absolute URLs here.
        if not endpoint.startswith("https://"):
            violations.append(
                {
                    "policy": "Transport",
                    "field": "endpoint",
                    "issue": "Endpoint does not use HTTPS - unencrypted transport",
                    "severity": "high",
                    "where": "transport",
                }
            )

        # Method-specific checks
        if method.upper() in ["POST", "PUT", "PATCH"]:
            violations.append(
                {
                    "policy": "Audit",
                    "field": "method",
                    "issue": f"{method} operations require audit logging",
                    "severity": "medium",
                    "where": "operation",
                }
            )

        # Policy-specific endpoint checks
        for policy in policies:
            if policy.lower() == "gdpr":
                if "user" in endpoint.lower() or "profile" in endpoint.lower():
                    violations.append(
                        {
                            "policy": "GDPR",
                            "field": "endpoint",
                            "issue": "User data endpoint requires consent mechanism",
                            "severity": "high",
                            "where": "endpoint",
                        }
                    )
            elif policy.lower() == "hipaa":
                if any(term in endpoint.lower() for term in ["patient", "medical", "health"]):
                    violations.append(
                        {
                            "policy": "HIPAA",
                            "field": "endpoint",
                            "issue": "Healthcare endpoint requires BAA and access controls",
                            "severity": "high",
                            "where": "endpoint",
                        }
                    )

        return violations

    def _generate_remediation_suggestions(
        self, violations: List[Dict[str, Any]], policies: List[str]
    ) -> List[str]:
        """Map violations to de-duplicated remediation suggestions.

        Suggestions are chosen per distinct ``(policy, where)`` pair, in the
        order each pair first appears. The ``policies`` argument is currently
        unused.
        """
        # Distinct (policy, where) pairs in first-seen order. Tuple keys avoid a
        # string join/split round-trip that would mis-split names containing "_".
        groups = dict.fromkeys(
            (violation.get("policy", "Unknown"), violation.get("where", "general"))
            for violation in violations
        )

        suggestions: List[str] = []
        for policy, issue_type in groups:
            if policy == "GDPR":
                if issue_type == "payload":
                    suggestions.append(
                        "Implement consent management system for PII data collection"
                    )
                    suggestions.append("Add data retention policies and automated deletion")
                elif issue_type == "endpoint":
                    suggestions.append("Add explicit user consent flows for profile operations")

            elif policy == "HIPAA":
                if issue_type == "payload":
                    suggestions.append("Encrypt PHI fields at rest using AES-256")
                    suggestions.append("Implement comprehensive audit logging for PHI access")
                elif issue_type == "endpoint":
                    suggestions.append("Establish Business Associate Agreement (BAA)")
                    suggestions.append("Implement role-based access controls for healthcare data")

            elif policy == "Transport":
                suggestions.append("Migrate endpoint to HTTPS with TLS 1.3")
                suggestions.append("Implement certificate pinning for enhanced security")

            elif policy == "Security":
                suggestions.append("Add OAuth 2.0 or API key authentication")
                suggestions.append("Implement rate limiting and request validation")

        # Remove duplicates while preserving order
        return list(dict.fromkeys(suggestions))

    def _get_api_base_url(self, api_name: str) -> str:
        """Resolve the base URL for ``api_name``.

        Checks the environment variable ``<API_NAME>_BASE_URL`` first, then
        the same name with hyphens replaced by underscores. Falls back to a
        localhost development default. An environment variable that is set
        but empty is returned as-is.
        """
        env_name = f"{api_name.upper()}_BASE_URL"
        # Names like "vital-trail" produce "VITAL-TRAIL_BASE_URL", which most
        # shells cannot export, so the underscore spelling is accepted too.
        for candidate in dict.fromkeys((env_name, env_name.replace("-", "_"))):
            base_url = os.getenv(candidate)
            if base_url is not None:
                return base_url

        # Development defaults for common APIs
        dev_defaults = {
            "vital-trail": "http://localhost:8080/api/v1",
            "test": "http://localhost:3000/api",
            "demo": "http://localhost:8000/api/v1",
        }

        return dev_defaults.get(api_name.lower(), "http://localhost:8080/api/v1")

    def _build_auth_headers(self, auth_config: Dict[str, Any]) -> Dict[str, str]:
        """Build authentication headers from ``auth_config``.

        ``type`` selects the scheme:

        - ``bearer``: uses ``token``.
        - ``api_key``: uses ``key`` and ``header``, default ``X-API-Key``.
        - ``basic``: uses ``username`` and ``password``.

        Missing credentials or any other type produce no headers.
        """
        headers = {}
        auth_type = auth_config.get("type", "none")

        if auth_type == "bearer":
            token = auth_config.get("token")
            if token:
                headers["Authorization"] = f"Bearer {token}"
        elif auth_type == "api_key":
            key = auth_config.get("key")
            header_name = auth_config.get("header", "X-API-Key")
            if key:
                headers[header_name] = key
        elif auth_type == "basic":
            username = auth_config.get("username")
            password = auth_config.get("password")
            if username and password:
                import base64

                auth_string = base64.b64encode(f"{username}:{password}".encode()).decode()
                headers["Authorization"] = f"Basic {auth_string}"

        return headers

    def _is_circuit_open(self, api_name: str) -> bool:
        """Return True while ``api_name`` has at least 5 recorded failures within 30 s of the last.

        Once 30 s have passed since the last failure, the state is reset and
        the circuit closes again.
        """
        state = self._circuit_breaker_state.get(api_name, {"failures": 0, "last_failure": 0})
        threshold = 5  # Default threshold
        timeout = 30  # Default timeout in seconds

        if state["failures"] >= threshold:
            if time.time() - state["last_failure"] < timeout:
                return True
            else:
                # Reset after timeout
                self._reset_circuit_breaker(api_name)

        return False

    def _record_circuit_failure(self, api_name: str) -> None:
        """Increment the failure count for ``api_name`` and timestamp it."""
        if api_name not in self._circuit_breaker_state:
            self._circuit_breaker_state[api_name] = {"failures": 0, "last_failure": 0}

        self._circuit_breaker_state[api_name]["failures"] += 1
        self._circuit_breaker_state[api_name]["last_failure"] = time.time()

    def _reset_circuit_breaker(self, api_name: str) -> None:
        """Zero the breaker state for ``api_name`` if it has one."""
        if api_name in self._circuit_breaker_state:
            self._circuit_breaker_state[api_name] = {"failures": 0, "last_failure": 0}

    def _check_gdpr_compliance(self, api_name: str) -> Tuple[List[str], List[str]]:
        """Return ``(issues, recommendations)`` for GDPR.

        Personal-data findings are added when ``api_name`` contains a
        personal-data keyword. Baseline items are always added.
        """
        issues = []
        recommendations = []

        # Check if API name suggests personal data handling
        data_processing_keywords = ["user", "customer", "person", "profile", "account", "patient"]
        if any(keyword in api_name.lower() for keyword in data_processing_keywords):
            issues.append("API appears to process personal data - GDPR compliance required")
            recommendations.append("Implement explicit consent mechanisms for data processing")
            recommendations.append(
                "Ensure data subject rights are supported (access, rectification, erasure)"
            )

        # Default GDPR considerations for any API
        issues.append("Data processing legal basis must be established")
        recommendations.append("Document lawful basis for processing under GDPR Article 6")

        issues.append("Data retention policy needs specification")
        recommendations.append("Define and document data retention periods with automatic deletion")

        return issues, recommendations

    def _check_hipaa_compliance(self, api_name: str) -> Tuple[List[str], List[str]]:
        """Return ``(issues, recommendations)`` for HIPAA.

        The result depends on whether ``api_name`` contains a healthcare
        keyword.
        """
        issues = []
        recommendations = []

        # Check if API name suggests healthcare data
        healthcare_keywords = [
            "patient",
            "medical",
            "health",
            "clinical",
            "diagnosis",
            "treatment",
            "phi",
        ]
        is_healthcare_api = any(keyword in api_name.lower() for keyword in healthcare_keywords)

        if is_healthcare_api:
            issues.append("Healthcare API detected - PHI protection required under HIPAA")
            recommendations.append("Implement encryption at rest and in transit for all PHI")
            recommendations.append("Ensure access controls and audit logging for PHI access")

            issues.append("PHI transmission security must be verified")
            recommendations.append("Use TLS 1.2+ for all PHI transmissions")

            issues.append("Business Associate Agreements (BAAs) may be required")
            recommendations.append(
                "Establish BAAs with any third-party service providers handling PHI"
            )
        else:
            # Even non-healthcare APIs might inadvertently handle health data
            issues.append("Verify API does not inadvertently collect health information")
            recommendations.append(
                "Screen data fields for potential PHI (SSN, DOB+name combinations)"
            )

        return issues, recommendations

    async def close(self) -> None:
        """Close the HTTP session if one is open."""
        if self.session and not self.session.closed:
            await self.session.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/normaltusker/kotlin-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.