# KafkaIQ

**AI-Powered Kafka Management via the Model Context Protocol (MCP)**

KafkaIQ is an intelligent Kafka management tool that exposes Apache Kafka operations through the Model Context Protocol (MCP), enabling AI assistants to interact with and manage Kafka clusters through natural-language commands.
## Overview
Transform your Kafka operations with the power of AI. KafkaIQ acts as an intelligent middleware layer between AI systems and Kafka infrastructure, translating natural language requests into precise Kafka operations.
```
AI Assistant → MCP → KafkaIQ → Apache Kafka Cluster
```
## Features
- **AI-Native Integration**: Built specifically for AI assistants using MCP
- **Real-time Operations**: Instant topic management, configuration, and monitoring
- **Intelligent Inspection**: Smart cluster analysis and health monitoring
- **Advanced Analytics**: Consumer lag analysis and partition leadership tracking
- **Proactive Alerting**: Automated health checks with CRITICAL/WARN/OK status
- **Temporal Memory**: Track alerts and trends over time and detect recurring issues (NEW!)
- **Trend Analysis**: Distinguish transient from persistent failures without ML
- **Email Notifications**: Built-in Gmail SMTP integration for alerts
- **HTTP API**: MCP served over a streamable HTTP interface for easy integration
## Available Tools
### Core Kafka Operations
| Tool | Description | Example Usage |
|------|-------------|---------------|
| `initialize_kafka_connection` | Connect to Kafka cluster | Connect to localhost:9092 |
| `list_kafka_topics` | List all topics | Show me all topics |
| `describe_kafka_topic` | Get topic details | Describe the 'orders' topic |
| `create_kafka_topic` | Create new topic | Create topic 'events' with 3 partitions |
| `delete_kafka_topic` | Delete topic | Remove the 'old-logs' topic |
| `get_kafka_topic_config` | Get topic configuration | Show config for 'user-data' |
| `kafka_health_check` | Check connection health | Is Kafka healthy? |
| `get_cluster_details` | Get cluster information | Show cluster overview |
| `get_consumer_lag` | Analyze consumer lag | Check lag for group 'processors' |
| `kafka_alert_summary` | Intelligent health analysis | Give me cluster health status |
| `broker_leadership_distribution` | Partition leadership analysis | Show broker partition distribution |
| `get_offline_partitions` | Find offline partitions | List all offline partitions |
| `get_broker_resources` | Get broker resources | Show resources for broker-1 |
| `send_email_notification` | Send email alerts | Email cluster status to admin |
### Temporal Intelligence Tools (NEW!)
| Tool | Description | Example Usage |
|------|-------------|---------------|
| `get_temporal_health_summary` | Health trend analysis over time | Show health changes in last hour |
| `get_recurring_issues` | Detect repeated problems | Find issues that occurred 5+ times |
| `get_issue_frequency` | Count issue occurrences | How many lag spikes today? |
| `get_lag_trend_analysis` | Analyze consumer lag trends | Is lag increasing or decreasing? |
| `get_partition_health_history` | Track partition flapping | Which partitions keep going offline? |
| `detect_transient_vs_persistent` | Classify failure types | Is this issue temporary or ongoing? |
| `export_temporal_state` | Save temporal memory | Backup temporal data |
| `import_temporal_state` | Restore temporal memory | Load previous temporal state |
| `get_temporal_statistics` | Memory store statistics | Show temporal memory usage |
| `clear_temporal_memory` | Clear all temporal data | Reset temporal tracking |
## Installation
### Prerequisites
- Python 3.8+
- Apache Kafka cluster
- Gmail account (for email notifications)
### Setup
```bash
# Clone the repository
git clone https://github.com/ashfaqbs/kafkaiq.git
cd kafkaiq
# Install dependencies
pip install fastmcp kafka-python
# Set email credentials (optional)
export KAFKAIQ_GMAIL_USER="your-email@gmail.com"
export KAFKAIQ_GMAIL_PASSWORD="your-app-password"
# Run the server
python kafka_mcp_server.py
```
## Quick Start
### 1. Start KafkaIQ Server
```bash
python kafka_mcp_server.py
```
Server starts on `http://127.0.0.1:8080`
### 2. Connect to Your AI Assistant
Configure your AI assistant to use the MCP endpoint at `http://127.0.0.1:8080`
### 3. Start Managing Kafka with Natural Language!
## Example Conversations
### Basic Operations
**Connection Setup:**
```
Human: Connect to my local Kafka cluster at localhost:9092
AI → KafkaIQ: initialize_kafka_connection("localhost:9092")
KafkaIQ Response: "Successfully connected to Kafka cluster at localhost:9092"
AI → Human: "Connected to your Kafka cluster successfully!"
```
**Topic Management:**
```
Human: What topics exist in my cluster?
AI → KafkaIQ: list_kafka_topics()
KafkaIQ Response: {
"topics": ["orders", "users", "events", "payments"],
"count": 4,
"include_internal": false
}
AI → Human: "Your cluster has 4 topics: orders, users, events, and payments."
```
**Create New Topic:**
```
Human: Create a topic called 'notifications' with 5 partitions
AI → KafkaIQ: create_kafka_topic("notifications", 5, 1)
KafkaIQ Response: {
"status": "success",
"message": "Topic 'notifications' created successfully",
"topic_name": "notifications",
"num_partitions": 5,
"replication_factor": 1
}
AI → Human: "Successfully created topic 'notifications' with 5 partitions!"
```
### Health Monitoring
**Cluster Health Check:**
```
Human: How is my Kafka cluster doing?
AI → KafkaIQ: kafka_alert_summary()
KafkaIQ Response: {
"status": "OK",
"signals": [],
"summary": {
"cluster_id": "kafka-cluster-1",
"broker_count": 3,
"topics_count": 15,
"under_replicated": 0,
"offline_partitions": 0,
"single_broker": false,
"lag": [
{"group": "processors", "topic": "orders", "total_lag": 50, "level": "OK"},
{"group": "analytics", "topic": "events", "total_lag": 1250, "level": "OK"}
]
}
}
AI → Human: "Your Kafka cluster is healthy! 3 brokers running, 15 topics, no offline partitions or under-replicated partitions. Consumer lag looks good across all groups."
```
**Consumer Lag Analysis:**
```
Human: Check consumer lag for the 'analytics' group on 'events' topic
AI → KafkaIQ: get_consumer_lag("analytics", "events")
KafkaIQ Response: {
"group_id": "analytics",
"topic": "events",
"total_lag": 1250,
"partitions": [
{"partition": 0, "consumer_offset": 10000, "high_water_mark": 10500, "lag": 500},
{"partition": 1, "consumer_offset": 8000, "high_water_mark": 8750, "lag": 750}
]
}
AI → Human: "The 'analytics' group has a total lag of 1,250 messages on the 'events' topic. Partition 0 is 500 messages behind and partition 1 is 750 messages behind."
```
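The per-partition figures in this response follow one rule: lag is the high-water mark minus the group's committed offset, and `total_lag` is the sum over partitions. A minimal sketch of that arithmetic, assuming the offsets have already been fetched from the cluster; the `PartitionOffsets` type and `compute_lag` helper are hypothetical and not taken from KafkaIQ's code:

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class PartitionOffsets:
    # Hypothetical container for offsets already fetched from the cluster
    partition: int
    consumer_offset: int  # last committed offset for the consumer group
    high_water_mark: int  # offset of the next message to be written


def compute_lag(group_id: str, topic: str, parts: List[PartitionOffsets]) -> Dict:
    """Build a lag report shaped like the get_consumer_lag response above."""
    rows = []
    for p in parts:
        # Clamp at zero in case offsets were read at slightly different moments
        lag = max(p.high_water_mark - p.consumer_offset, 0)
        rows.append({
            "partition": p.partition,
            "consumer_offset": p.consumer_offset,
            "high_water_mark": p.high_water_mark,
            "lag": lag,
        })
    return {
        "group_id": group_id,
        "topic": topic,
        "total_lag": sum(r["lag"] for r in rows),
        "partitions": rows,
    }


report = compute_lag("analytics", "events", [
    PartitionOffsets(0, 10_000, 10_500),
    PartitionOffsets(1, 8_000, 8_750),
])
print(report["total_lag"])  # 1250
```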
### Advanced Analytics
**Partition Leadership:**
```
Human: Show me how partitions are distributed across brokers
AI → KafkaIQ: broker_leadership_distribution()
KafkaIQ Response: {
"total_partitions": 45,
"distribution": {
"1": 15,
"2": 15,
"3": 15
},
"by_topic": {
"orders": {"1": [0, 3], "2": [1, 4], "3": [2, 5]},
"events": {"1": [0, 2], "2": [1, 3], "3": [4, 5]}
}
}
AI → Human: "Partition leadership is well-balanced! Each of your 3 brokers leads exactly 15 partitions out of 45 total. The distribution looks optimal."
```
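The `distribution` and `by_topic` fields amount to grouping partitions by their current leader broker. A minimal sketch, assuming partition metadata arrives as `(topic, partition, leader_id)` tuples (a hypothetical input format) and that a leader of `-1` means the partition has no leader, which is also what an offline-partition check looks for:

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def leadership_distribution(partitions: Iterable[Tuple[str, int, int]]) -> Dict:
    """Group (topic, partition, leader_id) tuples by leader broker.

    A leader_id of -1 is treated as "no leader" (offline) and reported separately.
    """
    distribution: Dict[str, int] = defaultdict(int)
    by_topic: Dict[str, Dict[str, List[int]]] = defaultdict(lambda: defaultdict(list))
    offline: Dict[str, List[int]] = defaultdict(list)
    total = 0
    for topic, partition, leader in partitions:
        total += 1
        if leader == -1:
            offline[topic].append(partition)
            continue
        key = str(leader)  # string keys, as in the JSON response above
        distribution[key] += 1
        by_topic[topic][key].append(partition)
    return {
        "total_partitions": total,
        "distribution": dict(distribution),
        "by_topic": {t: {b: sorted(p) for b, p in brokers.items()}
                     for t, brokers in by_topic.items()},
        "offline_count": sum(len(p) for p in offline.values()),
        "offline_by_topic": {t: sorted(p) for t, p in offline.items()},
    }


meta = [("orders", 0, 1), ("orders", 1, 2), ("orders", 2, 3),
        ("orders", 3, 1), ("orders", 4, 2), ("orders", 5, -1)]
result = leadership_distribution(meta)
print(result["distribution"])  # {'1': 2, '2': 2, '3': 1}
print(result["offline_count"])  # 1
```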
**Offline Partitions Check:**
```
Human: Are there any offline partitions?
AI → KafkaIQ: get_offline_partitions()
KafkaIQ Response: {
"offline_count": 0,
"by_topic": {}
}
AI → Human: "Great news! There are no offline partitions in your cluster. All partitions have active leaders."
```
### Resource Monitoring
**Broker Resources:**
```
Human: What are the resources for broker-1?
AI → KafkaIQ: get_broker_resources("broker-1")
KafkaIQ Response: {
"broker_id": "broker-1",
"resources": {
"ram_gb": 32,
"storage_gb": 1000,
"cores": 8
},
"status": "active",
"note": "Hardcoded values - integrate with monitoring system"
}
AI → Human: "Broker-1 has 32GB RAM, 1TB storage, and 8 CPU cores. Status is active."
```
### Email Notifications
**Send Alert:**
```
Human: Email the cluster status to admin@company.com
AI → KafkaIQ: send_email_notification("admin@company.com", "Cluster Status: OK\nBrokers: 3\nTopics: 15\nNo issues detected", "KafkaIQ Daily Report")
KafkaIQ Response: {
"status": "sent",
"message": "Email sent successfully",
"recipient": "admin@company.com",
"subject": "KafkaIQ Daily Report",
"timestamp": "2025-09-28T10:30:00"
}
AI → Human: "Successfully sent the cluster status to admin@company.com!"
```
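Gmail accepts SMTP over SSL on `smtp.gmail.com:465` (or STARTTLS on port 587), authenticated with an App Password. A minimal standard-library sketch, assuming the `KAFKAIQ_GMAIL_USER` and `KAFKAIQ_GMAIL_PASSWORD` variables from the setup section; `build_message` and `send_notification` are hypothetical helpers and may not match how `send_email_notification` is actually implemented:

```python
import os
import smtplib
from email.message import EmailMessage


def build_message(sender: str, recipient: str, subject: str, body: str) -> EmailMessage:
    """Assemble a plain-text alert email."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = subject
    msg.set_content(body)
    return msg


def send_notification(recipient: str, body: str, subject: str = "KafkaIQ Alert") -> None:
    """Send via Gmail's SMTP-over-SSL endpoint using credentials from the environment."""
    user = os.environ["KAFKAIQ_GMAIL_USER"]
    password = os.environ["KAFKAIQ_GMAIL_PASSWORD"]  # an App Password, not the account password
    msg = build_message(user, recipient, subject, body)
    with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
        smtp.login(user, password)
        smtp.send_message(msg)
```

Keeping message construction separate from sending means `build_message` can be checked without network access or credentials.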
### Temporal Intelligence (NEW!)
**Trend-Aware Health Analysis:**
```
Human: Has the cluster health changed in the last 2 hours?
AI → KafkaIQ: get_temporal_health_summary(2)
KafkaIQ Response: {
"time_window_hours": 2,
"health_checks_count": 24,
"current_status": "WARN",
"status_changes": [
{"from": "OK", "to": "WARN", "timestamp": 1703856000000}
],
"status_change_count": 1,
"alerts_count": 5,
"alert_frequencies": {
"CONSUMER_LAG": 5
}
}
AI → Human: "Your cluster status changed from OK to WARN within the last 2 hours. There have been 5 consumer lag alerts during this period."
```
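The `status_changes` list can be produced by walking health checks in time order and recording each point where the status differs from the previous check. A minimal sketch, assuming checks are stored as `(timestamp_ms, status)` pairs (a hypothetical storage format, not necessarily KafkaIQ's temporal store):

```python
from typing import Dict, List, Tuple


def status_changes(checks: List[Tuple[int, str]], now_ms: int, window_hours: float) -> Dict:
    """Summarize health checks inside the window and list OK/WARN/CRITICAL transitions."""
    cutoff = now_ms - int(window_hours * 3_600_000)
    recent = sorted(c for c in checks if c[0] >= cutoff)  # oldest first
    changes = [
        {"from": prev[1], "to": cur[1], "timestamp": cur[0]}
        for prev, cur in zip(recent, recent[1:])
        if prev[1] != cur[1]
    ]
    return {
        "time_window_hours": window_hours,
        "health_checks_count": len(recent),
        "current_status": recent[-1][1] if recent else None,
        "status_changes": changes,
        "status_change_count": len(changes),
    }


now = 1_703_857_000_000
checks = [(now - 8_000_000, "CRITICAL"),  # older than 2 hours, ignored
          (now - 3_000_000, "OK"),
          (now - 1_000_000, "WARN"),
          (now, "WARN")]
summary = status_changes(checks, now, 2)
print(summary["status_changes"])  # [{'from': 'OK', 'to': 'WARN', 'timestamp': 1703856000000}]
```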
**Recurring Issue Detection:**
```
Human: What issues keep happening repeatedly?
AI → KafkaIQ: get_recurring_issues(3, 6)
KafkaIQ Response: {
"time_window_hours": 6,
"min_occurrences": 3,
"recurring_issues_count": 2,
"issues": [
{
"code": "CONSUMER_LAG",
"count": 8,
"first_seen": 1703835000000,
"last_seen": 1703856000000,
"severity_distribution": {"WARN": 6, "CRITICAL": 2},
"sample_message": "High lag group='analytics', topic='events': 15000"
},
{
"code": "UNDER_REPLICATED",
"count": 4,
"first_seen": 1703840000000,
"last_seen": 1703855000000,
"severity_distribution": {"WARN": 4}
}
]
}
AI → Human: "Found 2 recurring issues: Consumer lag occurred 8 times (6 WARN, 2 CRITICAL) and under-replication occurred 4 times in the last 6 hours. The analytics group on the events topic has been experiencing lag."
```
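Recurring-issue detection of this kind needs no model: it counts alerts per code inside a time window and keeps codes that meet a minimum occurrence count. A minimal sketch under that assumption; the `Alert` record and `recurring_issues` function are hypothetical:

```python
from collections import Counter, defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Alert:
    # Hypothetical alert record, one per signal raised by a health check
    code: str
    severity: str  # "WARN" or "CRITICAL"
    timestamp: int  # epoch milliseconds
    message: str = ""


def recurring_issues(alerts: List[Alert], now_ms: int, min_occurrences: int = 3,
                     window_hours: float = 6) -> Dict:
    """Return alert codes seen at least `min_occurrences` times within the window."""
    cutoff = now_ms - int(window_hours * 3_600_000)
    grouped: Dict[str, List[Alert]] = defaultdict(list)
    for a in alerts:
        if a.timestamp >= cutoff:
            grouped[a.code].append(a)
    issues = []
    for code, items in grouped.items():
        if len(items) < min_occurrences:
            continue
        issues.append({
            "code": code,
            "count": len(items),
            "first_seen": min(a.timestamp for a in items),
            "last_seen": max(a.timestamp for a in items),
            "severity_distribution": dict(Counter(a.severity for a in items)),
            "sample_message": max(items, key=lambda a: a.timestamp).message,
        })
    issues.sort(key=lambda i: i["count"], reverse=True)  # most frequent first
    return {"time_window_hours": window_hours, "min_occurrences": min_occurrences,
            "recurring_issues_count": len(issues), "issues": issues}


now = 1_703_856_000_000
alerts = [Alert("CONSUMER_LAG", "WARN", now - i * 3_000_000) for i in range(4)]
alerts.append(Alert("CONSUMER_LAG", "CRITICAL", now - 30_000_000))  # outside 6 h window
alerts.append(Alert("UNDER_REPLICATED", "WARN", now - 1_000_000))  # only once
report = recurring_issues(alerts, now, min_occurrences=3, window_hours=6)
```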
**Lag Trend Analysis:**
```
Human: Is consumer lag for the analytics group getting worse?
AI → KafkaIQ: get_lag_trend_analysis("analytics", "events", 6)
KafkaIQ Response: {
"code": "CONSUMER_LAG_analytics_events",
"sample_count": 12,
"min": 500,
"max": 15000,
"avg": 7250,
"current": 12000,
"trend": "increasing",
"time_window_hours": 6,
"spike_analysis": {
"spikes_detected": 3,
"average_lag": 7250,
"spike_threshold": 14500
}
}
AI → Human: "Yes, consumer lag for analytics/events is INCREASING. It ranges from 500 to 15,000 messages (avg: 7,250). Current lag is 12,000 messages. Detected 3 lag spikes above the threshold of 14,500 messages."
```
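A trend label and spike count can be derived with plain statistics over the recorded lag samples. In the response above, `spike_threshold` (14,500) is exactly twice `average_lag` (7,250), so the sketch below assumes a 2× rule for spikes; it also assumes a first-half versus second-half comparison with a 10% tolerance for the trend. Both rules are illustrative assumptions, not confirmed KafkaIQ behavior:

```python
from statistics import mean
from typing import Dict, List


def lag_trend(samples: List[int], spike_factor: float = 2.0, tolerance: float = 0.1) -> Dict:
    """Summarize chronologically ordered lag samples (oldest first)."""
    if not samples:
        return {"sample_count": 0}
    avg = mean(samples)
    half = len(samples) // 2
    if half == 0:
        trend = "stable"  # a single sample has no direction
    else:
        older, newer = mean(samples[:half]), mean(samples[half:])
        # Report a direction only if the newer half moved by more than `tolerance`
        if newer > older * (1 + tolerance):
            trend = "increasing"
        elif newer < older * (1 - tolerance):
            trend = "decreasing"
        else:
            trend = "stable"
    threshold = avg * spike_factor
    return {
        "sample_count": len(samples),
        "min": min(samples),
        "max": max(samples),
        "avg": avg,
        "current": samples[-1],
        "trend": trend,
        "spike_analysis": {
            "spikes_detected": sum(1 for s in samples if s > threshold),
            "average_lag": avg,
            "spike_threshold": threshold,
        },
    }


print(lag_trend([500, 1000, 2000, 4000, 9000, 12000])["trend"])  # increasing
```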
**Transient vs Persistent Classification:**
```
Human: Is this consumer lag issue a temporary spike or a persistent problem?
AI → KafkaIQ: detect_transient_vs_persistent("CONSUMER_LAG", 30)
KafkaIQ Response: {
"code": "CONSUMER_LAG",
"classification": "persistent",
"confidence": 0.85,
"occurrences": 8,
"duration_minutes": 350,
"avg_interval_minutes": 50,
"first_seen": 1703835000000,
"last_seen": 1703856000000
}
AI → Human: "This appears to be a PERSISTENT issue with 85% confidence. It has occurred 8 times over 350 minutes with an average interval of 50 minutes between occurrences. This is not a transient spike; it requires investigation."
```
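The duration and interval figures follow directly from the occurrence timestamps: 1703856000000 minus 1703835000000 is 21,000,000 ms, or 350 minutes, and 8 occurrences leave 7 gaps, so 350 / 7 = 50 minutes on average. A minimal rule-based sketch on those quantities; treating the second argument (30 in the call above) as a minimum duration in minutes, and the classification rule itself, are assumptions, and the sketch does not attempt a confidence score:

```python
from typing import Dict, List


def classify(timestamps_ms: List[int], persistent_after_minutes: float = 30,
             min_occurrences: int = 3) -> Dict:
    """Label an issue 'transient' or 'persistent' from its occurrence timestamps.

    Assumed rule: persistent if it occurred at least `min_occurrences` times and
    the span from first to last occurrence is at least `persistent_after_minutes`.
    """
    ts = sorted(timestamps_ms)
    if not ts:
        return {"classification": "unknown", "occurrences": 0}
    duration_min = (ts[-1] - ts[0]) / 60_000
    gaps = len(ts) - 1
    avg_interval = duration_min / gaps if gaps else 0.0
    persistent = len(ts) >= min_occurrences and duration_min >= persistent_after_minutes
    return {
        "classification": "persistent" if persistent else "transient",
        "occurrences": len(ts),
        "duration_minutes": duration_min,
        "avg_interval_minutes": avg_interval,
        "first_seen": ts[0],
        "last_seen": ts[-1],
    }


first = 1_703_835_000_000
result = classify([first + i * 3_000_000 for i in range(8)], persistent_after_minutes=30)
print(result["duration_minutes"], result["avg_interval_minutes"])  # 350.0 50.0
```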
**Partition Flapping Detection:**
```
Human: Are any partitions repeatedly going offline?
AI → KafkaIQ: get_partition_health_history("events", 12)
KafkaIQ Response: {
"topic_name": "events",
"time_window_hours": 12,
"flapping_partitions_count": 1,
"flapping_partitions": [
{
"topic": "events",
"partition": 2,
"offline_count": 5,
"first_offline": 1703820000000,
"last_offline": 1703856000000
}
]
}
AI → Human: "Yes! Partition 2 on the 'events' topic has gone offline 5 times in the last 12 hours. This is a flapping partition that needs attention; it may indicate a problematic broker or disk issue."
```
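Flapping detection uses the same counting pattern as recurring issues, keyed by `(topic, partition)` instead of alert code. A minimal sketch, assuming offline observations are recorded as `(timestamp_ms, topic, partition)` tuples and that two or more offline observations in the window count as flapping (both assumptions, not documented KafkaIQ behavior):

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


def flapping_partitions(events: List[Tuple[int, str, int]], now_ms: int,
                        window_hours: float = 12, min_offline: int = 2,
                        topic: Optional[str] = None) -> List[Dict]:
    """Return partitions observed offline at least `min_offline` times in the window."""
    cutoff = now_ms - int(window_hours * 3_600_000)
    seen: Dict[Tuple[str, int], List[int]] = defaultdict(list)
    for ts, t, p in events:
        if ts >= cutoff and (topic is None or t == topic):
            seen[(t, p)].append(ts)
    return [
        {"topic": t, "partition": p, "offline_count": len(ts_list),
         "first_offline": min(ts_list), "last_offline": max(ts_list)}
        for (t, p), ts_list in sorted(seen.items())
        if len(ts_list) >= min_offline
    ]


now = 1_703_856_000_000
events = [(now - i * 9_000_000, "events", 2) for i in range(5)]  # offline 5 times
events.append((now - 1_000, "events", 0))  # offline once, not flapping
flaps = flapping_partitions(events, now, window_hours=12, topic="events")
```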
## Use Cases
### DevOps & Infrastructure Management
- **Automated Monitoring**: "Alert me if any partitions go offline"
- **Health Dashboards**: "Generate daily cluster health reports"
- **Incident Response**: "What's causing the high consumer lag?"
- **Capacity Planning**: "Show me broker resource utilization"
### Data Engineering
- **Pipeline Management**: "Create topics for new data streams"
- **Performance Tuning**: "Which consumers are lagging behind?"
- **Data Quality**: "Monitor topic configurations for compliance"
- **Stream Processing**: "Set up topics for real-time analytics"
### Development Workflows
- **Environment Setup**: "Create development topics with proper configs"
- **Testing**: "Clean up test topics after integration tests"
- **Debugging**: "Why are messages not being consumed?"
- **Configuration Management**: "Backup topic configurations"
### Business Intelligence
- **Operational Insights**: "Generate weekly Kafka usage reports"
- **Cost Optimization**: "Identify underutilized topics"
- **SLA Monitoring**: "Track consumer lag against SLAs"
- **Trend Analysis**: "Show partition distribution trends"
## Architecture
### Core Components
```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  AI Assistant   │    │     KafkaIQ      │    │  Kafka Cluster  │
│                 │◄──►│    MCP Server    │◄──►│                 │
│  (Claude, GPT)  │    │   (Port 8080)    │    │    (Brokers)    │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                       │                       │
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Natural Language│    │  Tool Execution  │    │   Operations    │
│    Commands     │    │    & Response    │    │  & Monitoring   │
└─────────────────┘    └──────────────────┘    └─────────────────┘
```
### Key Technologies
- **FastMCP**: Model Context Protocol (MCP) server framework
- **kafka-python**: Kafka client library
- **Gmail SMTP**: Email notification system
- **JSON**: Structured data exchange
- **HTTP (streamable)**: Transport carrying MCP requests between the assistant and KafkaIQ
## Configuration
### Environment Variables
```bash
# Email Configuration (Optional)
KAFKAIQ_GMAIL_USER=your-email@gmail.com
KAFKAIQ_GMAIL_PASSWORD=your-app-password
# Temporal Memory Configuration
KAFKAIQ_TEMPORAL_RETENTION_HOURS=24 # How long to keep events (default: 24 hours)
KAFKAIQ_TEMPORAL_MAX_EVENTS=100000 # Max events to store (default: 100,000)
KAFKAIQ_TEMPORAL_AUTO_PERSIST=true # Auto-save state on exit (default: true)
KAFKAIQ_TEMPORAL_PERSIST_PATH=temporal_state.json # State file path
```
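A minimal sketch of reading the temporal-memory variables with the defaults stated in the comments above. The `TemporalConfig` class, the accepted spellings of "true", and using `temporal_state.json` as the default path (the comment above gives it as an example rather than a stated default) are assumptions:

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class TemporalConfig:
    retention_hours: float
    max_events: int
    auto_persist: bool
    persist_path: str


def load_temporal_config(env: Optional[Mapping[str, str]] = None) -> TemporalConfig:
    """Read KAFKAIQ_TEMPORAL_* settings, falling back to the documented defaults."""
    env = os.environ if env is None else env
    auto = env.get("KAFKAIQ_TEMPORAL_AUTO_PERSIST", "true").strip().lower()
    return TemporalConfig(
        retention_hours=float(env.get("KAFKAIQ_TEMPORAL_RETENTION_HOURS", "24")),
        max_events=int(env.get("KAFKAIQ_TEMPORAL_MAX_EVENTS", "100000")),
        auto_persist=auto in {"true", "1", "yes"},  # anything else disables it
        persist_path=env.get("KAFKAIQ_TEMPORAL_PERSIST_PATH", "temporal_state.json"),
    )


cfg = load_temporal_config({"KAFKAIQ_TEMPORAL_RETENTION_HOURS": "48"})
print(cfg.retention_hours, cfg.max_events)  # 48.0 100000
```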
### Alert Thresholds
Customize alert thresholds in `kafka_inspector.alert_summary()`:
```python
# Built-in thresholds (edit as needed)
offline_crit = 1 # Any offline partition = CRITICAL
urp_warn, urp_crit = 1, 5 # Under-replicated partitions
lag_warn, lag_crit = 10_000, 100_000 # Consumer lag thresholds
single_broker_warn = True # Warn on single broker setup
```
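One plausible way these thresholds combine into a single CRITICAL/WARN/OK status is to raise one signal per breached threshold and report the most severe one. A minimal sketch under that assumption; the `summarize` and `level` functions and the `OFFLINE_PARTITIONS` and `SINGLE_BROKER` codes are hypothetical, and the real `alert_summary()` may weigh signals differently:

```python
from typing import Dict, List, Tuple

# Thresholds mirroring the defaults above
OFFLINE_CRIT = 1
URP_WARN, URP_CRIT = 1, 5
LAG_WARN, LAG_CRIT = 10_000, 100_000
SINGLE_BROKER_WARN = True

SEVERITY_ORDER = {"OK": 0, "WARN": 1, "CRITICAL": 2}


def level(value: int, warn: int, crit: int) -> str:
    """Map a metric to OK/WARN/CRITICAL using inclusive thresholds."""
    if value >= crit:
        return "CRITICAL"
    if value >= warn:
        return "WARN"
    return "OK"


def summarize(offline: int, under_replicated: int, broker_count: int,
              lags: List[Tuple[str, str, int]]) -> Dict:
    signals = []
    if offline >= OFFLINE_CRIT:
        signals.append({"code": "OFFLINE_PARTITIONS", "severity": "CRITICAL"})
    urp_level = level(under_replicated, URP_WARN, URP_CRIT)
    if urp_level != "OK":
        signals.append({"code": "UNDER_REPLICATED", "severity": urp_level})
    if SINGLE_BROKER_WARN and broker_count == 1:
        signals.append({"code": "SINGLE_BROKER", "severity": "WARN"})
    for group, topic, total_lag in lags:
        lag_level = level(total_lag, LAG_WARN, LAG_CRIT)
        if lag_level != "OK":
            signals.append({"code": "CONSUMER_LAG", "severity": lag_level,
                            "message": f"High lag group='{group}', topic='{topic}': {total_lag}"})
    # Overall status is the most severe signal, or OK when there are none
    status = max((s["severity"] for s in signals), key=SEVERITY_ORDER.__getitem__, default="OK")
    return {"status": status, "signals": signals}
```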
### Temporal Memory Tuning
**Retention Policy:**
- Default: 24 hours of event history
- Increase for longer trend analysis
- Decrease to reduce memory usage
**Memory Considerations:**
- ~100 bytes per event
- Default config: ~10MB for 100k events
- High-frequency monitoring (5-second intervals): ~1,700 events/hour
- Estimated 24h memory: ~4MB (very reasonable)
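The estimates above come from multiplying the number of stored events by the approximate per-event size, capped at `KAFKAIQ_TEMPORAL_MAX_EVENTS`. A small helper makes that arithmetic explicit; the ~100-byte figure is this README's own approximation and the function is illustrative only:

```python
def estimated_memory_mb(events_per_hour: int, retention_hours: float,
                        max_events: int = 100_000, bytes_per_event: int = 100) -> float:
    """Approximate temporal-store footprint in MB (1 MB = 1,000,000 bytes)."""
    # The store never holds more than max_events, whatever the retention window
    stored = min(int(events_per_hour * retention_hours), max_events)
    return stored * bytes_per_event / 1_000_000


print(estimated_memory_mb(1_700, 24))           # 4.08
print(estimated_memory_mb(1_700, 48, 200_000))  # 8.16
```

With the production settings below (48 hours, 200,000 events) and the same event rate, the cap is not reached, so memory scales linearly with retention.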
**Recommended Settings:**
```bash
# Production (high-traffic cluster)
KAFKAIQ_TEMPORAL_RETENTION_HOURS=48
KAFKAIQ_TEMPORAL_MAX_EVENTS=200000
# Development (local testing)
KAFKAIQ_TEMPORAL_RETENTION_HOURS=6
KAFKAIQ_TEMPORAL_MAX_EVENTS=10000
```
## Security
### Authentication
- **Kafka**: Supports SASL/PLAIN, SASL/SCRAM, SSL/TLS
- **Email**: Gmail App Passwords (2FA required)
- **MCP**: Served over HTTP; API-key authentication is not built in but could be added
### Best Practices
- Use environment variables for credentials
- Enable Kafka SSL/TLS in production
- Implement network-level access controls
- Rotate credentials regularly
- Monitor access logs
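For the SASL and SSL options listed above, kafka-python clients accept keyword arguments such as `security_protocol`, `sasl_mechanism`, `sasl_plain_username`, `sasl_plain_password`, and `ssl_cafile`. A minimal sketch that assembles them from environment variables; the `KAFKAIQ_KAFKA_*` variable names are hypothetical, and whether KafkaIQ's `initialize_kafka_connection` accepts these settings is not documented here:

```python
import os
from typing import Dict, Mapping, Optional


def kafka_security_kwargs(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build kafka-python client keyword arguments from hypothetical env vars."""
    env = os.environ if env is None else env
    kwargs = {
        "bootstrap_servers": env.get("KAFKAIQ_KAFKA_BOOTSTRAP", "localhost:9092"),
        # One of PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
        "security_protocol": env.get("KAFKAIQ_KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
    }
    if kwargs["security_protocol"].startswith("SASL"):
        # e.g. PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512; credentials must be set
        kwargs["sasl_mechanism"] = env.get("KAFKAIQ_KAFKA_SASL_MECHANISM", "SCRAM-SHA-512")
        kwargs["sasl_plain_username"] = env["KAFKAIQ_KAFKA_USERNAME"]
        kwargs["sasl_plain_password"] = env["KAFKAIQ_KAFKA_PASSWORD"]
    if kwargs["security_protocol"] in ("SSL", "SASL_SSL") and "KAFKAIQ_KAFKA_CA_FILE" in env:
        kwargs["ssl_cafile"] = env["KAFKAIQ_KAFKA_CA_FILE"]
    return kwargs
```

The resulting dictionary can be unpacked into a client, for example `KafkaAdminClient(**kafka_security_kwargs())` from `kafka.admin`, so credentials never appear in code.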
## Monitoring & Observability
### Built-in Metrics
- Cluster health status (OK/WARN/CRITICAL)
- Partition leadership distribution
- Consumer lag across all groups
- Offline partition detection
- Under-replicated partition monitoring
### Integration Points
- **Prometheus**: Metrics export (future enhancement)
- **Grafana**: Dashboard integration
- **Slack/Teams**: Alert forwarding
- **PagerDuty**: Incident management
- **SIEM**: Security event logging
## AI Assistant Integration
### Supported Platforms
- **Claude** (Anthropic) - Primary development platform
- **ChatGPT** (OpenAI) - Compatible with MCP
- **Custom AI Systems** - Any MCP-compatible assistant
### Integration Steps
1. Start KafkaIQ server
2. Configure AI assistant with MCP endpoint
3. Test connection with basic commands
4. Set up monitoring and alerting workflows
## Advanced Usage
### Batch Operations
```
Human: Create 5 topics for microservices: user-service, order-service, payment-service, notification-service, audit-service
AI: I'll create all 5 topics for your microservices architecture...
```
### Automated Workflows
```
Human: Set up monitoring that emails me if any consumer lag exceeds 50,000 messages
AI: I'll configure automated monitoring with email alerts for high consumer lag...
```
### Custom Configurations
```python
# Custom topic configuration example
topic_configs = {
"retention.ms": "604800000", # 7 days
"compression.type": "lz4",
"max.message.bytes": "1048576" # 1MB
}
```
## Troubleshooting
### Common Issues
**Connection Failed**
```bash
Error: "Failed to connect to Kafka cluster"
Solution: Check if Kafka is running on specified port
```
**Email Not Sending**
```bash
Error: "Gmail authentication failed"
Solution: Enable 2FA and generate App Password
```
**Consumer Lag Timeout**
```bash
Error: "Consumer lag check timed out"
Solution: Increase timeout in KafkaConsumer config
```
**Permission Denied**
```bash
Error: "Not authorized to perform operation"
Solution: Check Kafka ACLs and user permissions
```
## Contributing
We welcome contributions!
### Development Setup
```bash
# Clone your fork and set up the development environment
git clone https://github.com/yourusername/kafkaiq.git
cd kafkaiq
# Create virtual environment
python -m venv venv
source venv/bin/activate # Linux/Mac
# or
venv\Scripts\activate # Windows
# Install development dependencies
pip install -r requirements.txt
# Run tests
python -m pytest tests/
# Start development server
python kafka_mcp_server.py
```
### Contribution Areas
- Bug fixes and stability improvements
- New tool implementations
- Documentation and examples
- Test coverage expansion
- Performance optimizations
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Acknowledgments
- **FastMCP Team**: For the excellent MCP framework
- **Kafka Community**: For the robust kafka-python library
- **Anthropic**: For Claude and MCP protocol development
## Support & Community
- **Issues**: [GitHub Issues](https://github.com/ashfaqbs/kafkaiq/issues)
- **Discussions**: [GitHub Discussions](https://github.com/ashfaqbs/kafkaiq/discussions)
## Tags
`kafka` `ai` `mcp` `devops` `monitoring` `alerts` `data-engineering` `stream-processing` `automation` `python` `fastmcp` `claude` `chatgpt` `infrastructure` `temporal-memory` `trend-analysis` `aiops` `observability` `time-series` `anomaly-detection`
## Academic Framing
**Research Contribution:**
> "We augment tool-based MCP interaction with temporal operational memory, enabling trend-aware health assessment and longitudinal system reasoning without machine learning."
**Key Novelties:**
1. **Stateful MCP Architecture**: First MCP implementation with temporal state tracking
2. **Trend-Aware Reasoning**: Distinguish transient vs persistent failures using time-series analysis
3. **Zero-ML Anomaly Detection**: Recurring issue detection without machine learning models
4. **Operational Memory**: Lightweight temporal store optimized for observability workflows
**Applicable Research Areas:**
- AIOps (AI for IT Operations)
- Systems Conferences (SOSP, OSDI, NSDI)
- Observability Research
- Intelligent System Management
**Citation Example:**
```
@software{kafkaiq2025,
title={KafkaIQ: Temporal Operational Memory for MCP-based Kafka Management},
author={Your Name},
year={2025},
description={Stateful MCP server with trend-aware health assessment}
}
```
---
**Made with ❤️ for the AI and Data Engineering communities**
*Transform your Kafka operations with the power of AI - Experience KafkaIQ today!*