We provide all the information about MCP servers via our MCP API. For example, to fetch this server's listing:
curl -X GET 'https://glama.ai/api/mcp/v1/servers/ojhaayush03/kafka_mcp'
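A minimal Python sketch of the same request (assuming the third-party requests package is installed and that the endpoint returns JSON):

import requests

# Fetch this server's metadata from the Glama MCP directory API (illustrative only)
response = requests.get("https://glama.ai/api/mcp/v1/servers/ojhaayush03/kafka_mcp")
response.raise_for_status()
print(response.json())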
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
import sys
import logging
import json
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
import datetime
from fastmcp import FastMCP
import os
# Kafka imports
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType, NewTopic
from kafka import KafkaConsumer, TopicPartition
from kafka.errors import KafkaError, NoBrokersAvailable, KafkaTimeoutError, TopicAlreadyExistsError
# mail imports
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import time
import uuid
# Temporal memory imports
from temporal_memory import TemporalMemoryStore, TemporalEvent, TemporalAnalyzer
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
# Initialize MCP server
mcp = FastMCP("Kafka MCP Server")
# Temporal Memory Configuration
TEMPORAL_RETENTION_HOURS = int(os.getenv('KAFKAIQ_TEMPORAL_RETENTION_HOURS', '24'))
TEMPORAL_MAX_EVENTS = int(os.getenv('KAFKAIQ_TEMPORAL_MAX_EVENTS', '100000'))
TEMPORAL_AUTO_PERSIST = os.getenv('KAFKAIQ_TEMPORAL_AUTO_PERSIST', 'true').lower() == 'true'
TEMPORAL_PERSIST_PATH = os.getenv('KAFKAIQ_TEMPORAL_PERSIST_PATH', 'temporal_state.json')
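# Example (hypothetical) shell overrides for the settings above; any unset variable
# falls back to the defaults given in the os.getenv() calls:
#   export KAFKAIQ_TEMPORAL_RETENTION_HOURS=48
#   export KAFKAIQ_TEMPORAL_MAX_EVENTS=500000
#   export KAFKAIQ_TEMPORAL_AUTO_PERSIST=false
#   export KAFKAIQ_TEMPORAL_PERSIST_PATH=/var/lib/kafkaiq/temporal_state.json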
class KafkaInspector:
"""
Holds broker connection details on the instance and exposes single-responsibility inspection utilities.
"""
def __init__(
self,
bootstrap_servers: Union[str, List[str]],
client_id: str = "kafka-inspector",
request_timeout_ms: int = 45000, # keep > session_timeout_ms
api_version_auto_timeout_ms: int = 8000,
):
self.bootstrap_servers = bootstrap_servers
self.client_id = client_id
self.request_timeout_ms = request_timeout_ms
self.api_version_auto_timeout_ms = api_version_auto_timeout_ms
self._admin = KafkaAdminClient(
bootstrap_servers=self.bootstrap_servers,
client_id=self.client_id,
request_timeout_ms=self.request_timeout_ms,
api_version_auto_timeout_ms=self.api_version_auto_timeout_ms,
)
def get_topic_config(self, topic_name: str) -> Dict[str, str]:
"""
Get configuration for a specific topic.
"""
try:
# Create ConfigResource for TOPIC type
resource = ConfigResource(ConfigResourceType.TOPIC, topic_name)
# Get configurations
configs_result = self._admin.describe_configs([resource])
# Extract configuration values
config_dict: Dict[str, str] = {}
if resource in configs_result:
config_entries = configs_result[resource]
for config_name, config_entry in config_entries.items():
# Handle different kafka-python versions
if hasattr(config_entry, "value"):
config_dict[config_name] = config_entry.value
else:
# Fallback for older versions
config_dict[config_name] = str(config_entry)
return config_dict
except Exception as e:
logger.error(f"[GET_TOPIC_CONFIG] failed for topic {topic_name}: {e}")
# Check if topic exists first
try:
topics = self.list_topics(include_internal=True)
if topic_name not in topics:
logger.error(f"Topic '{topic_name}' does not exist")
return {"error": f"Topic '{topic_name}' does not exist"}
except Exception:
pass
return {"error": f"Failed to retrieve config: {str(e)}"}
def list_topics(self, include_internal: bool = False) -> List[str]:
"""
Return topic names in the cluster.
By default skips internal topics (e.g., __consumer_offsets).
"""
try:
topics_meta = self._admin.describe_topics() # all topics
names: List[str] = []
for t in topics_meta:
if not include_internal and t.get("is_internal", False):
continue
names.append(t["topic"])
return sorted(names)
except KafkaError as e:
logger.error(f"[LIST_TOPICS] failed: {e}")
return []
def describe_topic(self, topic_name: str) -> Dict[str, Any]:
"""
Get detailed information about a specific topic.
"""
try:
topics_meta = self._admin.describe_topics([topic_name])
if topics_meta:
return topics_meta[0]
return {}
except KafkaError as e:
logger.error(f"[DESCRIBE_TOPIC] failed for topic {topic_name}: {e}")
return {}
def create_topic(
self,
topic_name: str,
num_partitions: int = 1,
replication_factor: int = 1,
topic_configs: Optional[Dict[str, str]] = None
) -> bool:
"""
Create a new topic.
"""
try:
topic = NewTopic(
name=topic_name,
num_partitions=num_partitions,
replication_factor=replication_factor,
topic_configs=topic_configs or {}
)
result = self._admin.create_topics([topic])
# Wait for the operation to complete
for topic_name, future in result.items():
try:
future.result() # The result itself is None
logger.info(f"Topic {topic_name} created successfully")
return True
except TopicAlreadyExistsError:
logger.warning(f"Topic {topic_name} already exists")
return False
except Exception as e:
logger.error(f"Failed to create topic {topic_name}: {e}")
return False
except KafkaError as e:
logger.error(f"[CREATE_TOPIC] failed: {e}")
return False
def delete_topic(self, topic_name: str) -> bool:
"""
Delete a topic.
"""
try:
result = self._admin.delete_topics([topic_name])
for topic_name, future in result.items():
try:
future.result()
logger.info(f"Topic {topic_name} deleted successfully")
return True
except Exception as e:
logger.error(f"Failed to delete topic {topic_name}: {e}")
return False
except KafkaError as e:
logger.error(f"[DELETE_TOPIC] failed: {e}")
return False
def get_cluster_details(self) -> Dict[str, Any]:
"""
Get comprehensive cluster health and metadata information.
"""
try:
# Get cluster metadata
metadata = self._admin._client.cluster
cluster_id = getattr(metadata, 'cluster_id', 'unknown')
# Get broker information
brokers = list(metadata.brokers())
broker_count = len(brokers)
# Get topic information
topics = self.list_topics(include_internal=False)
topics_count = len(topics)
# Check partition health
offline_partition_count = 0
under_replicated_count = 0
total_partitions = 0
try:
# Get detailed topic metadata for health check
all_topics_meta = self._admin.describe_topics()
for topic_meta in all_topics_meta:
if topic_meta.get("is_internal", False):
continue
partitions = topic_meta.get("partitions", [])
total_partitions += len(partitions)
for partition in partitions:
# Check if partition is offline (no leader)
if partition.get("leader") is None or partition.get("leader") == -1:
offline_partition_count += 1
# Check if partition is under-replicated
replicas = partition.get("replicas", [])
isr = partition.get("isr", []) # in-sync replicas
if len(isr) < len(replicas):
under_replicated_count += 1
except Exception as e:
logger.warning(f"Could not get detailed partition health: {e}")
# Determine if single broker setup
single_broker = broker_count == 1
return {
"cluster_id": cluster_id,
"broker_count": broker_count,
"brokers": [{"id": b.nodeId, "host": b.host, "port": b.port} for b in brokers],
"topics_count": topics_count,
"total_partitions": total_partitions,
"health": {
"offline_partition_count": offline_partition_count,
"under_replicated_count": under_replicated_count,
"single_broker": single_broker
},
"timestamp": int(time.time() * 1000)
}
except Exception as e:
logger.error(f"[GET_CLUSTER_DETAILS] failed: {e}")
return {"error": f"Failed to get cluster details: {str(e)}"}
def get_consumer_lag(self, group_id: str, topic_name: str, print_output: bool = True) -> Dict[str, Any]:
"""
Get consumer lag information for a specific group and topic.
"""
consumer = None
try:
# Get consumer group offsets
offsets = self._admin.list_consumer_group_offsets(group_id)
# Filter for the specific topic
topic_partitions = [tp for tp in offsets.keys() if tp.topic == topic_name]
if not topic_partitions:
return {"total_lag": 0, "partitions": [], "message": f"No offsets found for topic {topic_name}"}
# Get high water marks using a single consumer with proper config
consumer = KafkaConsumer(
bootstrap_servers=self.bootstrap_servers,
client_id=f"{self.client_id}-lag-check",
group_id=None, # Don't join a consumer group
enable_auto_commit=False, # Don't auto-commit
consumer_timeout_ms=5000, # 5 second timeout
api_version_auto_timeout_ms=3000, # Quick API version detection
request_timeout_ms=10000, # 10 second request timeout
metadata_max_age_ms=30000, # Refresh metadata every 30 seconds
auto_offset_reset='latest'
)
partition_lags = []
total_lag = 0
# Get all high water marks at once for efficiency
all_partitions = list(topic_partitions)
consumer.assign(all_partitions)
# Get end offsets for all partitions at once
end_offsets = consumer.end_offsets(all_partitions)
for tp in topic_partitions:
try:
# Get consumer offset
consumer_offset = offsets[tp].offset if offsets[tp] else 0
# Get high water mark from end_offsets
high_water_mark = end_offsets.get(tp, 0)
# Calculate lag
lag = max(0, high_water_mark - consumer_offset)
total_lag += lag
partition_lags.append({
"partition": tp.partition,
"consumer_offset": consumer_offset,
"high_water_mark": high_water_mark,
"lag": lag
})
except Exception as e:
logger.warning(f"Could not get lag for partition {tp.partition}: {e}")
partition_lags.append({
"partition": tp.partition,
"consumer_offset": 0,
"high_water_mark": 0,
"lag": 0,
"error": str(e)
})
result = {
"group_id": group_id,
"topic": topic_name,
"total_lag": total_lag,
"partitions": partition_lags
}
if print_output:
logger.info(f"Consumer lag for group '{group_id}', topic '{topic_name}': {total_lag}")
return result
except Exception as e:
logger.error(f"[GET_CONSUMER_LAG] failed: {e}")
return {"error": f"Failed to get consumer lag: {str(e)}", "total_lag": 0}
finally:
# Always close the consumer
if consumer:
try:
consumer.close()
except Exception as e:
logger.warning(f"Error closing consumer: {e}")
def alert_summary(self) -> dict:
"""
Auto-discovers cluster health signals and consumer lag, then returns an overall OK/WARN/CRITICAL status without requiring external parameters.
Built-in thresholds (tune here if needed):
- offline_crit: any offline partition -> CRITICAL
- urp_warn/urp_crit: under-replicated partitions thresholds
- lag_warn/lag_crit: per (group, topic) lag thresholds
"""
# ---- thresholds (edit here if you want different defaults) ----
offline_crit = 1
urp_warn, urp_crit = 1, 5
lag_warn, lag_crit = 10_000, 100_000
single_broker_warn = True
def rank(level: str) -> int:
return {"OK": 0, "WARN": 1, "CRITICAL": 2}[level]
overall = "OK"
signals: list[dict] = []
# ---- cluster health (uses your existing helper) ----
cluster = self.get_cluster_details()
if "error" in cluster:
return {
"status": "CRITICAL",
"signals": [{"level": "CRITICAL", "code": "CLUSTER_UNREACHABLE", "message": cluster["error"], "data": {}}],
"summary": {"cluster_id": None},
"timestamp": int(time.time() * 1000),
}
cid = cluster.get("cluster_id")
brokers = cluster.get("broker_count", 0)
topics_count = cluster.get("topics_count", 0)
urp = cluster.get("health", {}).get("under_replicated_count", 0)
offline = cluster.get("health", {}).get("offline_partition_count", 0)
single_broker = cluster.get("health", {}).get("single_broker", False)
# offline partitions -> critical
if offline >= offline_crit:
signals.append({
"level": "CRITICAL",
"code": "OFFLINE_PARTITIONS",
"message": f"{offline} partition(s) offline (no leader)",
"data": {"offline_partitions": offline},
})
overall = "CRITICAL"
# URP
if urp >= urp_crit:
lvl = "CRITICAL"
elif urp >= urp_warn:
lvl = "WARN"
else:
lvl = "OK"
if lvl != "OK":
signals.append({
"level": lvl,
"code": "UNDER_REPLICATED",
"message": f"{urp} partition(s) under-replicated",
"data": {"under_replicated": urp},
})
if rank(lvl) > rank(overall):
overall = lvl
# single broker (optional warning)
if single_broker_warn and single_broker:
signals.append({
"level": "WARN",
"code": "SINGLE_BROKER",
"message": "Cluster has a single broker (no replication/failover)",
"data": {"broker_count": brokers},
})
if rank("WARN") > rank(overall):
overall = "WARN"
# ---- auto-discover groups & topics they actually consume ----
lag_summaries: list[dict] = []
try:
groups = self._admin.list_consumer_groups() # [(group_id, protocol_type)]
group_ids = [g[0] for g in groups]
logger.info(f"Found {len(group_ids)} consumer groups: {group_ids}")
for gid in group_ids:
try:
# Add timeout protection for each group
offsets = self._admin.list_consumer_group_offsets(gid) # {TopicPartition: OffsetAndMetadata}
topics_for_group = sorted({tp.topic for tp in offsets.keys()})
logger.info(f"Group '{gid}' consumes topics: {topics_for_group}")
for topic in topics_for_group:
try:
lag = self.get_consumer_lag(gid, topic, print_output=False)
# Handle error case
if "error" in lag:
logger.warning(f"Failed to get lag for group='{gid}', topic='{topic}': {lag['error']}")
total = 0
else:
total = int(lag.get("total_lag", 0))
if total >= lag_crit:
lvl = "CRITICAL"
elif total >= lag_warn:
lvl = "WARN"
else:
lvl = "OK"
lag_summaries.append({
"group": gid,
"topic": topic,
"total_lag": total,
"level": lvl,
})
if lvl != "OK":
signals.append({
"level": lvl,
"code": "CONSUMER_LAG",
"message": f"High lag group='{gid}', topic='{topic}': {total}",
"data": {"group": gid, "topic": topic, "total_lag": total},
})
if rank(lvl) > rank(overall):
overall = lvl
except Exception as topic_e:
logger.warning(f"Failed to process lag for group='{gid}', topic='{topic}': {topic_e}")
lag_summaries.append({
"group": gid,
"topic": topic,
"total_lag": 0,
"level": "UNKNOWN",
"error": str(topic_e)
})
except Exception as group_e:
logger.warning(f"Failed to process consumer group '{gid}': {group_e}")
signals.append({
"level": "WARN",
"code": "GROUP_PROCESSING_FAILED",
"message": f"Could not process consumer group '{gid}': {group_e}",
"data": {"group": gid},
})
if rank("WARN") > rank(overall):
overall = "WARN"
except KafkaError as e:
signals.append({
"level": "WARN",
"code": "LAG_DISCOVERY_FAILED",
"message": f"Could not enumerate groups/offsets: {e}",
"data": {},
})
if rank("WARN") > rank(overall):
overall = "WARN"
except Exception as e:
logger.error(f"Unexpected error in lag discovery: {e}")
signals.append({
"level": "WARN",
"code": "LAG_DISCOVERY_ERROR",
"message": f"Unexpected error during lag discovery: {e}",
"data": {},
})
if rank("WARN") > rank(overall):
overall = "WARN"
return {
"status": overall,
"signals": signals,
"summary": {
"cluster_id": cid,
"broker_count": brokers,
"topics_count": topics_count,
"under_replicated": urp,
"offline_partitions": offline,
"single_broker": single_broker,
"lag": lag_summaries,
},
"timestamp": int(time.time() * 1000),
}
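# Illustrative (non-authoritative) direct usage of KafkaInspector outside the MCP tools,
# assuming a broker is reachable at localhost:9092:
#
#   inspector = KafkaInspector(bootstrap_servers="localhost:9092")
#   print(inspector.list_topics())                           # sorted topic names
#   print(inspector.get_cluster_details()["broker_count"])
#   print(inspector.alert_summary()["status"])               # OK / WARN / CRITICAL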
# Global Kafka inspector instance
kafka_inspector = None
# Global Temporal Memory instances
temporal_store: Optional[TemporalMemoryStore] = None
temporal_analyzer: Optional[TemporalAnalyzer] = None
# --- Temporal Memory Helper Functions ---
def log_temporal_event(
event_type: str,
severity: str,
code: str,
message: str,
data: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None
) -> None:
"""
Log an event to temporal memory if store is initialized.
Args:
event_type: ALERT | METRIC_SNAPSHOT | ACTION | HEALTH_CHECK
severity: OK | WARN | CRITICAL | INFO
code: Event code
message: Human-readable message
data: Event-specific data
metadata: Optional metadata
"""
global temporal_store
if temporal_store is None:
return # Temporal memory not initialized
try:
event = TemporalEvent(
event_id=str(uuid.uuid4()),
event_type=event_type,
timestamp=int(time.time() * 1000),
severity=severity,
code=code,
message=message,
data=data,
metadata=metadata or {}
)
temporal_store.add_event(event)
except Exception as e:
logger.warning(f"Failed to log temporal event: {e}")
# --- MCP Tools ---
@mcp.tool
def initialize_kafka_connection(
bootstrap_servers: str,
client_id: str = "kafka-mcp-client",
request_timeout_ms: int = 45000,
api_version_auto_timeout_ms: int = 8000
) -> str:
"""
Initialize connection to Kafka cluster.
Args:
bootstrap_servers: Comma-separated list of Kafka broker addresses (e.g., "localhost:9092")
client_id: Client identifier for this connection
request_timeout_ms: Request timeout in milliseconds
api_version_auto_timeout_ms: API version auto-detection timeout in milliseconds
Returns:
Status message indicating success or failure
"""
global kafka_inspector, temporal_store, temporal_analyzer
try:
servers = [s.strip() for s in bootstrap_servers.split(',')]
kafka_inspector = KafkaInspector(
bootstrap_servers=servers,
client_id=client_id,
request_timeout_ms=request_timeout_ms,
api_version_auto_timeout_ms=api_version_auto_timeout_ms
)
# Initialize temporal memory store
temporal_store = TemporalMemoryStore(
retention_hours=TEMPORAL_RETENTION_HOURS,
max_events=TEMPORAL_MAX_EVENTS
)
temporal_analyzer = TemporalAnalyzer(temporal_store)
# Try to import previous state if auto-persist is enabled
if TEMPORAL_AUTO_PERSIST and Path(TEMPORAL_PERSIST_PATH).exists():
if temporal_store.import_state(TEMPORAL_PERSIST_PATH):
logger.info(f"Restored temporal state from {TEMPORAL_PERSIST_PATH}")
logger.info("Temporal memory initialized with trend tracking enabled")
return f"Successfully connected to Kafka cluster at {bootstrap_servers} (Temporal memory: ENABLED)"
except Exception as e:
return f"Failed to connect to Kafka cluster: {str(e)}"
@mcp.tool
def list_kafka_topics(include_internal: bool = False) -> str:
"""
List all topics in the Kafka cluster.
Args:
include_internal: Whether to include internal topics like __consumer_offsets
Returns:
JSON string containing list of topic names
"""
if kafka_inspector is None:
return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
try:
topics = kafka_inspector.list_topics(include_internal=include_internal)
return json.dumps({
"topics": topics,
"count": len(topics),
"include_internal": include_internal
})
except Exception as e:
return json.dumps({"error": f"Failed to list topics: {str(e)}"})
@mcp.tool
def describe_kafka_topic(topic_name: str) -> str:
"""
Get detailed information about a specific Kafka topic.
Args:
topic_name: Name of the topic to describe
Returns:
JSON string containing topic metadata
"""
if kafka_inspector is None:
return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
try:
topic_info = kafka_inspector.describe_topic(topic_name)
return json.dumps(topic_info, indent=2)
except Exception as e:
return json.dumps({"error": f"Failed to describe topic {topic_name}: {str(e)}"})
@mcp.tool
def create_kafka_topic(
topic_name: str,
num_partitions: int = 1,
replication_factor: int = 1,
topic_configs: str = "{}"
) -> str:
"""
Create a new Kafka topic.
Args:
topic_name: Name of the topic to create
num_partitions: Number of partitions for the topic
replication_factor: Replication factor for the topic
topic_configs: JSON string of topic configurations
Returns:
Status message indicating success or failure
"""
if kafka_inspector is None:
return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
try:
configs = json.loads(topic_configs) if topic_configs != "{}" else None
success = kafka_inspector.create_topic(
topic_name=topic_name,
num_partitions=num_partitions,
replication_factor=replication_factor,
topic_configs=configs
)
if success:
return json.dumps({
"status": "success",
"message": f"Topic '{topic_name}' created successfully",
"topic_name": topic_name,
"num_partitions": num_partitions,
"replication_factor": replication_factor
})
else:
return json.dumps({
"status": "failed",
"message": f"Failed to create topic '{topic_name}'"
})
except Exception as e:
return json.dumps({"error": f"Failed to create topic: {str(e)}"})
@mcp.tool
def delete_kafka_topic(topic_name: str) -> str:
"""
Delete a Kafka topic.
Args:
topic_name: Name of the topic to delete
Returns:
Status message indicating success or failure
"""
if kafka_inspector is None:
return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
try:
success = kafka_inspector.delete_topic(topic_name)
if success:
return json.dumps({
"status": "success",
"message": f"Topic '{topic_name}' deleted successfully"
})
else:
return json.dumps({
"status": "failed",
"message": f"Failed to delete topic '{topic_name}'"
})
except Exception as e:
return json.dumps({"error": f"Failed to delete topic: {str(e)}"})
@mcp.tool
def get_kafka_topic_config(topic_name: str) -> str:
"""
Get configuration settings for a specific Kafka topic.
Args:
topic_name: Name of the topic to get configuration for
Returns:
JSON string containing topic configuration
"""
if kafka_inspector is None:
return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
try:
config = kafka_inspector.get_topic_config(topic_name)
# Handle empty config case
if not config:
return json.dumps({
"topic_name": topic_name,
"configuration": {},
"message": f"No configuration found for topic '{topic_name}' or topic does not exist"
}, indent=2)
return json.dumps({
"topic_name": topic_name,
"configuration": config,
"config_count": len(config)
}, indent=2)
except Exception as e:
logger.error(f"Error in get_kafka_topic_config: {str(e)}")
return json.dumps({
"error": f"Failed to get topic config: {str(e)}",
"topic_name": topic_name,
"suggestion": "Check if topic exists and you have proper permissions"
})
@mcp.tool
def kafka_health_check() -> str:
"""
Check the health status of the Kafka connection.
Returns:
JSON string containing health status information
"""
if kafka_inspector is None:
return json.dumps({
"status": "disconnected",
"message": "Kafka connection not initialized"
})
try:
# Try to list topics as a health check
topics = kafka_inspector.list_topics()
return json.dumps({
"status": "healthy",
"message": "Kafka connection is working",
"bootstrap_servers": kafka_inspector.bootstrap_servers,
"client_id": kafka_inspector.client_id,
"topics_count": len(topics)
})
except Exception as e:
return json.dumps({
"status": "unhealthy",
"message": f"Kafka connection error: {str(e)}"
})
@mcp.tool
def get_cluster_details() -> str:
"""
Get comprehensive cluster health and metadata information.
Returns:
JSON string containing detailed cluster information including broker count, topics, and partition health
"""
if kafka_inspector is None:
return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
try:
cluster_info = kafka_inspector.get_cluster_details()
return json.dumps(cluster_info, indent=2)
except Exception as e:
return json.dumps({"error": f"Failed to get cluster details: {str(e)}"})
@mcp.tool
def get_consumer_lag(group_id: str, topic_name: str) -> str:
"""
Get consumer lag information for a specific consumer group and topic.
Args:
group_id: Consumer group ID to check
topic_name: Topic name to analyze lag for
Returns:
JSON string containing lag information per partition and total lag
"""
if kafka_inspector is None:
return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
try:
lag_info = kafka_inspector.get_consumer_lag(group_id, topic_name, print_output=False)
return json.dumps(lag_info, indent=2)
except Exception as e:
return json.dumps({"error": f"Failed to get consumer lag: {str(e)}"})
@mcp.tool
def kafka_alert_summary() -> str:
"""
Get comprehensive cluster health alert summary with automatic threshold-based analysis.
Returns:
JSON string containing overall status (OK/WARN/CRITICAL), detected issues, and detailed metrics.
Automatically analyzes:
- Offline partitions
- Under-replicated partitions
- Consumer lag across all groups and topics
- Single broker warnings
"""
if kafka_inspector is None:
return json.dumps({
"status": "CRITICAL",
"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."
})
try:
alert_summary = kafka_inspector.alert_summary()
# Log to temporal memory
if temporal_store:
# Log overall health check
log_temporal_event(
event_type="HEALTH_CHECK",
severity=alert_summary.get("status", "OK"),
code="CLUSTER_HEALTH",
message=f"Cluster health check: {alert_summary.get('status')}",
data=alert_summary.get("summary", {})
)
# Log each signal as an alert
for signal in alert_summary.get("signals", []):
log_temporal_event(
event_type="ALERT",
severity=signal.get("level", "INFO"),
code=signal.get("code", "UNKNOWN"),
message=signal.get("message", ""),
data=signal.get("data", {})
)
# Log consumer lag as metrics
for lag_info in alert_summary.get("summary", {}).get("lag", []):
log_temporal_event(
event_type="METRIC_SNAPSHOT",
severity=lag_info.get("level", "OK"),
code=f"CONSUMER_LAG_{lag_info.get('group')}_{lag_info.get('topic')}",
message=f"Consumer lag for group '{lag_info.get('group')}' on topic '{lag_info.get('topic')}'",
data={"value": lag_info.get("total_lag", 0), "group": lag_info.get("group"), "topic": lag_info.get("topic")}
)
return json.dumps(alert_summary, indent=2)
except Exception as e:
return json.dumps({
"status": "CRITICAL",
"error": f"Failed to generate alert summary: {str(e)}"
})
@mcp.tool
def get_broker_resources(broker_id: str) -> str:
"""
Get resource information for a specific Kafka broker.
Args:
broker_id: Broker ID or hostname to get resource information for
Returns:
JSON string containing broker resource details (RAM, storage, cores)
"""
if kafka_inspector is None:
return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
try:
# TODO: Implement actual resource monitoring integration
# This would typically connect to external systems such as:
# - Prometheus/Grafana for metrics
# - JMX for Kafka-specific metrics
# - System monitoring tools (htop, iostat, etc.)
# - Cloud provider APIs (AWS CloudWatch, etc.)
# - Kubernetes metrics if running in K8s
# For now, return hardcoded realistic values
broker_resources = {
"ram_gb": 32,
"storage_gb": 1000,
"cores": 8
}
return json.dumps({
"broker_id": broker_id,
"resources": broker_resources,
"status": "active",
"note": "Hardcoded values - integrate with monitoring system"
}, indent=2)
except Exception as e:
return json.dumps({
"error": f"Failed to get broker resources: {str(e)}",
"broker_id": broker_id
})
@mcp.tool
def send_email_notification(recipient_email: str, content: str, subject: str = "KafkaIQ Notification") -> str:
"""
Send email notification using Gmail SMTP.
Args:
recipient_email: Email address to send notification to
content: Email content/body
subject: Email subject line (optional)
Returns:
JSON string indicating email send status
"""
try:
# Validate inputs first
if not recipient_email or "@" not in recipient_email:
return json.dumps({
"status": "failed",
"error": "Invalid recipient email address"
})
if not content.strip():
return json.dumps({
"status": "failed",
"error": "Email content cannot be empty"
})
# Gmail SMTP settings
smtp_server = "smtp.gmail.com"
smtp_port = 587
# Email credentials, read from environment variables (placeholder defaults otherwise)
sender_email = os.getenv('KAFKAIQ_GMAIL_USER', 'DEFAULT_EMAIL')
sender_password = os.getenv('KAFKAIQ_GMAIL_PASSWORD', 'DEFAULT_PASSWORD')
# Create email message
message = MIMEMultipart()
message["From"] = sender_email
message["To"] = recipient_email
message["Subject"] = subject
# Simple text email body
email_body = f"""KafkaIQ Notification
===================
{content}
---
Sent by KafkaIQ at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"""
message.attach(MIMEText(email_body, "plain"))
# Send email via Gmail SMTP
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls() # Enable encryption
server.login(sender_email, sender_password)
# Send the email
text = message.as_string()
server.sendmail(sender_email, recipient_email, text)
server.quit()
return json.dumps({
"status": "sent",
"message": "Email sent successfully",
"recipient": recipient_email,
"subject": subject,
"timestamp": datetime.datetime.now().isoformat()
})
except smtplib.SMTPAuthenticationError:
return json.dumps({
"status": "failed",
"error": "Gmail authentication failed. Check email/password or enable 2FA and use App Password"
})
except Exception as e:
return json.dumps({
"status": "failed",
"error": f"Failed to send email: {str(e)}"
})
@mcp.tool
def broker_leadership_distribution(include_internal: bool = False) -> str:
"""
Return how many partitions each broker is the leader for.
Example Output:
{
"total_partitions": int,
"distribution": { broker_id: count },
"by_topic": { topic: { broker_id: [partitions...] } }
}
"""
if kafka_inspector is None:
return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
try:
topics_meta = kafka_inspector._admin.describe_topics()
except Exception as e:
return json.dumps({"error": f"describe_topics failed: {e}"})
distribution = {}
by_topic = {}
total = 0
for t in topics_meta:
if not include_internal and t.get("is_internal", False):
continue
topic_name = t["topic"]
by_topic[topic_name] = {}
for p in t.get("partitions", []) or []:
leader = p.get("leader", -1)
if leader == -1:
continue # skip offline partitions
total += 1
distribution[leader] = distribution.get(leader, 0) + 1
by_topic[topic_name].setdefault(leader, []).append(p["partition"])
result = {
"total_partitions": total,
"distribution": distribution,
"by_topic": by_topic,
}
return json.dumps(result, indent=2)
@mcp.tool
def get_offline_partitions(include_internal: bool = False) -> str:
"""
Returns all offline partitions (leader == -1).
Example Output:
{
"offline_count": int,
"by_topic": { topic: [partition_ids...] }
}
"""
if kafka_inspector is None:
return json.dumps({"error": "Kafka connection not initialized. Please call initialize_kafka_connection first."})
try:
topics_meta = kafka_inspector._admin.describe_topics()
except Exception as e:
return json.dumps({"error": f"describe_topics failed: {e}"})
result = {"offline_count": 0, "by_topic": {}}
for t in topics_meta:
if not include_internal and t.get("is_internal", False):
continue
topic_name = t["topic"]
partitions = t.get("partitions", []) or []
for p in partitions:
if p.get("leader", -1) == -1:
result["offline_count"] += 1
result["by_topic"].setdefault(topic_name, []).append(p["partition"])
return json.dumps(result, indent=2)
# --- Temporal Memory MCP Tools ---
@mcp.tool
def get_temporal_health_summary(time_window_hours: int = 1) -> str:
"""
Get health status trend over a specified time window.
Args:
time_window_hours: Time window in hours (default: 1)
Returns:
JSON string with health trend analysis including status changes and alert frequencies
"""
if temporal_store is None:
return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
try:
now = int(time.time() * 1000)
start_time = now - (time_window_hours * 3600 * 1000)
# Get all health check events
health_events = temporal_store.get_events(
start_time=start_time,
event_type="HEALTH_CHECK",
code="CLUSTER_HEALTH"
)
# Get all alerts in window
alerts = temporal_store.get_events(
start_time=start_time,
event_type="ALERT"
)
# Analyze status changes
status_changes = []
if len(health_events) >= 2:
prev_status = None
for event in reversed(health_events): # Oldest to newest
current_status = event.severity
if prev_status and prev_status != current_status:
status_changes.append({
"from": prev_status,
"to": current_status,
"timestamp": event.timestamp
})
prev_status = current_status
# Count alert frequencies
alert_freq = {}
for alert in alerts:
code = alert.code
alert_freq[code] = alert_freq.get(code, 0) + 1
# Current vs historical comparison
current_status = health_events[0].severity if health_events else "UNKNOWN"
return json.dumps({
"time_window_hours": time_window_hours,
"health_checks_count": len(health_events),
"current_status": current_status,
"status_changes": status_changes,
"status_change_count": len(status_changes),
"alerts_count": len(alerts),
"alert_frequencies": alert_freq,
"top_alerts": sorted(alert_freq.items(), key=lambda x: x[1], reverse=True)[:5]
}, indent=2)
except Exception as e:
return json.dumps({"error": f"Failed to generate temporal health summary: {str(e)}"})
@mcp.tool
def get_recurring_issues(min_occurrences: int = 3, time_window_hours: int = 1) -> str:
"""
Identify issues that occurred multiple times within a time window.
Args:
min_occurrences: Minimum number of occurrences to be considered recurring (default: 3)
time_window_hours: Time window in hours (default: 1)
Returns:
JSON string with list of recurring issues and their statistics
"""
if temporal_store is None:
return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
try:
recurring = temporal_store.get_recurring_issues(
threshold=min_occurrences,
time_window_hours=time_window_hours
)
return json.dumps({
"time_window_hours": time_window_hours,
"min_occurrences": min_occurrences,
"recurring_issues_count": len(recurring),
"issues": recurring
}, indent=2)
except Exception as e:
return json.dumps({"error": f"Failed to detect recurring issues: {str(e)}"})
@mcp.tool
def get_issue_frequency(issue_code: str, time_window_hours: int = 24) -> str:
"""
Count occurrences of a specific issue over time with hourly breakdown.
Args:
issue_code: Code of the issue to track (e.g., "CONSUMER_LAG", "OFFLINE_PARTITIONS")
time_window_hours: Time window in hours (default: 24)
Returns:
JSON string with frequency count and hourly breakdown
"""
if temporal_store is None:
return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
try:
now = int(time.time() * 1000)
start_time = now - (time_window_hours * 3600 * 1000)
events = temporal_store.get_events(
start_time=start_time,
code=issue_code
)
# Hourly breakdown
hourly = {}
for event in events:
hour = event.timestamp // (3600 * 1000)
hourly[hour] = hourly.get(hour, 0) + 1
return json.dumps({
"issue_code": issue_code,
"time_window_hours": time_window_hours,
"total_occurrences": len(events),
"hourly_breakdown": hourly,
"first_occurrence": events[-1].timestamp if events else None,
"last_occurrence": events[0].timestamp if events else None
}, indent=2)
except Exception as e:
return json.dumps({"error": f"Failed to get issue frequency: {str(e)}"})
@mcp.tool
def get_lag_trend_analysis(group_id: str, topic_name: str, time_window_hours: int = 6) -> str:
"""
Analyze consumer lag trend for a specific group/topic over time.
Args:
group_id: Consumer group ID
topic_name: Topic name
time_window_hours: Time window for analysis (default: 6)
Returns:
JSON string with trend analysis (min, max, avg, current, direction)
"""
if temporal_store is None:
return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
try:
code = f"CONSUMER_LAG_{group_id}_{topic_name}"
trend = temporal_store.get_trend_analysis(code, time_window_hours)
# Add spike detection if analyzer is available
if temporal_analyzer:
spikes = temporal_analyzer.detect_lag_spikes(
group_id, topic_name, time_window_hours
)
trend["spike_analysis"] = spikes
return json.dumps(trend, indent=2)
except Exception as e:
return json.dumps({"error": f"Failed to analyze lag trend: {str(e)}"})
@mcp.tool
def get_partition_health_history(topic_name: str, time_window_hours: int = 12) -> str:
"""
Track partition online/offline transitions and identify flapping partitions.
Args:
topic_name: Topic name to analyze
time_window_hours: Time window in hours (default: 12)
Returns:
JSON string with partition health history and flapping detection
"""
if temporal_store is None or temporal_analyzer is None:
return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
try:
# Detect flapping partitions
flapping = temporal_analyzer.detect_partition_flapping(time_window_hours)
# Filter for specific topic
topic_flapping = [p for p in flapping if p["topic"] == topic_name]
return json.dumps({
"topic_name": topic_name,
"time_window_hours": time_window_hours,
"flapping_partitions_count": len(topic_flapping),
"flapping_partitions": topic_flapping
}, indent=2)
except Exception as e:
return json.dumps({"error": f"Failed to get partition health history: {str(e)}"})
@mcp.tool
def detect_transient_vs_persistent(issue_code: str, persistence_threshold_minutes: int = 30) -> str:
"""
Classify whether an issue is transient (short-lived) or persistent (long-lasting).
Args:
issue_code: Code of the issue to classify
persistence_threshold_minutes: Duration threshold in minutes (default: 30)
Returns:
JSON string with classification and confidence score
"""
if temporal_store is None or temporal_analyzer is None:
return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
try:
classification = temporal_analyzer.is_transient_vs_persistent(
issue_code, persistence_threshold_minutes
)
return json.dumps(classification, indent=2)
except Exception as e:
return json.dumps({"error": f"Failed to classify issue: {str(e)}"})
@mcp.tool
def export_temporal_state(filepath: str = "temporal_state.json") -> str:
"""
Export temporal memory to a JSON file for backup or analysis.
Args:
filepath: Path to save the export file (default: temporal_state.json)
Returns:
JSON string with export status
"""
if temporal_store is None:
return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
try:
success = temporal_store.export_state(filepath)
if success:
stats = temporal_store.get_statistics()
return json.dumps({
"status": "success",
"message": f"Exported temporal state to {filepath}",
"filepath": filepath,
"events_exported": stats["total_events"]
}, indent=2)
else:
return json.dumps({
"status": "failed",
"message": "Failed to export temporal state"
})
except Exception as e:
return json.dumps({"error": f"Failed to export state: {str(e)}"})
@mcp.tool
def import_temporal_state(filepath: str = "temporal_state.json") -> str:
"""
Import previously exported temporal memory from a JSON file.
Args:
filepath: Path to the export file to import (default: temporal_state.json)
Returns:
JSON string with import status
"""
if temporal_store is None:
return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
try:
success = temporal_store.import_state(filepath)
if success:
stats = temporal_store.get_statistics()
return json.dumps({
"status": "success",
"message": f"Imported temporal state from {filepath}",
"filepath": filepath,
"events_imported": stats["total_events"]
}, indent=2)
else:
return json.dumps({
"status": "failed",
"message": f"Failed to import temporal state from {filepath}"
})
except Exception as e:
return json.dumps({"error": f"Failed to import state: {str(e)}"})
@mcp.tool
def get_temporal_statistics() -> str:
"""
Get statistics about the temporal memory store.
Returns:
JSON string with memory statistics
"""
if temporal_store is None:
return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
try:
stats = temporal_store.get_statistics()
return json.dumps(stats, indent=2)
except Exception as e:
return json.dumps({"error": f"Failed to get statistics: {str(e)}"})
@mcp.tool
def clear_temporal_memory(confirm: bool = False) -> str:
"""
Clear all temporal memory (requires confirmation for safety).
Args:
confirm: Must be True to actually clear memory
Returns:
JSON string with clear status
"""
if temporal_store is None:
return json.dumps({"error": "Temporal memory not initialized. Please call initialize_kafka_connection first."})
try:
if not confirm:
return json.dumps({
"status": "cancelled",
"message": "Clear operation cancelled. Set confirm=True to proceed."
})
success = temporal_store.clear(confirm=True)
if success:
return json.dumps({
"status": "success",
"message": "Temporal memory cleared successfully"
})
else:
return json.dumps({
"status": "failed",
"message": "Failed to clear temporal memory"
})
except Exception as e:
return json.dumps({"error": f"Failed to clear memory: {str(e)}"})
# --- Main entrypoint ---
def main():
try:
if len(sys.argv) > 1 and sys.argv[1] == "http":
logger.info("Starting Kafka MCP server in HTTP mode...")
mcp.run(transport="streamable-http", host="127.0.0.1", port=8080)
else:
# Default to stdio for Claude Desktop integration
logger.info("Starting Kafka MCP server in stdio mode...")
mcp.run(transport="stdio")
except KeyboardInterrupt:
logger.info("Server stopped by user")
except Exception as e:
logger.error(f"Server error: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
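# Example invocations (filename hypothetical; adjust to wherever this module is saved):
#   python kafka_mcp_server.py          # stdio transport, used for Claude Desktop integration
#   python kafka_mcp_server.py http     # streamable HTTP transport on 127.0.0.1:8080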