# KafkaIQ

**AI-Powered Kafka Management via the Model Context Protocol (MCP)**

KafkaIQ is an intelligent Kafka management tool that exposes Apache Kafka operations through the Model Context Protocol (MCP), enabling AI assistants to seamlessly interact with and manage Kafka clusters through natural language commands.
## Overview

Transform your Kafka operations with the power of AI. KafkaIQ acts as an intelligent middleware layer between AI systems and Kafka infrastructure, translating natural language requests into precise Kafka operations.

```
AI Assistant → MCP Protocol → KafkaIQ → Apache Kafka Cluster
```
## Features

- **AI-Native Integration**: Built specifically for AI assistants using the MCP protocol
- **Real-Time Operations**: Instant topic management, configuration, and monitoring
- **Intelligent Inspection**: Smart cluster analysis and health monitoring
- **Advanced Analytics**: Consumer lag analysis and partition leadership tracking
- **Proactive Alerting**: Automated health checks with CRITICAL/WARN/OK status
- **Temporal Memory (NEW!)**: Track alerts and trends over time and detect recurring issues
- **Trend Analysis**: Distinguish transient vs persistent failures without ML
- **Email Notifications**: Built-in Gmail SMTP integration for alerts
- **HTTP API**: Streamable HTTP interface for easy integration
## Available Tools

### Core Kafka Operations

| Tool | Description | Example Usage |
|------|-------------|---------------|
| `initialize_kafka_connection` | Connect to Kafka cluster | Connect to localhost:9092 |
| `list_kafka_topics` | List all topics | Show me all topics |
| `describe_kafka_topic` | Get topic details | Describe the 'orders' topic |
| `create_kafka_topic` | Create new topic | Create topic 'events' with 3 partitions |
| `delete_kafka_topic` | Delete topic | Remove the 'old-logs' topic |
| `get_kafka_topic_config` | Get topic configuration | Show config for 'user-data' |
| `kafka_health_check` | Check connection health | Is Kafka healthy? |
| `get_cluster_details` | Get cluster information | Show cluster overview |
| `get_consumer_lag` | Analyze consumer lag | Check lag for group 'processors' |
| `kafka_alert_summary` | Intelligent health analysis | Give me cluster health status |
| `broker_leadership_distribution` | Partition leadership analysis | Show broker partition distribution |
| `get_offline_partitions` | Find offline partitions | List all offline partitions |
| `get_broker_resources` | Get broker resources | Show resources for broker-1 |
| `send_email_notification` | Send email alerts | Email cluster status to admin |
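
Every operation in this table is exposed to the AI assistant as an MCP tool. The snippet below is a minimal sketch of what that registration could look like, assuming FastMCP's decorator API and kafka-python's `KafkaAdminClient`; the actual `kafka_mcp_server.py` may be organized differently.

```python
# Hypothetical sketch of MCP tool registration; not the actual server code.
from fastmcp import FastMCP
from kafka import KafkaAdminClient

mcp = FastMCP("KafkaIQ")
admin_client = None  # set once initialize_kafka_connection is called


@mcp.tool()
def initialize_kafka_connection(bootstrap_servers: str) -> str:
    """Connect to a Kafka cluster."""
    global admin_client
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    return f"Successfully connected to Kafka cluster at {bootstrap_servers}"


@mcp.tool()
def list_kafka_topics(include_internal: bool = False) -> dict:
    """List all topics in the connected cluster."""
    topics = [t for t in admin_client.list_topics()
              if include_internal or not t.startswith("__")]
    return {"topics": topics, "count": len(topics), "include_internal": include_internal}


if __name__ == "__main__":
    # Serve the tools over streamable HTTP on port 8080
    # (transport naming may vary between FastMCP versions).
    mcp.run(transport="streamable-http", host="127.0.0.1", port=8080)
```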
### Temporal Intelligence Tools (NEW!)

| Tool | Description | Example Usage |
|------|-------------|---------------|
| `get_temporal_health_summary` | Health trend analysis over time | Show health changes in last hour |
| `get_recurring_issues` | Detect repeated problems | Find issues that occurred 5+ times |
| `get_issue_frequency` | Count issue occurrences | How many lag spikes today? |
| `get_lag_trend_analysis` | Analyze consumer lag trends | Is lag increasing or decreasing? |
| `get_partition_health_history` | Track partition flapping | Which partitions keep going offline? |
| `detect_transient_vs_persistent` | Classify failure types | Is this issue temporary or ongoing? |
| `export_temporal_state` | Save temporal memory | Backup temporal data |
| `import_temporal_state` | Restore temporal memory | Load previous temporal state |
| `get_temporal_statistics` | Memory store statistics | Show temporal memory usage |
| `clear_temporal_memory` | Clear all temporal data | Reset temporal tracking |
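
The temporal tools above are backed by a lightweight in-memory event store rather than an external database. The sketch below illustrates the general idea only; the names (`TemporalStore`, `record_alert`, `recurring_issues`) are hypothetical and not the project's actual API.

```python
# Illustrative sketch of a deque-based temporal memory store (hypothetical names).
import time
from collections import Counter, deque


class TemporalStore:
    def __init__(self, retention_hours: int = 24, max_events: int = 100_000):
        self.retention_ms = retention_hours * 3600 * 1000
        self.events = deque(maxlen=max_events)  # oldest events drop off automatically

    def record_alert(self, code: str, severity: str, message: str) -> None:
        self.events.append({
            "ts": int(time.time() * 1000),
            "code": code, "severity": severity, "message": message,
        })

    def _window(self, hours: int) -> list:
        cutoff = int(time.time() * 1000) - hours * 3600 * 1000
        return [e for e in self.events if e["ts"] >= cutoff]

    def recurring_issues(self, min_occurrences: int = 3, hours: int = 6) -> dict:
        counts = Counter(e["code"] for e in self._window(hours))
        return {code: n for code, n in counts.items() if n >= min_occurrences}
```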
## Installation

### Prerequisites

- Python 3 with `pip`
- A running Apache Kafka cluster (e.g., `localhost:9092`)

### Setup
```bash
# Clone the repository
git clone https://github.com/ashfaqbs/kafkaiq.git
cd kafkaiq

# Install dependencies
pip install fastmcp kafka-python

# Set email credentials (optional)
export KAFKAIQ_GMAIL_USER="your-email@gmail.com"
export KAFKAIQ_GMAIL_PASSWORD="your-app-password"

# Run the server
python kafka_mcp_server.py
```
## Quick Start

### 1. Start the KafkaIQ Server

```bash
python kafka_mcp_server.py
```

The server starts on `http://127.0.0.1:8080`.

### 2. Connect Your AI Assistant

Configure your AI assistant to use the MCP endpoint at `http://127.0.0.1:8080`.

### 3. Start Managing Kafka with Natural Language!
## Example Conversations

### Basic Operations

**Connection Setup:**

```
Human: Connect to my local Kafka cluster at localhost:9092

AI → KafkaIQ: initialize_kafka_connection("localhost:9092")
KafkaIQ Response: "Successfully connected to Kafka cluster at localhost:9092"

AI → Human: "Connected to your Kafka cluster successfully!"
```
**Topic Management:**

```
Human: What topics exist in my cluster?

AI → KafkaIQ: list_kafka_topics()
KafkaIQ Response: {
  "topics": ["orders", "users", "events", "payments"],
  "count": 4,
  "include_internal": false
}

AI → Human: "Your cluster has 4 topics: orders, users, events, and payments."
```
**Create New Topic:**

```
Human: Create a topic called 'notifications' with 5 partitions

AI → KafkaIQ: create_kafka_topic("notifications", 5, 1)
KafkaIQ Response: {
  "status": "success",
  "message": "Topic 'notifications' created successfully",
  "topic_name": "notifications",
  "num_partitions": 5,
  "replication_factor": 1
}

AI → Human: "Successfully created topic 'notifications' with 5 partitions!"
```
### Health Monitoring

**Cluster Health Check:**

```
Human: How is my Kafka cluster doing?

AI → KafkaIQ: kafka_alert_summary()
KafkaIQ Response: {
  "status": "OK",
  "signals": [],
  "summary": {
    "cluster_id": "kafka-cluster-1",
    "broker_count": 3,
    "topics_count": 15,
    "under_replicated": 0,
    "offline_partitions": 0,
    "single_broker": false,
    "lag": [
      {"group": "processors", "topic": "orders", "total_lag": 50, "level": "OK"},
      {"group": "analytics", "topic": "events", "total_lag": 1250, "level": "OK"}
    ]
  }
}

AI → Human: "Your Kafka cluster is healthy! 3 brokers running, 15 topics, no offline partitions or under-replicated partitions. Consumer lag looks good across all groups."
```
**Consumer Lag Analysis:**

```
Human: Check consumer lag for the 'analytics' group on the 'events' topic

AI → KafkaIQ: get_consumer_lag("analytics", "events")
KafkaIQ Response: {
  "group_id": "analytics",
  "topic": "events",
  "total_lag": 1250,
  "partitions": [
    {"partition": 0, "consumer_offset": 10000, "high_water_mark": 10500, "lag": 500},
    {"partition": 1, "consumer_offset": 8000, "high_water_mark": 8750, "lag": 750}
  ]
}

AI → Human: "The 'analytics' group has a total lag of 1,250 messages on the 'events' topic. Partition 0 is 500 messages behind and partition 1 is 750 messages behind."
```
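
Consumer lag is the gap between each partition's high-water mark and the group's committed offset. A sketch of how that can be computed with kafka-python, assuming the group has committed offsets for the topic:

```python
# Sketch: computing per-partition consumer lag with kafka-python.
from kafka import KafkaAdminClient, KafkaConsumer

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")

committed = admin.list_consumer_group_offsets("analytics")    # {TopicPartition: OffsetAndMetadata}
partitions = [tp for tp in committed if tp.topic == "events"]
end_offsets = consumer.end_offsets(partitions)                # high-water marks

report = [
    {
        "partition": tp.partition,
        "consumer_offset": committed[tp].offset,
        "high_water_mark": end_offsets[tp],
        "lag": end_offsets[tp] - committed[tp].offset,
    }
    for tp in partitions
]
total_lag = sum(p["lag"] for p in report)
```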
### Advanced Analytics

**Partition Leadership:**

```
Human: Show me how partitions are distributed across brokers

AI → KafkaIQ: broker_leadership_distribution()
KafkaIQ Response: {
  "total_partitions": 45,
  "distribution": {
    "1": 15,
    "2": 15,
    "3": 15
  },
  "by_topic": {
    "orders": {"1": [0, 3], "2": [1, 4], "3": [2, 5]},
    "events": {"1": [0, 2], "2": [1, 3], "3": [4, 5]}
  }
}

AI → Human: "Partition leadership is well-balanced! Each of your 3 brokers leads exactly 15 partitions out of 45 total. The distribution looks optimal."
```
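
Leadership distribution can be derived from topic metadata by counting which broker leads each partition. A sketch using kafka-python's `describe_topics`; the field names follow kafka-python's metadata dictionaries, but treat this as illustrative rather than the exact implementation:

```python
# Sketch: counting partition leaders per broker from topic metadata.
from collections import Counter
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
leaders = Counter()

for topic in admin.describe_topics():        # all topics when no names are passed
    for partition in topic["partitions"]:
        leaders[partition["leader"]] += 1    # broker id currently leading this partition

print(dict(leaders))  # e.g. {1: 15, 2: 15, 3: 15}
```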
**Offline Partitions Check:**

```
Human: Are there any offline partitions?

AI → KafkaIQ: get_offline_partitions()
KafkaIQ Response: {
  "offline_count": 0,
  "by_topic": {}
}

AI → Human: "Great news! There are no offline partitions in your cluster. All partitions have active leaders."
```
### Resource Monitoring

**Broker Resources:**

```
Human: What are the resources for broker-1?

AI → KafkaIQ: get_broker_resources("broker-1")
KafkaIQ Response: {
  "broker_id": "broker-1",
  "resources": {
    "ram_gb": 32,
    "storage_gb": 1000,
    "cores": 8
  },
  "status": "active",
  "note": "Hardcoded values - integrate with monitoring system"
}

AI → Human: "Broker-1 has 32GB RAM, 1TB storage, and 8 CPU cores. Status is active."
```
### Email Notifications

**Send Alert:**

```
Human: Email the cluster status to admin@company.com

AI → KafkaIQ: send_email_notification("admin@company.com", "Cluster Status: OK\nBrokers: 3\nTopics: 15\nNo issues detected", "KafkaIQ Daily Report")
KafkaIQ Response: {
  "status": "sent",
  "message": "Email sent successfully",
  "recipient": "admin@company.com",
  "subject": "KafkaIQ Daily Report",
  "timestamp": "2025-09-28T10:30:00"
}

AI → Human: "Successfully sent the cluster status to admin@company.com!"
```
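
Gmail delivery needs only the Python standard library. A minimal sketch of the SMTP path, assuming the `KAFKAIQ_GMAIL_*` environment variables are set and a Gmail App Password is used:

```python
# Sketch: sending an alert email via Gmail SMTP with an App Password.
import os
import smtplib
from email.message import EmailMessage

msg = EmailMessage()
msg["From"] = os.environ["KAFKAIQ_GMAIL_USER"]
msg["To"] = "admin@company.com"
msg["Subject"] = "KafkaIQ Daily Report"
msg.set_content("Cluster Status: OK\nBrokers: 3\nTopics: 15\nNo issues detected")

with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
    server.login(os.environ["KAFKAIQ_GMAIL_USER"], os.environ["KAFKAIQ_GMAIL_PASSWORD"])
    server.send_message(msg)
```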
### Temporal Intelligence (NEW!)

**Trend-Aware Health Analysis:**

```
Human: Has the cluster health changed in the last 2 hours?

AI → KafkaIQ: get_temporal_health_summary(2)
KafkaIQ Response: {
  "time_window_hours": 2,
  "health_checks_count": 24,
  "current_status": "WARN",
  "status_changes": [
    {"from": "OK", "to": "WARN", "timestamp": 1703856000000}
  ],
  "status_change_count": 1,
  "alerts_count": 5,
  "alert_frequencies": {
    "CONSUMER_LAG": 5
  }
}

AI → Human: "Your cluster status changed from OK to WARN within the last 2 hours. There have been 5 consumer lag alerts during this period."
```
**Recurring Issue Detection:**

```
Human: What issues keep happening repeatedly?

AI → KafkaIQ: get_recurring_issues(3, 6)
KafkaIQ Response: {
  "time_window_hours": 6,
  "min_occurrences": 3,
  "recurring_issues_count": 2,
  "issues": [
    {
      "code": "CONSUMER_LAG",
      "count": 8,
      "first_seen": 1703835000000,
      "last_seen": 1703856000000,
      "severity_distribution": {"WARN": 6, "CRITICAL": 2},
      "sample_message": "High lag group='analytics', topic='events': 15000"
    },
    {
      "code": "UNDER_REPLICATED",
      "count": 4,
      "first_seen": 1703840000000,
      "last_seen": 1703855000000,
      "severity_distribution": {"WARN": 4}
    }
  ]
}

AI → Human: "Found 2 recurring issues: consumer lag occurred 8 times (6 WARN, 2 CRITICAL) and under-replication occurred 4 times in the last 6 hours. The analytics group on the events topic has been experiencing lag."
```
**Lag Trend Analysis:**

```
Human: Is consumer lag for the analytics group getting worse?

AI → KafkaIQ: get_lag_trend_analysis("analytics", "events", 6)
KafkaIQ Response: {
  "code": "CONSUMER_LAG_analytics_events",
  "sample_count": 12,
  "min": 500,
  "max": 15000,
  "avg": 7250,
  "current": 12000,
  "trend": "increasing",
  "time_window_hours": 6,
  "spike_analysis": {
    "spikes_detected": 3,
    "average_lag": 7250,
    "spike_threshold": 14500
  }
}

AI → Human: "Yes, consumer lag for analytics/events is INCREASING. It ranges from 500 to 15,000 messages (avg: 7,250). Current lag is 12,000 messages. Detected 3 lag spikes above the threshold of 14,500 messages."
```
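
Trend classification like this needs no ML: comparing the average of the older half of the samples with the newer half is usually enough. A sketch of that heuristic (the actual analysis may use a different rule):

```python
# Sketch: classify a lag series as increasing, decreasing, or stable
# by comparing the older half of the samples with the newer half.
def classify_trend(samples, tolerance=0.10):
    """samples: chronologically ordered lag values."""
    if len(samples) < 4:
        return "insufficient_data"
    mid = len(samples) // 2
    older = sum(samples[:mid]) / mid
    newer = sum(samples[mid:]) / (len(samples) - mid)
    if newer > older * (1 + tolerance):
        return "increasing"
    if newer < older * (1 - tolerance):
        return "decreasing"
    return "stable"


print(classify_trend([500, 900, 2000, 4500, 9000, 12000]))  # -> "increasing"
```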
**Transient vs Persistent Classification:**

```
Human: Is this consumer lag issue a temporary spike or a persistent problem?

AI → KafkaIQ: detect_transient_vs_persistent("CONSUMER_LAG", 30)
KafkaIQ Response: {
  "code": "CONSUMER_LAG",
  "classification": "persistent",
  "confidence": 0.85,
  "occurrences": 8,
  "duration_minutes": 350,
  "avg_interval_minutes": 50,
  "first_seen": 1703835000000,
  "last_seen": 1703856000000
}

AI → Human: "This appears to be a PERSISTENT issue with 85% confidence. It has occurred 8 times over 350 minutes with an average interval of 50 minutes between occurrences. This is not a transient spike - it requires investigation."
```
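
The transient-vs-persistent call can likewise be made with a simple heuristic over the recorded timestamps, for example by looking at how many times the issue fired and over how long a span. This is purely illustrative, not the tool's exact logic:

```python
# Sketch: a simple occurrence-count / time-span heuristic (illustrative only).
def classify_issue(timestamps_ms, min_occurrences=3, min_span_minutes=60):
    """timestamps_ms: sorted millisecond timestamps of an issue's occurrences."""
    if not timestamps_ms:
        return {"classification": "none", "occurrences": 0}
    span_min = (timestamps_ms[-1] - timestamps_ms[0]) / 60_000
    persistent = len(timestamps_ms) >= min_occurrences and span_min >= min_span_minutes
    return {
        "classification": "persistent" if persistent else "transient",
        "occurrences": len(timestamps_ms),
        "duration_minutes": round(span_min),
    }
```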
**Partition Flapping Detection:**

```
Human: Are any partitions repeatedly going offline?

AI → KafkaIQ: get_partition_health_history("events", 12)
KafkaIQ Response: {
  "topic_name": "events",
  "time_window_hours": 12,
  "flapping_partitions_count": 1,
  "flapping_partitions": [
    {
      "topic": "events",
      "partition": 2,
      "offline_count": 5,
      "first_offline": 1703820000000,
      "last_offline": 1703856000000
    }
  ]
}

AI → Human: "Yes! Partition 2 on the 'events' topic has gone offline 5 times in the last 12 hours. This is a flapping partition that needs attention - it may indicate a problematic broker or disk issue."
```
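
Flapping detection reduces to counting offline transitions per partition inside the time window. A rough sketch of that idea, using hypothetical event dictionaries like those the temporal store might record:

```python
# Sketch: flagging "flapping" partitions from recorded offline events (hypothetical shape).
from collections import Counter


def flapping_partitions(offline_events, min_offline_count=3):
    """offline_events: dicts like {"topic": "events", "partition": 2, "ts": 1703820000000}."""
    counts = Counter((e["topic"], e["partition"]) for e in offline_events)
    return [
        {"topic": topic, "partition": partition, "offline_count": n}
        for (topic, partition), n in counts.items()
        if n >= min_offline_count
    ]
```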
## Use Cases

### DevOps & Infrastructure Management

- **Automated Monitoring**: "Alert me if any partitions go offline"
- **Health Dashboards**: "Generate daily cluster health reports"
- **Incident Response**: "What's causing the high consumer lag?"
- **Capacity Planning**: "Show me broker resource utilization"

### Data Engineering

- **Pipeline Management**: "Create topics for new data streams"
- **Performance Tuning**: "Which consumers are lagging behind?"
- **Data Quality**: "Monitor topic configurations for compliance"
- **Stream Processing**: "Set up topics for real-time analytics"

### Development Workflows

- **Environment Setup**: "Create development topics with proper configs"
- **Testing**: "Clean up test topics after integration tests"
- **Debugging**: "Why are messages not being consumed?"
- **Configuration Management**: "Backup topic configurations"

### Business Intelligence

- **Operational Insights**: "Generate weekly Kafka usage reports"
- **Cost Optimization**: "Identify underutilized topics"
- **SLA Monitoring**: "Track consumer lag against SLAs"
- **Trend Analysis**: "Show partition distribution trends"
## Architecture

### Core Components

```
┌─────────────────┐      ┌──────────────────┐      ┌─────────────────┐
│  AI Assistant   │      │     KafkaIQ      │      │  Kafka Cluster  │
│                 │─────►│    MCP Server    │─────►│                 │
│  (Claude, GPT)  │      │   (Port 8080)    │      │    (Brokers)    │
└─────────────────┘      └──────────────────┘      └─────────────────┘
         │                        │                         │
         ▼                        ▼                         ▼
┌─────────────────┐      ┌──────────────────┐      ┌─────────────────┐
│ Natural Language│      │  Tool Execution  │      │   Operations    │
│    Commands     │      │   & Response     │      │  & Monitoring   │
└─────────────────┘      └──────────────────┘      └─────────────────┘
```
### Key Technologies

- **FastMCP**: Model Context Protocol server implementation
- **kafka-python**: Kafka client library
- **Gmail SMTP**: Email notification system
- **JSON**: Structured data exchange
- **HTTP/REST**: Communication protocol
## Configuration

### Environment Variables

```bash
# Email configuration (optional)
KAFKAIQ_GMAIL_USER=your-email@gmail.com
KAFKAIQ_GMAIL_PASSWORD=your-app-password

# Temporal memory configuration
KAFKAIQ_TEMPORAL_RETENTION_HOURS=24                 # How long to keep events (default: 24 hours)
KAFKAIQ_TEMPORAL_MAX_EVENTS=100000                  # Max events to store (default: 100,000)
KAFKAIQ_TEMPORAL_AUTO_PERSIST=true                  # Auto-save state on exit (default: true)
KAFKAIQ_TEMPORAL_PERSIST_PATH=temporal_state.json   # State file path
```
### Alert Thresholds

Customize alert thresholds in `kafka_inspector.alert_summary()`:

```python
# Built-in thresholds (edit as needed)
offline_crit = 1                       # Any offline partition = CRITICAL
urp_warn, urp_crit = 1, 5              # Under-replicated partitions
lag_warn, lag_crit = 10_000, 100_000   # Consumer lag thresholds
single_broker_warn = True              # Warn on single-broker setup
```
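
For reference, the roll-up from individual signals to the overall CRITICAL/WARN/OK status can be as simple as taking the worst severity seen. A sketch of that idea, not the exact implementation:

```python
# Sketch: rolling individual signal severities up into an overall status.
def overall_status(signals):
    """signals: list of dicts like {"code": "CONSUMER_LAG", "severity": "WARN"}."""
    severities = {s["severity"] for s in signals}
    if "CRITICAL" in severities:
        return "CRITICAL"
    if "WARN" in severities:
        return "WARN"
    return "OK"
```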
### Temporal Memory Tuning

**Retention Policy:**

- Default: 24 hours of event history
- Increase for longer trend analysis
- Decrease to reduce memory usage

**Memory Considerations:**

- ~100 bytes per event
- Default config: ~10 MB for 100k events
- High-frequency monitoring (5-second intervals): ~1,700 events/hour
- Estimated 24h memory: ~4 MB (very reasonable)
**Recommended Settings:**

```bash
# Production (high-traffic cluster)
KAFKAIQ_TEMPORAL_RETENTION_HOURS=48
KAFKAIQ_TEMPORAL_MAX_EVENTS=200000

# Development (local testing)
KAFKAIQ_TEMPORAL_RETENTION_HOURS=6
KAFKAIQ_TEMPORAL_MAX_EVENTS=10000
```
## Security

### Authentication

- **Kafka**: Supports SASL/PLAIN, SASL/SCRAM, and SSL/TLS
- **Email**: Gmail App Passwords (2FA required)
- **MCP**: HTTP-based, with potential for API key integration

### Best Practices

- Use environment variables for credentials
- Enable Kafka SSL/TLS in production
- Implement network-level access controls
- Rotate credentials regularly
- Monitor access logs
## Monitoring & Observability

### Built-in Metrics

- Cluster health status (OK/WARN/CRITICAL)
- Partition leadership distribution
- Consumer lag across all groups
- Offline partition detection
- Under-replicated partition monitoring

### Integration Points

- **Prometheus**: Metrics export (future enhancement)
- **Grafana**: Dashboard integration
- **Slack/Teams**: Alert forwarding
- **PagerDuty**: Incident management
- **SIEM**: Security event logging
## AI Assistant Integration

### Supported Platforms

- **Claude (Anthropic)**: Primary development platform
- **ChatGPT (OpenAI)**: Compatible with MCP
- **Custom AI Systems**: Any MCP-compatible assistant

### Integration Steps

1. Start the KafkaIQ server
2. Configure your AI assistant with the MCP endpoint
3. Test the connection with basic commands
4. Set up monitoring and alerting workflows
## Advanced Usage

### Batch Operations

```
Human: Create 5 topics for microservices: user-service, order-service, payment-service, notification-service, audit-service

AI: I'll create all 5 topics for your microservices architecture...
```

### Automated Workflows

```
Human: Set up monitoring that emails me if any consumer lag exceeds 50,000 messages

AI: I'll configure automated monitoring with email alerts for high consumer lag...
```

### Custom Configurations

```python
# Custom topic configuration example
topic_configs = {
    "retention.ms": "604800000",       # 7 days
    "compression.type": "lz4",
    "max.message.bytes": "1048576"     # 1 MB
}
```
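
A dictionary like this would typically be passed through at topic-creation time. With kafka-python that might look like the following sketch, assuming a reachable cluster (the `NewTopic` call is standard kafka-python, but how KafkaIQ wires it in is not shown here):

```python
# Sketch: applying custom configs when creating a topic with kafka-python.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    NewTopic(
        name="events",
        num_partitions=3,
        replication_factor=1,
        topic_configs=topic_configs,  # the dictionary defined above
    )
])
```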
## Troubleshooting

### Common Issues

**Connection Failed**

- Error: "Failed to connect to Kafka cluster"
- Solution: Check that Kafka is running on the specified host and port

**Email Not Sending**

- Error: "Gmail authentication failed"
- Solution: Enable 2FA and generate a Gmail App Password

**Consumer Lag Timeout**

- Error: "Consumer lag check timed out"
- Solution: Increase the timeout in the KafkaConsumer config

**Permission Denied**

- Error: "Not authorized to perform operation"
- Solution: Check Kafka ACLs and user permissions
## Contributing

We welcome contributions!

### Development Setup

```bash
# Clone the repo and set up a development environment
git clone https://github.com/yourusername/kafkaiq.git
cd kafkaiq

# Create a virtual environment
python -m venv venv
source venv/bin/activate   # Linux/macOS
# or
venv\Scripts\activate      # Windows

# Install development dependencies
pip install -r requirements.txt

# Run tests
python -m pytest tests/

# Start the development server
python kafka_mcp_server.py
```
### Contribution Areas

- Bug fixes and stability improvements
- New tool implementations
- Documentation and examples
- Test coverage expansion
- Performance optimizations
## License

This project is licensed under the MIT License - see the LICENSE file for details.

## Acknowledgments

- **FastMCP Team**: For the excellent MCP framework
- **Kafka Community**: For the robust kafka-python library
- **Anthropic**: For Claude and MCP protocol development

## Support & Community

## Tags

`kafka` `ai` `mcp` `devops` `monitoring` `alerts` `data-engineering` `stream-processing` `automation` `python` `fastmcp` `claude` `chatgpt` `infrastructure` `temporal-memory` `trend-analysis` `aiops` `observability` `time-series` `anomaly-detection`
## Academic Framing

**Research Contribution:**

> "We augment tool-based MCP interaction with temporal operational memory, enabling trend-aware health assessment and longitudinal system reasoning without machine learning."

**Key Novelties:**

- **Stateful MCP Architecture**: First MCP implementation with temporal state tracking
- **Trend-Aware Reasoning**: Distinguish transient vs persistent failures using time-series analysis
- **Zero-ML Anomaly Detection**: Recurring issue detection without machine learning models
- **Operational Memory**: Lightweight temporal store optimized for observability workflows

**Applicable Research Areas:**

- AIOps (AI for IT Operations)
- Systems conferences (SOSP, OSDI, NSDI)
- Observability research
- Intelligent system management

**Citation Example:**

```bibtex
@software{kafkaiq2025,
  title       = {KafkaIQ: Temporal Operational Memory for MCP-based Kafka Management},
  author      = {Your Name},
  year        = {2025},
  description = {Stateful MCP server with trend-aware health assessment}
}
```
Made with ❤️ for the AI and Data Engineering communities

*Transform your Kafka operations with the power of AI - experience KafkaIQ today!*