mcp_server.py
import sys
import logging
import json
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
import datetime
from fastmcp import FastMCP
import os

# Kafka imports
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType, NewTopic
from kafka import KafkaConsumer, TopicPartition
from kafka.errors import KafkaError, NoBrokersAvailable, KafkaTimeoutError, TopicAlreadyExistsError

# mail imports
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import time
import uuid

# Temporal memory imports
from temporal_memory import TemporalMemoryStore, TemporalEvent, TemporalAnalyzer

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Initialize MCP server
mcp = FastMCP("Kafka MCP Server")

# Temporal Memory Configuration
TEMPORAL_RETENTION_HOURS = int(os.getenv('KAFKAIQ_TEMPORAL_RETENTION_HOURS', '24'))
TEMPORAL_MAX_EVENTS = int(os.getenv('KAFKAIQ_TEMPORAL_MAX_EVENTS', '100000'))
TEMPORAL_AUTO_PERSIST = os.getenv('KAFKAIQ_TEMPORAL_AUTO_PERSIST', 'true').lower() == 'true'
TEMPORAL_PERSIST_PATH = os.getenv('KAFKAIQ_TEMPORAL_PERSIST_PATH', 'temporal_state.json')


class KafkaInspector:
    """
    Holds broker details (object-level) and exposes single-responsibility utilities.
    """

    def __init__(
        self,
        bootstrap_servers: Union[str, List[str]],
        client_id: str = "kafka-inspector",
        request_timeout_ms: int = 45000,  # keep > session_timeout_ms
        api_version_auto_timeout_ms: int = 8000,
    ):
        self.bootstrap_servers = bootstrap_servers
        self.client_id = client_id
        self.request_timeout_ms = request_timeout_ms
        self.api_version_auto_timeout_ms = api_version_auto_timeout_ms
        self._admin = KafkaAdminClient(
            bootstrap_servers=self.bootstrap_servers,
            client_id=self.client_id,
            request_timeout_ms=self.request_timeout_ms,
            api_version_auto_timeout_ms=self.api_version_auto_timeout_ms,
        )

    def get_topic_config(self, topic_name: str) -> Dict[str, str]:
        """
        Get configuration for a specific topic.
        """
        try:
            # Create ConfigResource for TOPIC type
            resource = ConfigResource(ConfigResourceType.TOPIC, topic_name)

            # Get configurations
            configs_result = self._admin.describe_configs([resource])

            # Extract configuration values
            config_dict: Dict[str, str] = {}
            if resource in configs_result:
                config_entries = configs_result[resource]
                for config_name, config_entry in config_entries.items():
                    # Handle different kafka-python versions
                    if hasattr(config_entry, "value"):
                        config_dict[config_name] = config_entry.value
                    else:
                        # Fallback for older versions
                        config_dict[config_name] = str(config_entry)
            return config_dict
        except Exception as e:
            logger.error(f"[GET_TOPIC_CONFIG] failed for topic {topic_name}: {e}")
            # Check if topic exists first
            try:
                topics = self.list_topics(include_internal=True)
                if topic_name not in topics:
                    logger.error(f"Topic '{topic_name}' does not exist")
                    return {"error": f"Topic '{topic_name}' does not exist"}
            except Exception:
                pass
            return {"error": f"Failed to retrieve config: {str(e)}"}

    def list_topics(self, include_internal: bool = False) -> List[str]:
        """
        Return topic names in the cluster. By default skips internal topics (e.g., __consumer_offsets).
""" try: topics_meta = self._admin.describe_topics() # all topics names: List[str] = [] for t in topics_meta: if not include_internal and t.get("is_internal", False): continue names.append(t["topic"]) return sorted(names) except KafkaError as e: logger.error(f"[LIST_TOPICS] failed: {e}") return [] def describe_topic(self, topic_name: str) -> Dict[str, Any]: """ Get detailed information about a specific topic. """ try: topics_meta = self._admin.describe_topics([topic_name]) if topics_meta: return topics_meta[0] return {} except KafkaError as e: logger.error(f"[DESCRIBE_TOPIC] failed for topic {topic_name}: {e}") return {} def create_topic( self, topic_name: str, num_partitions: int = 1, replication_factor: int = 1, topic_configs: Optional[Dict[str, str]] = None ) -> bool: """ Create a new topic. """ try: topic = NewTopic( name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor, topic_configs=topic_configs or {} ) result = self._admin.create_topics([topic]) # Wait for the operation to complete for topic_name, future in result.items(): try: future.result() # The result itself is None logger.info(f"Topic {topic_name} created successfully") return True except TopicAlreadyExistsError: logger.warning(f"Topic {topic_name} already exists") return False except Exception as e: logger.error(f"Failed to create topic {topic_name}: {e}") return False except KafkaError as e: logger.error(f"[CREATE_TOPIC] failed: {e}") return False def delete_topic(self, topic_name: str) -> bool: """ Delete a topic. """ try: result = self._admin.delete_topics([topic_name]) for topic_name, future in result.items(): try: future.result() logger.info(f"Topic {topic_name} deleted successfully") return True except Exception as e: logger.error(f"Failed to delete topic {topic_name}: {e}") return False except KafkaError as e: logger.error(f"[DELETE_TOPIC] failed: {e}") return False def get_topic_config(self, topic_name: str) -> Dict[str, str]: """ Get configuration for a specific topic. """ try: resource = ConfigResource(ConfigResourceType.TOPIC, topic_name) configs = self._admin.describe_configs([resource]) if resource in configs: return {config.name: config.value for config in configs[resource].values()} return {} except KafkaError as e: logger.error(f"[GET_TOPIC_CONFIG] failed for topic {topic_name}: {e}") return {} def get_cluster_details(self) -> Dict[str, Any]: """ Get comprehensive cluster health and metadata information. 
""" try: import time # Get cluster metadata metadata = self._admin._client.cluster cluster_id = getattr(metadata, 'cluster_id', 'unknown') # Get broker information brokers = list(metadata.brokers()) broker_count = len(brokers) # Get topic information topics = self.list_topics(include_internal=False) topics_count = len(topics) # Check partition health offline_partition_count = 0 under_replicated_count = 0 total_partitions = 0 try: # Get detailed topic metadata for health check all_topics_meta = self._admin.describe_topics() for topic_meta in all_topics_meta: if topic_meta.get("is_internal", False): continue partitions = topic_meta.get("partitions", []) total_partitions += len(partitions) for partition in partitions: # Check if partition is offline (no leader) if partition.get("leader") is None or partition.get("leader") == -1: offline_partition_count += 1 # Check if partition is under-replicated replicas = partition.get("replicas", []) isr = partition.get("isr", []) # in-sync replicas if len(isr) < len(replicas): under_replicated_count += 1 except Exception as e: logger.warning(f"Could not get detailed partition health: {e}") # Determine if single broker setup single_broker = broker_count == 1 return { "cluster_id": cluster_id, "broker_count": broker_count, "brokers": [{"id": b.nodeId, "host": b.host, "port": b.port} for b in brokers], "topics_count": topics_count, "total_partitions": total_partitions, "health": { "offline_partition_count": offline_partition_count, "under_replicated_count": under_replicated_count, "single_broker": single_broker }, "timestamp": int(time.time() * 1000) } except Exception as e: logger.error(f"[GET_CLUSTER_DETAILS] failed: {e}") return {"error": f"Failed to get cluster details: {str(e)}"} def get_consumer_lag(self, group_id: str, topic_name: str, print_output: bool = True) -> Dict[str, Any]: """ Get consumer lag information for a specific group and topic. 
""" consumer = None try: # Get consumer group offsets offsets = self._admin.list_consumer_group_offsets(group_id) # Filter for the specific topic topic_partitions = [tp for tp in offsets.keys() if tp.topic == topic_name] if not topic_partitions: return {"total_lag": 0, "partitions": [], "message": f"No offsets found for topic {topic_name}"} # Get high water marks using a single consumer with proper config consumer = KafkaConsumer( bootstrap_servers=self.bootstrap_servers, client_id=f"{self.client_id}-lag-check", group_id=None, # Don't join a consumer group enable_auto_commit=False, # Don't auto-commit consumer_timeout_ms=5000, # 5 second timeout api_version_auto_timeout_ms=3000, # Quick API version detection request_timeout_ms=10000, # 10 second request timeout metadata_max_age_ms=30000, # Refresh metadata every 30 seconds auto_offset_reset='latest' ) partition_lags = [] total_lag = 0 # Get all high water marks at once for efficiency all_partitions = list(topic_partitions) consumer.assign(all_partitions) # Get end offsets for all partitions at once end_offsets = consumer.end_offsets(all_partitions) for tp in topic_partitions: try: # Get consumer offset consumer_offset = offsets[tp].offset if offsets[tp] else 0 # Get high water mark from end_offsets high_water_mark = end_offsets.get(tp, 0) # Calculate lag lag = max(0, high_water_mark - consumer_offset) total_lag += lag partition_lags.append({ "partition": tp.partition, "consumer_offset": consumer_offset, "high_water_mark": high_water_mark, "lag": lag }) except Exception as e: logger.warning(f"Could not get lag for partition {tp.partition}: {e}") partition_lags.append({ "partition": tp.partition, "consumer_offset": 0, "high_water_mark": 0, "lag": 0, "error": str(e) }) result = { "group_id": group_id, "topic": topic_name, "total_lag": total_lag, "partitions": partition_lags } if print_output: logger.info(f"Consumer lag for group '{group_id}', topic '{topic_name}': {total_lag}") return result except Exception as e: logger.error(f"[GET_CONSUMER_LAG] failed: {e}") return {"error": f"Failed to get consumer lag: {str(e)}", "total_lag": 0} finally: # Always close the consumer if consumer: try: consumer.close() except Exception as e: logger.warning(f"Error closing consumer: {e}") def alert_summary(self) -> dict: """ Auto-discovers cluster signals + lag and returns OK/WARN/CRITICAL without external params. 
        Built-in thresholds (tune here if needed):
          - offline_crit: any offline partition -> CRITICAL
          - urp_warn/urp_crit: under-replicated partitions thresholds
          - lag_warn/lag_crit: per (group, topic) lag thresholds
        """
        # ---- thresholds (edit here if you want different defaults) ----
        offline_crit = 1
        urp_warn, urp_crit = 1, 5
        lag_warn, lag_crit = 10_000, 100_000
        single_broker_warn = True

        def rank(level: str) -> int:
            return {"OK": 0, "WARN": 1, "CRITICAL": 2}[level]

        overall = "OK"
        signals: list[dict] = []

        # ---- cluster health (uses your existing helper) ----
        cluster = self.get_cluster_details()
        if "error" in cluster:
            return {
                "status": "CRITICAL",
                "signals": [{"level": "CRITICAL", "code": "CLUSTER_UNREACHABLE", "message": cluster["error"], "data": {}}],
                "summary": {"cluster_id": None},
                "timestamp": int(time.time() * 1000),
            }

        cid = cluster.get("cluster_id")
        brokers = cluster.get("broker_count", 0)
        topics_count = cluster.get("topics_count", 0)
        urp = cluster.get("health", {}).get("under_replicated_count", 0)
        offline = cluster.get("health", {}).get("offline_partition_count", 0)
        single_broker = cluster.get("health", {}).get("single_broker", False)

        # offline partitions -> critical
        if offline >= offline_crit:
            signals.append({
                "level": "CRITICAL",
                "code": "OFFLINE_PARTITIONS",
                "message": f"{offline} partition(s) offline (no leader)",
                "data": {"offline_partitions": offline},
            })
            overall = "CRITICAL"

        # URP
        if urp >= urp_crit:
            lvl = "CRITICAL"
        elif urp >= urp_warn:
            lvl = "WARN"
        else:
            lvl = "OK"
        if lvl != "OK":
            signals.append({
                "level": lvl,
                "code": "UNDER_REPLICATED",
                "message": f"{urp} partition(s) under-replicated",
                "data": {"under_replicated": urp},
            })
            if rank(lvl) > rank(overall):
                overall = lvl

        # single broker (optional warning)
        if single_broker_warn and single_broker:
            signals.append({
                "level": "WARN",
                "code": "SINGLE_BROKER",
                "message": "Cluster has a single broker (no replication/failover)",
                "data": {"broker_count": brokers},
            })
            if rank("WARN") > rank(overall):
                overall = "WARN"

        # ---- auto-discover groups & topics they actually consume ----
        lag_summaries: list[dict] = []
        try:
            groups = self._admin.list_consumer_groups()  # [(group_id, protocol_type)]
            group_ids = [g[0] for g in groups]
            logger.info(f"Found {len(group_ids)} consumer groups: {group_ids}")

            for gid in group_ids:
                try:
                    # Add timeout protection for each group
                    offsets = self._admin.list_consumer_group_offsets(gid)  # {TopicPartition: OffsetAndMetadata}
                    topics_for_group = sorted({tp.topic for tp in offsets.keys()})
                    logger.info(f"Group '{gid}' consumes topics: {topics_for_group}")

                    for topic in topics_for_group:
                        try:
                            lag = self.get_consumer_lag(gid, topic, print_output=False)
                            # Handle error case
                            if "error" in lag:
                                logger.warning(f"Failed to get lag for group='{gid}', topic='{topic}': {lag['error']}")
                                total = 0
                            else:
                                total = int(lag.get("total_lag", 0))

                            if total >= lag_crit:
                                lvl = "CRITICAL"
                            elif total >= lag_warn:
                                lvl = "WARN"
                            else:
                                lvl = "OK"

                            lag_summaries.append({
                                "group": gid,
                                "topic": topic,
                                "total_lag": total,
                                "level": lvl,
                            })
                            if lvl != "OK":
                                signals.append({
                                    "level": lvl,
                                    "code": "CONSUMER_LAG",
                                    "message": f"High lag group='{gid}', topic='{topic}': {total}",
                                    "data": {"group": gid, "topic": topic, "total_lag": total},
                                })
                                if rank(lvl) > rank(overall):
                                    overall = lvl
                        except Exception as topic_e:
                            logger.warning(f"Failed to process lag for group='{gid}', topic='{topic}': {topic_e}")
                            lag_summaries.append({
                                "group": gid,
                                "topic": topic,
                                "total_lag": 0,
                                "level": "UNKNOWN",
                                "error": str(topic_e)
                            })
                except Exception as group_e:
                    logger.warning(f"Failed to process consumer group '{gid}': {group_e}")
                    signals.append({
                        "level": "WARN",
                        "code": "GROUP_PROCESSING_FAILED",
                        "message": f"Could not process consumer group '{gid}': {group_e}",
                        "data": {"group": gid},
                    })
                    if rank("WARN") > rank(overall):
                        overall = "WARN"
        except KafkaError as e:
            signals.append({
                "level": "WARN",
                "code": "LAG_DISCOVERY_FAILED",
                "message": f"Could not enumerate groups/offsets: {e}",
                "data": {},
            })
            if rank("WARN") > rank(overall):
                overall = "WARN"
        except Exception as e:
            logger.error(f"Unexpected error in lag discovery: {e}")
            signals.append({
                "level": "WARN",
                "code": "LAG_DISCOVERY_ERROR",
                "message": f"Unexpected error during lag discovery: {e}",
                "data": {},
            })
            if rank("WARN") > rank(overall):
                overall = "WARN"

        return {
            "status": overall,
            "signals": signals,
            "summary": {
                "cluster_id": cid,
                "broker_count": brokers,
                "topics_count": topics_count,
                "under_replicated": urp,
                "offline_partitions": offline,
                "single_broker": single_broker,
                "lag": lag_summaries,
            },
            "timestamp": int(time.time() * 1000),
        }


# Global Kafka inspector instance
kafka_inspector = None

# Global Temporal Memory instances
temporal_store: Optional[TemporalMemoryStore] = None
temporal_analyzer: Optional[TemporalAnalyzer] = None


# --- Temporal Memory Helper Functions ---

def log_temporal_event(
    event_type: str,
    severity: str,
    code: str,
    message: str,
    data: Dict[str, Any],
    metadata: Optional[Dict[str, Any]] = None
) -> None:
    """
    Log an event to temporal memory if store is initialized.

    Args:
        event_type: ALERT | METRIC_SNAPSHOT | ACTION | HEALTH_CHECK
        severity: OK | WARN | CRITICAL | INFO
        code: Event code
        message: Human-readable message
        data: Event-specific data
        metadata: Optional metadata
    """
    global temporal_store
    if temporal_store is None:
        return  # Temporal memory not initialized

    try:
        event = TemporalEvent(
            event_id=str(uuid.uuid4()),
            event_type=event_type,
            timestamp=int(time.time() * 1000),
            severity=severity,
            code=code,
            message=message,
            data=data,
            metadata=metadata or {}
        )
        temporal_store.add_event(event)
    except Exception as e:
        logger.warning(f"Failed to log temporal event: {e}")


# --- MCP Tools ---

@mcp.tool
def initialize_kafka_connection(
    bootstrap_servers: str,
    client_id: str = "kafka-mcp-client",
    request_timeout_ms: int = 45000,
    api_version_auto_timeout_ms: int = 8000
) -> str:
    """
    Initialize connection to Kafka cluster.
    Args:
        bootstrap_servers: Comma-separated list of Kafka broker addresses (e.g., "localhost:9092")
        client_id: Client identifier for this connection
        request_timeout_ms: Request timeout in milliseconds
        api_version_auto_timeout_ms: API version auto-detection timeout in milliseconds

    Returns:
        Status message indicating success or failure
    """
    global kafka_inspector, temporal_store, temporal_analyzer
    try:
        servers = [s.strip() for s in bootstrap_servers.split(',')]
        kafka_inspector = KafkaInspector(
            bootstrap_servers=servers,
            client_id=client_id,
            request_timeout_ms=request_timeout_ms,
            api_version_auto_timeout_ms=api_version_auto_timeout_ms
        )

        # Initialize temporal memory store
        temporal_store = TemporalMemoryStore(
            retention_hours=TEMPORAL_RETENTION_HOURS,
            max_events=TEMPORAL_MAX_EVENTS
        )
        temporal_analyzer = TemporalAnalyzer(temporal_store)

        # Try to import previous state if auto-persist is enabled
        if TEMPORAL_AUTO_PERSIST and Path(TEMPORAL_PERSIST_PATH).exists():
            if temporal_store.import_state(TEMPORAL_PERSIST_PATH):
                logger.info(f"Restored temporal state from {TEMPORAL_PERSIST_PATH}")

        logger.info("Temporal memory initialized with trend tracking enabled")
        return f"Successfully connected to Kafka cluster at {bootstrap_servers} (Temporal memory: ENABLED)"
    except Exception as e:
        return f"Failed to connect to Kafka cluster: {str(e)}"


@mcp.tool
def list_kafka_topics(include_internal: bool = False) -> str:
    """
    List all topics in the Kafka cluster.

    Args:
        include_internal: Whether to include internal topics like __consumer_offsets

    Returns:
        JSON string containing list of topic names
    """
    if kafka_inspector is None:
        return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})

    try:
        topics = kafka_inspector.list_topics(include_internal=include_internal)
        return json.dumps({
            "topics": topics,
            "count": len(topics),
            "include_internal": include_internal
        })
    except Exception as e:
        return json.dumps({"error": f"Failed to list topics: {str(e)}"})


@mcp.tool
def describe_kafka_topic(topic_name: str) -> str:
    """
    Get detailed information about a specific Kafka topic.

    Args:
        topic_name: Name of the topic to describe

    Returns:
        JSON string containing topic metadata
    """
    if kafka_inspector is None:
        return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})

    try:
        topic_info = kafka_inspector.describe_topic(topic_name)
        return json.dumps(topic_info, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Failed to describe topic {topic_name}: {str(e)}"})


@mcp.tool
def create_kafka_topic(
    topic_name: str,
    num_partitions: int = 1,
    replication_factor: int = 1,
    topic_configs: str = "{}"
) -> str:
    """
    Create a new Kafka topic.

    Args:
        topic_name: Name of the topic to create
        num_partitions: Number of partitions for the topic
        replication_factor: Replication factor for the topic
        topic_configs: JSON string of topic configurations

    Returns:
        Status message indicating success or failure
    """
    if kafka_inspector is None:
        return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
    try:
        configs = json.loads(topic_configs) if topic_configs != "{}" else None
        success = kafka_inspector.create_topic(
            topic_name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
            topic_configs=configs
        )

        if success:
            return json.dumps({
                "status": "success",
                "message": f"Topic '{topic_name}' created successfully",
                "topic_name": topic_name,
                "num_partitions": num_partitions,
                "replication_factor": replication_factor
            })
        else:
            return json.dumps({
                "status": "failed",
                "message": f"Failed to create topic '{topic_name}'"
            })
    except Exception as e:
        return json.dumps({"error": f"Failed to create topic: {str(e)}"})


@mcp.tool
def delete_kafka_topic(topic_name: str) -> str:
    """
    Delete a Kafka topic.

    Args:
        topic_name: Name of the topic to delete

    Returns:
        Status message indicating success or failure
    """
    if kafka_inspector is None:
        return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})

    try:
        success = kafka_inspector.delete_topic(topic_name)
        if success:
            return json.dumps({
                "status": "success",
                "message": f"Topic '{topic_name}' deleted successfully"
            })
        else:
            return json.dumps({
                "status": "failed",
                "message": f"Failed to delete topic '{topic_name}'"
            })
    except Exception as e:
        return json.dumps({"error": f"Failed to delete topic: {str(e)}"})


@mcp.tool
def get_kafka_topic_config(topic_name: str) -> str:
    """
    Get configuration settings for a specific Kafka topic.

    Args:
        topic_name: Name of the topic to get configuration for

    Returns:
        JSON string containing topic configuration
    """
    if kafka_inspector is None:
        return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})

    try:
        config = kafka_inspector.get_topic_config(topic_name)

        # Handle empty config case
        if not config:
            return json.dumps({
                "topic_name": topic_name,
                "configuration": {},
                "message": f"No configuration found for topic '{topic_name}' or topic does not exist"
            }, indent=2)

        return json.dumps({
            "topic_name": topic_name,
            "configuration": config,
            "config_count": len(config)
        }, indent=2)
    except Exception as e:
        logger.error(f"Error in get_kafka_topic_config: {str(e)}")
        return json.dumps({
            "error": f"Failed to get topic config: {str(e)}",
            "topic_name": topic_name,
            "suggestion": "Check if topic exists and you have proper permissions"
        })


@mcp.tool
def kafka_health_check() -> str:
    """
    Check the health status of the Kafka connection.

    Returns:
        JSON string containing health status information
    """
    if kafka_inspector is None:
        return json.dumps({
            "status": "disconnected",
            "message": "Kafka connection not initialized"
        })

    try:
        # Try to list topics as a health check
        topics = kafka_inspector.list_topics()
        return json.dumps({
            "status": "healthy",
            "message": "Kafka connection is working",
            "bootstrap_servers": kafka_inspector.bootstrap_servers,
            "client_id": kafka_inspector.client_id,
            "topics_count": len(topics)
        })
    except Exception as e:
        return json.dumps({
            "status": "unhealthy",
            "message": f"Kafka connection error: {str(e)}"
        })


@mcp.tool
def get_cluster_details() -> str:
    """
    Get comprehensive cluster health and metadata information.

    Returns:
        JSON string containing detailed cluster information including broker count, topics, and partition health
    """
    if kafka_inspector is None:
        return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
    try:
        cluster_info = kafka_inspector.get_cluster_details()
        return json.dumps(cluster_info, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Failed to get cluster details: {str(e)}"})


@mcp.tool
def get_consumer_lag(group_id: str, topic_name: str) -> str:
    """
    Get consumer lag information for a specific consumer group and topic.

    Args:
        group_id: Consumer group ID to check
        topic_name: Topic name to analyze lag for

    Returns:
        JSON string containing lag information per partition and total lag
    """
    if kafka_inspector is None:
        return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})

    try:
        lag_info = kafka_inspector.get_consumer_lag(group_id, topic_name, print_output=False)
        return json.dumps(lag_info, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Failed to get consumer lag: {str(e)}"})


@mcp.tool
def kafka_alert_summary() -> str:
    """
    Get comprehensive cluster health alert summary with automatic threshold-based analysis.

    Returns:
        JSON string containing overall status (OK/WARN/CRITICAL), detected issues, and detailed metrics.
        Automatically analyzes:
        - Offline partitions
        - Under-replicated partitions
        - Consumer lag across all groups and topics
        - Single broker warnings
    """
    if kafka_inspector is None:
        return json.dumps({
            "status": "CRITICAL",
            "error": "Kafka connection not initialized. Please call initialize_kafka_connection first."
        })

    try:
        alert_summary = kafka_inspector.alert_summary()

        # Log to temporal memory
        if temporal_store:
            # Log overall health check
            log_temporal_event(
                event_type="HEALTH_CHECK",
                severity=alert_summary.get("status", "OK"),
                code="CLUSTER_HEALTH",
                message=f"Cluster health check: {alert_summary.get('status')}",
                data=alert_summary.get("summary", {})
            )

            # Log each signal as an alert
            for signal in alert_summary.get("signals", []):
                log_temporal_event(
                    event_type="ALERT",
                    severity=signal.get("level", "INFO"),
                    code=signal.get("code", "UNKNOWN"),
                    message=signal.get("message", ""),
                    data=signal.get("data", {})
                )

            # Log consumer lag as metrics
            for lag_info in alert_summary.get("summary", {}).get("lag", []):
                log_temporal_event(
                    event_type="METRIC_SNAPSHOT",
                    severity=lag_info.get("level", "OK"),
                    code=f"CONSUMER_LAG_{lag_info.get('group')}_{lag_info.get('topic')}",
                    message=f"Consumer lag for group '{lag_info.get('group')}' on topic '{lag_info.get('topic')}'",
                    data={"value": lag_info.get("total_lag", 0), "group": lag_info.get("group"), "topic": lag_info.get("topic")}
                )

        return json.dumps(alert_summary, indent=2)
    except Exception as e:
        return json.dumps({
            "status": "CRITICAL",
            "error": f"Failed to generate alert summary: {str(e)}"
        })


@mcp.tool
def get_broker_resources(broker_id: str) -> str:
    """
    Get resource information for a specific Kafka broker.

    Args:
        broker_id: Broker ID or hostname to get resource information for

    Returns:
        JSON string containing broker resource details (RAM, storage, cores)
    """
    if kafka_inspector is None:
        return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})

    try:
        # TODO: Implement actual resource monitoring integration
        # This would typically connect to external systems such as:
        # - Prometheus/Grafana for metrics
        # - JMX for Kafka-specific metrics
        # - System monitoring tools (htop, iostat, etc.)
        # - Cloud provider APIs (AWS CloudWatch, etc.)
        # - Kubernetes metrics if running in K8s

        # For now, return hardcoded realistic values
        broker_resources = {
            "ram_gb": 32,
            "storage_gb": 1000,
            "cores": 8
        }

        return json.dumps({
            "broker_id": broker_id,
            "resources": broker_resources,
            "status": "active",
            "note": "Hardcoded values - integrate with monitoring system"
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "error": f"Failed to get broker resources: {str(e)}",
            "broker_id": broker_id
        })


@mcp.tool
def send_email_notification(recipient_email: str, content: str, subject: str = "KafkaIQ Notification") -> str:
    """
    Send email notification using Gmail SMTP.

    Args:
        recipient_email: Email address to send notification to
        content: Email content/body
        subject: Email subject line (optional)

    Returns:
        JSON string indicating email send status
    """
    try:
        # Validate inputs first
        if not recipient_email or "@" not in recipient_email:
            return json.dumps({
                "status": "failed",
                "error": "Invalid recipient email address"
            })

        if not content.strip():
            return json.dumps({
                "status": "failed",
                "error": "Email content cannot be empty"
            })

        # Gmail SMTP settings
        smtp_server = "smtp.gmail.com"
        smtp_port = 587

        # Email credentials or get from env
        sender_email = os.getenv('KAFKAIQ_GMAIL_USER', 'DEFAULT_EMAIL')
        sender_password = os.getenv('KAFKAIQ_GMAIL_PASSWORD', 'DEFAULT_PASSWORD')

        # Create email message
        message = MIMEMultipart()
        message["From"] = sender_email
        message["To"] = recipient_email
        message["Subject"] = subject

        # Simple text email body
        email_body = f"""KafkaIQ Notification
===================

{content}

---
Sent by KafkaIQ at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""

        message.attach(MIMEText(email_body, "plain"))

        # Send email via Gmail SMTP
        server = smtplib.SMTP(smtp_server, smtp_port)
        server.starttls()  # Enable encryption
        server.login(sender_email, sender_password)

        # Send the email
        text = message.as_string()
        server.sendmail(sender_email, recipient_email, text)
        server.quit()

        return json.dumps({
            "status": "sent",
            "message": "Email sent successfully",
            "recipient": recipient_email,
            "subject": subject,
            "timestamp": datetime.datetime.now().isoformat()
        })
    except smtplib.SMTPAuthenticationError:
        return json.dumps({
            "status": "failed",
            "error": "Gmail authentication failed. Check email/password or enable 2FA and use App Password"
        })
    except Exception as e:
        return json.dumps({
            "status": "failed",
            "error": f"Failed to send email: {str(e)}"
        })


@mcp.tool
def broker_leadership_distribution(include_internal: bool = False) -> str:
    """
    Return how many partitions each broker is the leader for.

    Example Output:
    {
      "total_partitions": int,
      "distribution": { broker_id: count },
      "by_topic": { topic: { broker_id: [partitions...] } }
    }
    """
    if kafka_inspector is None:
        return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
    try:
        topics_meta = kafka_inspector._admin.describe_topics()
    except Exception as e:
        return json.dumps({"error": f"describe_topics failed: {e}"})

    distribution = {}
    by_topic = {}
    total = 0

    for t in topics_meta:
        if not include_internal and t.get("is_internal", False):
            continue
        topic_name = t["topic"]
        by_topic[topic_name] = {}
        for p in t.get("partitions", []) or []:
            leader = p.get("leader", -1)
            if leader == -1:
                continue  # skip offline partitions
            total += 1
            distribution[leader] = distribution.get(leader, 0) + 1
            by_topic[topic_name].setdefault(leader, []).append(p["partition"])

    result = {
        "total_partitions": total,
        "distribution": distribution,
        "by_topic": by_topic,
    }
    return json.dumps(result, indent=2)


@mcp.tool
def get_offline_partitions(include_internal: bool = False) -> str:
    """
    Returns all offline partitions (leader == -1).

    Example Output:
    {
      "offline_count": int,
      "by_topic": { topic: [partition_ids...] }
    }
    """
    if kafka_inspector is None:
        return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})

    try:
        topics_meta = kafka_inspector._admin.describe_topics()
    except Exception as e:
        return json.dumps({"error": f"describe_topics failed: {e}"})

    result = {"offline_count": 0, "by_topic": {}}
    for t in topics_meta:
        if not include_internal and t.get("is_internal", False):
            continue
        topic_name = t["topic"]
        partitions = t.get("partitions", []) or []
        for p in partitions:
            if p.get("leader", -1) == -1:
                result["offline_count"] += 1
                result["by_topic"].setdefault(topic_name, []).append(p["partition"])

    return json.dumps(result, indent=2)


# --- Temporal Memory MCP Tools ---

@mcp.tool
def get_temporal_health_summary(time_window_hours: int = 1) -> str:
    """
    Get health status trend over a specified time window.

    Args:
        time_window_hours: Time window in hours (default: 1)

    Returns:
        JSON string with health trend analysis including status changes and alert frequencies
    """
    if temporal_store is None:
        return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
    try:
        now = int(time.time() * 1000)
        start_time = now - (time_window_hours * 3600 * 1000)

        # Get all health check events
        health_events = temporal_store.get_events(
            start_time=start_time,
            event_type="HEALTH_CHECK",
            code="CLUSTER_HEALTH"
        )

        # Get all alerts in window
        alerts = temporal_store.get_events(
            start_time=start_time,
            event_type="ALERT"
        )

        # Analyze status changes
        status_changes = []
        if len(health_events) >= 2:
            prev_status = None
            for event in reversed(health_events):  # Oldest to newest
                current_status = event.severity
                if prev_status and prev_status != current_status:
                    status_changes.append({
                        "from": prev_status,
                        "to": current_status,
                        "timestamp": event.timestamp
                    })
                prev_status = current_status

        # Count alert frequencies
        alert_freq = {}
        for alert in alerts:
            code = alert.code
            alert_freq[code] = alert_freq.get(code, 0) + 1

        # Current vs historical comparison
        current_status = health_events[0].severity if health_events else "UNKNOWN"

        return json.dumps({
            "time_window_hours": time_window_hours,
            "health_checks_count": len(health_events),
            "current_status": current_status,
            "status_changes": status_changes,
            "status_change_count": len(status_changes),
            "alerts_count": len(alerts),
            "alert_frequencies": alert_freq,
            "top_alerts": sorted(alert_freq.items(), key=lambda x: x[1], reverse=True)[:5]
        }, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Failed to generate temporal health summary: {str(e)}"})


@mcp.tool
def get_recurring_issues(min_occurrences: int = 3, time_window_hours: int = 1) -> str:
    """
    Identify issues that occurred multiple times within a time window.

    Args:
        min_occurrences: Minimum number of occurrences to be considered recurring (default: 3)
        time_window_hours: Time window in hours (default: 1)

    Returns:
        JSON string with list of recurring issues and their statistics
    """
    if temporal_store is None:
        return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})

    try:
        recurring = temporal_store.get_recurring_issues(
            threshold=min_occurrences,
            time_window_hours=time_window_hours
        )
        return json.dumps({
            "time_window_hours": time_window_hours,
            "min_occurrences": min_occurrences,
            "recurring_issues_count": len(recurring),
            "issues": recurring
        }, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Failed to detect recurring issues: {str(e)}"})


@mcp.tool
def get_issue_frequency(issue_code: str, time_window_hours: int = 24) -> str:
    """
    Count occurrences of a specific issue over time with hourly breakdown.

    Args:
        issue_code: Code of the issue to track (e.g., "CONSUMER_LAG", "OFFLINE_PARTITIONS")
        time_window_hours: Time window in hours (default: 24)

    Returns:
        JSON string with frequency count and hourly breakdown
    """
    if temporal_store is None:
        return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
    try:
        now = int(time.time() * 1000)
        start_time = now - (time_window_hours * 3600 * 1000)

        events = temporal_store.get_events(
            start_time=start_time,
            code=issue_code
        )

        # Hourly breakdown
        hourly = {}
        for event in events:
            hour = event.timestamp // (3600 * 1000)
            hourly[hour] = hourly.get(hour, 0) + 1

        return json.dumps({
            "issue_code": issue_code,
            "time_window_hours": time_window_hours,
            "total_occurrences": len(events),
            "hourly_breakdown": hourly,
            "first_occurrence": events[-1].timestamp if events else None,
            "last_occurrence": events[0].timestamp if events else None
        }, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Failed to get issue frequency: {str(e)}"})


@mcp.tool
def get_lag_trend_analysis(group_id: str, topic_name: str, time_window_hours: int = 6) -> str:
    """
    Analyze consumer lag trend for a specific group/topic over time.

    Args:
        group_id: Consumer group ID
        topic_name: Topic name
        time_window_hours: Time window for analysis (default: 6)

    Returns:
        JSON string with trend analysis (min, max, avg, current, direction)
    """
    if temporal_store is None:
        return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})

    try:
        code = f"CONSUMER_LAG_{group_id}_{topic_name}"
        trend = temporal_store.get_trend_analysis(code, time_window_hours)

        # Add spike detection if analyzer is available
        if temporal_analyzer:
            spikes = temporal_analyzer.detect_lag_spikes(
                group_id, topic_name, time_window_hours
            )
            trend["spike_analysis"] = spikes

        return json.dumps(trend, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Failed to analyze lag trend: {str(e)}"})


@mcp.tool
def get_partition_health_history(topic_name: str, time_window_hours: int = 12) -> str:
    """
    Track partition online/offline transitions and identify flapping partitions.

    Args:
        topic_name: Topic name to analyze
        time_window_hours: Time window in hours (default: 12)

    Returns:
        JSON string with partition health history and flapping detection
    """
    if temporal_store is None or temporal_analyzer is None:
        return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})

    try:
        # Detect flapping partitions
        flapping = temporal_analyzer.detect_partition_flapping(time_window_hours)

        # Filter for specific topic
        topic_flapping = [p for p in flapping if p["topic"] == topic_name]

        return json.dumps({
            "topic_name": topic_name,
            "time_window_hours": time_window_hours,
            "flapping_partitions_count": len(topic_flapping),
            "flapping_partitions": topic_flapping
        }, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Failed to get partition health history: {str(e)}"})


@mcp.tool
def detect_transient_vs_persistent(issue_code: str, persistence_threshold_minutes: int = 30) -> str:
    """
    Classify if an issue is transient (short-lived) or persistent (long-lasting).

    Args:
        issue_code: Code of the issue to classify
        persistence_threshold_minutes: Duration threshold in minutes (default: 30)

    Returns:
        JSON string with classification and confidence score
    """
    if temporal_store is None or temporal_analyzer is None:
        return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
    try:
        classification = temporal_analyzer.is_transient_vs_persistent(
            issue_code, persistence_threshold_minutes
        )
        return json.dumps(classification, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Failed to classify issue: {str(e)}"})


@mcp.tool
def export_temporal_state(filepath: str = "temporal_state.json") -> str:
    """
    Export temporal memory to a JSON file for backup or analysis.

    Args:
        filepath: Path to save the export file (default: temporal_state.json)

    Returns:
        JSON string with export status
    """
    if temporal_store is None:
        return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})

    try:
        success = temporal_store.export_state(filepath)
        if success:
            stats = temporal_store.get_statistics()
            return json.dumps({
                "status": "success",
                "message": f"Exported temporal state to {filepath}",
                "filepath": filepath,
                "events_exported": stats["total_events"]
            }, indent=2)
        else:
            return json.dumps({
                "status": "failed",
                "message": "Failed to export temporal state"
            })
    except Exception as e:
        return json.dumps({"error": f"Failed to export state: {str(e)}"})


@mcp.tool
def import_temporal_state(filepath: str = "temporal_state.json") -> str:
    """
    Import previously exported temporal memory from a JSON file.

    Args:
        filepath: Path to the export file to import (default: temporal_state.json)

    Returns:
        JSON string with import status
    """
    if temporal_store is None:
        return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})

    try:
        success = temporal_store.import_state(filepath)
        if success:
            stats = temporal_store.get_statistics()
            return json.dumps({
                "status": "success",
                "message": f"Imported temporal state from {filepath}",
                "filepath": filepath,
                "events_imported": stats["total_events"]
            }, indent=2)
        else:
            return json.dumps({
                "status": "failed",
                "message": f"Failed to import temporal state from {filepath}"
            })
    except Exception as e:
        return json.dumps({"error": f"Failed to import state: {str(e)}"})


@mcp.tool
def get_temporal_statistics() -> str:
    """
    Get statistics about the temporal memory store.

    Returns:
        JSON string with memory statistics
    """
    if temporal_store is None:
        return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})

    try:
        stats = temporal_store.get_statistics()
        return json.dumps(stats, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Failed to get statistics: {str(e)}"})


@mcp.tool
def clear_temporal_memory(confirm: bool = False) -> str:
    """
    Clear all temporal memory (requires confirmation for safety).

    Args:
        confirm: Must be True to actually clear memory

    Returns:
        JSON string with clear status
    """
    if temporal_store is None:
        return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})

    try:
        if not confirm:
            return json.dumps({
                "status": "cancelled",
                "message": "Clear operation cancelled. Set confirm=True to proceed."
            })

        success = temporal_store.clear(confirm=True)
        if success:
            return json.dumps({
                "status": "success",
                "message": "Temporal memory cleared successfully"
            })
        else:
            return json.dumps({
                "status": "failed",
                "message": "Failed to clear temporal memory"
            })
    except Exception as e:
        return json.dumps({"error": f"Failed to clear memory: {str(e)}"})


# --- Main entrypoint ---
def main():
    try:
        if len(sys.argv) > 1 and sys.argv[1] == "http":
            logger.info("Starting Kafka MCP server in HTTP mode...")
            mcp.run(transport="streamable-http", host="127.0.0.1", port=8080)
        else:
            # Default to stdio for Claude Desktop integration
            logger.info("Starting Kafka MCP server in stdio mode...")
            mcp.run(transport="stdio")
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Server error: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()
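For a quick local check of the inspector logic outside the MCP transport, a minimal sketch like the following can be used. It only relies on the KafkaInspector class defined above; the module name mcp_server and the broker address localhost:9092 are assumptions, not part of the file itself.

# smoke_test.py -- hypothetical helper, not part of mcp_server.py
# Assumes mcp_server.py is importable and a Kafka broker is reachable at localhost:9092 (assumed address).
from mcp_server import KafkaInspector

inspector = KafkaInspector(bootstrap_servers="localhost:9092")
print(inspector.list_topics())                         # topic names, internal topics skipped by default
print(inspector.get_cluster_details().get("broker_count"))
print(inspector.alert_summary()["status"])             # OK / WARN / CRITICAL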
