
KafkaIQ 🧠⚡


AI-Powered Kafka Management via Model Context Protocol (MCP)

KafkaIQ is an intelligent Kafka management tool that exposes Apache Kafka operations as tools over the Model Context Protocol (MCP). AI assistants can use these tools to inspect and manage Kafka clusters in response to natural-language requests.

🚀 Overview

Transform your Kafka operations with the power of AI. KafkaIQ is a middleware layer between AI assistants and Kafka infrastructure. The assistant turns a natural-language request into an MCP tool call. KafkaIQ then executes that call as a precise Kafka operation and returns a structured JSON result.

AI Assistant → MCP Protocol → KafkaIQ → Apache Kafka Cluster

✨ Features

  • 🤖 AI-Native Integration: Built specifically for AI assistants that use MCP

  • ⚡ Real-time Operations: Instant topic management, configuration, and monitoring

  • 🔍 Intelligent Inspection: Smart cluster analysis and health monitoring

  • 📊 Advanced Analytics: Consumer lag analysis and partition leadership tracking

  • 🛡️ Proactive Alerting: Automated health checks with CRITICAL/WARN/OK status

  • Temporal Memory: Track alerts and trends over time, detect recurring issues (NEW!)

  • 📈 Trend Analysis: Distinguish transient vs persistent failures without ML

  • 📧 Email Notifications: Built-in Gmail SMTP integration for alerts

  • 🌐 HTTP API: MCP streamable HTTP transport (JSON-RPC over HTTP) for easy integration

🛠️ Available Tools

Core Kafka Operations

| Tool | Description | Example Usage |
|------|-------------|---------------|
| initialize_kafka_connection | Connect to Kafka cluster | Connect to localhost:9092 |
| list_kafka_topics | List all topics | Show me all topics |
| describe_kafka_topic | Get topic details | Describe the 'orders' topic |
| create_kafka_topic | Create new topic | Create topic 'events' with 3 partitions |
| delete_kafka_topic | Delete topic | Remove the 'old-logs' topic |
| get_kafka_topic_config | Get topic configuration | Show config for 'user-data' |
| kafka_health_check | Check connection health | Is Kafka healthy? |
| get_cluster_details | Get cluster information | Show cluster overview |
| get_consumer_lag | Analyze consumer lag | Check lag for group 'processors' |
| kafka_alert_summary | Intelligent health analysis | Give me cluster health status |
| broker_leadership_distribution | Partition leadership analysis | Show broker partition distribution |
| get_offline_partitions | Find offline partitions | List all offline partitions |
| get_broker_resources | Get broker resources | Show resources for broker-1 |
| send_email_notification | Send email alerts | Email cluster status to admin |

Temporal Intelligence Tools (NEW! 🕐)

| Tool | Description | Example Usage |
|------|-------------|---------------|
| get_temporal_health_summary | Health trend analysis over time | Show health changes in last hour |
| get_recurring_issues | Detect repeated problems | Find issues that occurred 5+ times |
| get_issue_frequency | Count issue occurrences | How many lag spikes today? |
| get_lag_trend_analysis | Analyze consumer lag trends | Is lag increasing or decreasing? |
| get_partition_health_history | Track partition flapping | Which partitions keep going offline? |
| detect_transient_vs_persistent | Classify failure types | Is this issue temporary or ongoing? |
| export_temporal_state | Save temporal memory | Backup temporal data |
| import_temporal_state | Restore temporal memory | Load previous temporal state |
| get_temporal_statistics | Memory store statistics | Show temporal memory usage |
| clear_temporal_memory | Clear all temporal data | Reset temporal tracking |
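The temporal tools above imply an event store that keeps alert events for a retention window and caps how many it holds. These limits correspond to KAFKAIQ_TEMPORAL_RETENTION_HOURS and KAFKAIQ_TEMPORAL_MAX_EVENTS under Configuration.

Below is a minimal, stdlib-only sketch of such a store. It is not KafkaIQ's actual implementation: the `TemporalStore` and `AlertEvent` names, the eviction strategy, and the JSON export format are all assumptions. It mirrors what `get_recurring_issues`, `export_temporal_state`, and `import_temporal_state` appear to do.

```python
import json
import time
from collections import Counter, deque
from dataclasses import asdict, dataclass
from typing import Deque, List, Optional

HOUR_MS = 3_600_000


@dataclass
class AlertEvent:
    code: str      # e.g. "CONSUMER_LAG", "UNDER_REPLICATED"
    severity: str  # "OK", "WARN" or "CRITICAL"
    message: str
    ts_ms: int     # epoch milliseconds, like the example responses


class TemporalStore:
    """Hypothetical bounded, time-windowed alert history."""

    def __init__(self, retention_hours: float = 24, max_events: int = 100_000):
        self.retention_ms = int(retention_hours * HOUR_MS)
        # maxlen makes the deque drop its oldest event once the cap is reached
        self.events: Deque[AlertEvent] = deque(maxlen=max_events)

    def record(self, event: AlertEvent) -> None:
        """Append an event; assumes events arrive in timestamp order."""
        self.events.append(event)
        cutoff = event.ts_ms - self.retention_ms
        while self.events and self.events[0].ts_ms < cutoff:
            self.events.popleft()  # expired events sit at the left end

    def window(self, hours: float, now_ms: Optional[int] = None) -> List[AlertEvent]:
        now_ms = int(time.time() * 1000) if now_ms is None else now_ms
        start = now_ms - int(hours * HOUR_MS)
        return [e for e in self.events if e.ts_ms >= start]

    def recurring_issues(
        self, min_occurrences: int, hours: float, now_ms: Optional[int] = None
    ) -> List[dict]:
        events = self.window(hours, now_ms)
        issues = []
        for code, count in Counter(e.code for e in events).most_common():
            if count < min_occurrences:
                continue
            matching = [e for e in events if e.code == code]
            issues.append(
                {
                    "code": code,
                    "count": count,
                    "first_seen": min(e.ts_ms for e in matching),
                    "last_seen": max(e.ts_ms for e in matching),
                    "severity_distribution": dict(Counter(e.severity for e in matching)),
                }
            )
        return issues

    def export_state(self) -> str:
        return json.dumps([asdict(e) for e in self.events])

    def import_state(self, payload: str) -> None:
        """Load exported events; intended for an empty store (order is not re-checked)."""
        for item in json.loads(payload):
            self.events.append(AlertEvent(**item))
```

A `deque` with `maxlen` gives constant-time appends and silently evicts the oldest event at the cap, which matches "max events to store". The time-based pruning relies on events being recorded in timestamp order.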

📦 Installation

Prerequisites

  • Python 3.10+ (required by FastMCP)

  • Apache Kafka cluster

  • Gmail account (for email notifications)

Setup

```bash
# Clone the repository
git clone https://github.com/ashfaqbs/kafkaiq.git
cd kafkaiq

# Install dependencies
pip install fastmcp kafka-python

# Set email credentials (optional)
export KAFKAIQ_GMAIL_USER="your-email@gmail.com"
export KAFKAIQ_GMAIL_PASSWORD="your-app-password"

# Run the server
python kafka_mcp_server.py
```

🚀 Quick Start

1. Start KafkaIQ Server

python kafka_mcp_server.py

Server starts on http://127.0.0.1:8080

2. Connect to Your AI Assistant

Configure your AI assistant to use the MCP endpoint at http://127.0.0.1:8080

3. Start Managing Kafka with Natural Language!

💬 Example Conversations

Basic Operations

Connection Setup:

```
Human: Connect to my local Kafka cluster at localhost:9092
AI → KafkaIQ: initialize_kafka_connection("localhost:9092")
KafkaIQ Response: "Successfully connected to Kafka cluster at localhost:9092"
AI → Human: "Connected to your Kafka cluster successfully!"
```

Topic Management:

```
Human: What topics exist in my cluster?
AI → KafkaIQ: list_kafka_topics()
KafkaIQ Response:
{
  "topics": ["orders", "users", "events", "payments"],
  "count": 4,
  "include_internal": false
}
AI → Human: "Your cluster has 4 topics: orders, users, events, and payments."
```

Create New Topic:

```
Human: Create a topic called 'notifications' with 5 partitions
AI → KafkaIQ: create_kafka_topic("notifications", 5, 1)
KafkaIQ Response:
{
  "status": "success",
  "message": "Topic 'notifications' created successfully",
  "topic_name": "notifications",
  "num_partitions": 5,
  "replication_factor": 1
}
AI → Human: "Successfully created topic 'notifications' with 5 partitions!"
```

Health Monitoring

Cluster Health Check:

```
Human: How is my Kafka cluster doing?
AI → KafkaIQ: kafka_alert_summary()
KafkaIQ Response:
{
  "status": "OK",
  "signals": [],
  "summary": {
    "cluster_id": "kafka-cluster-1",
    "broker_count": 3,
    "topics_count": 15,
    "under_replicated": 0,
    "offline_partitions": 0,
    "single_broker": false,
    "lag": [
      {"group": "processors", "topic": "orders", "total_lag": 50, "level": "OK"},
      {"group": "analytics", "topic": "events", "total_lag": 1250, "level": "OK"}
    ]
  }
}
AI → Human: "Your Kafka cluster is healthy! 3 brokers running, 15 topics, no offline partitions or under-replicated partitions. Consumer lag looks good across all groups."
```

Consumer Lag Analysis:

```
Human: Check consumer lag for the 'analytics' group on the 'events' topic
AI → KafkaIQ: get_consumer_lag("analytics", "events")
KafkaIQ Response:
{
  "group_id": "analytics",
  "topic": "events",
  "total_lag": 1250,
  "partitions": [
    {"partition": 0, "consumer_offset": 10000, "high_water_mark": 10500, "lag": 500},
    {"partition": 1, "consumer_offset": 8000, "high_water_mark": 8750, "lag": 750}
  ]
}
AI → Human: "The 'analytics' group has a total lag of 1,250 messages on the 'events' topic. Partition 0 is 500 messages behind and partition 1 is 750 messages behind."
```
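The numbers in this response follow the standard definition of consumer lag. For each partition, lag is the high-water mark minus the group's committed offset, and the total is the sum across partitions.

Here is a minimal sketch of that arithmetic. It assumes the offsets have already been fetched from Kafka. `compute_consumer_lag` is a hypothetical helper, not part of KafkaIQ.

```python
from typing import Dict, List


def compute_consumer_lag(committed: Dict[int, int], high_water_marks: Dict[int, int]) -> dict:
    """Per-partition lag = high-water mark - committed offset (hypothetical helper).

    committed:        partition -> offset committed by the consumer group
    high_water_marks: partition -> high-water mark (next offset to be written)
    """
    partitions: List[dict] = []
    for partition in sorted(high_water_marks):
        hwm = high_water_marks[partition]
        # Assumption: a partition with no committed offset counts as fully unread.
        offset = committed.get(partition, 0)
        partitions.append(
            {
                "partition": partition,
                "consumer_offset": offset,
                "high_water_mark": hwm,
                "lag": max(hwm - offset, 0),
            }
        )
    return {"total_lag": sum(p["lag"] for p in partitions), "partitions": partitions}


report = compute_consumer_lag({0: 10_000, 1: 8_000}, {0: 10_500, 1: 8_750})
print(report["total_lag"])  # 1250
```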

Advanced Analytics

Partition Leadership:

```
Human: Show me how partitions are distributed across brokers
AI → KafkaIQ: broker_leadership_distribution()
KafkaIQ Response:
{
  "total_partitions": 45,
  "distribution": {"1": 15, "2": 15, "3": 15},
  "by_topic": {
    "orders": {"1": [0, 3], "2": [1, 4], "3": [2, 5]},
    "events": {"1": [0, 2], "2": [1, 3], "3": [4, 5]}
  }
}
AI → Human: "Partition leadership is well-balanced! Each of your 3 brokers leads exactly 15 partitions out of 45 total. The distribution looks optimal."
```
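Leadership counts like these come from each partition's current leader in the cluster metadata. The sketch below assumes that mapping is already available as `(topic, partition) -> broker id`.

Two parts are illustrative rather than taken from the README: the `balanced` flag (leader counts within one partition of each other) and the name `leadership_distribution`.

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, Tuple


def leadership_distribution(
    leaders: Dict[Tuple[str, int], int], broker_ids: Iterable[int] = ()
) -> dict:
    """Summarise which broker leads which partition.

    leaders maps (topic, partition) -> leader broker id. broker_ids lists every
    live broker so that brokers leading zero partitions still show up.
    """
    distribution = Counter({broker: 0 for broker in broker_ids})
    distribution.update(leaders.values())  # one count per led partition

    by_topic: Dict[str, Dict[str, list]] = defaultdict(lambda: defaultdict(list))
    for (topic, partition), broker in sorted(leaders.items()):
        by_topic[topic][str(broker)].append(partition)

    counts = list(distribution.values())
    return {
        "total_partitions": len(leaders),
        "distribution": {str(b): n for b, n in sorted(distribution.items())},
        "by_topic": {topic: dict(per_broker) for topic, per_broker in by_topic.items()},
        # Hypothetical rule: balanced if leader counts differ by at most one
        "balanced": (max(counts) - min(counts) <= 1) if counts else True,
    }
```

Passing `broker_ids` matters because a broker that leads nothing would otherwise be missing from the counts, and the cluster would wrongly look balanced.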

Offline Partitions Check:

```
Human: Are there any offline partitions?
AI → KafkaIQ: get_offline_partitions()
KafkaIQ Response: {"offline_count": 0, "by_topic": {}}
AI → Human: "Great news! There are no offline partitions in your cluster. All partitions have active leaders."
```

Resource Monitoring

Broker Resources:

```
Human: What are the resources for broker-1?
AI → KafkaIQ: get_broker_resources("broker-1")
KafkaIQ Response:
{
  "broker_id": "broker-1",
  "resources": {"ram_gb": 32, "storage_gb": 1000, "cores": 8},
  "status": "active",
  "note": "Hardcoded values - integrate with monitoring system"
}
AI → Human: "Broker-1 reports 32GB RAM, 1TB storage, and 8 CPU cores, though these are hardcoded placeholder values rather than live metrics. Status is active."
```

Email Notifications

Send Alert:

```
Human: Email the cluster status to admin@company.com
AI → KafkaIQ: send_email_notification("admin@company.com", "Cluster Status: OK\nBrokers: 3\nTopics: 15\nNo issues detected", "KafkaIQ Daily Report")
KafkaIQ Response:
{
  "status": "sent",
  "message": "Email sent successfully",
  "recipient": "admin@company.com",
  "subject": "KafkaIQ Daily Report",
  "timestamp": "2025-09-28T10:30:00"
}
AI → Human: "Successfully sent the cluster status to admin@company.com!"
```

Temporal Intelligence (NEW! 🕐)

Trend-Aware Health Analysis:

```
Human: Has the cluster health changed in the last 2 hours?
AI → KafkaIQ: get_temporal_health_summary(2)
KafkaIQ Response:
{
  "time_window_hours": 2,
  "health_checks_count": 24,
  "current_status": "WARN",
  "status_changes": [
    {"from": "OK", "to": "WARN", "timestamp": 1703856000000}
  ],
  "status_change_count": 1,
  "alerts_count": 5,
  "alert_frequencies": {"CONSUMER_LAG": 5}
}
AI → Human: "Yes. Within the last 2 hours your cluster status changed from OK to WARN, and there have been 5 consumer lag alerts during this period."
```
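You can derive the `status_changes` list from an ordered series of health-check results. Record every point where a check's status differs from the previous one. Here, 24 checks in a 2-hour window means one check every 5 minutes.

The sketch below is hypothetical: `summarize_health` is an illustrative name, not KafkaIQ's function.

```python
from typing import List, Tuple


def summarize_health(checks: List[Tuple[int, str]]) -> dict:
    """Summarise time-ordered (timestamp_ms, status) health-check results."""
    changes = [
        {"from": prev, "to": curr, "timestamp": ts}
        # Pair each check with the one before it
        for (_, prev), (ts, curr) in zip(checks, checks[1:])
        if curr != prev
    ]
    return {
        "health_checks_count": len(checks),
        "current_status": checks[-1][1] if checks else None,
        "status_changes": changes,
        "status_change_count": len(changes),
    }
```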

Recurring Issue Detection:

```
Human: What issues keep happening repeatedly?
AI → KafkaIQ: get_recurring_issues(3, 6)
KafkaIQ Response:
{
  "time_window_hours": 6,
  "min_occurrences": 3,
  "recurring_issues_count": 2,
  "issues": [
    {
      "code": "CONSUMER_LAG",
      "count": 8,
      "first_seen": 1703835000000,
      "last_seen": 1703856000000,
      "severity_distribution": {"WARN": 6, "CRITICAL": 2},
      "sample_message": "High lag group='analytics', topic='events': 15000"
    },
    {
      "code": "UNDER_REPLICATED",
      "count": 4,
      "first_seen": 1703840000000,
      "last_seen": 1703855000000,
      "severity_distribution": {"WARN": 4}
    }
  ]
}
AI → Human: "Found 2 recurring issues: Consumer lag occurred 8 times (6 WARN, 2 CRITICAL) and under-replication occurred 4 times in the last 6 hours. The analytics group on the events topic has been experiencing lag."
```

Lag Trend Analysis:

```
Human: Is consumer lag for the analytics group getting worse?
AI → KafkaIQ: get_lag_trend_analysis("analytics", "events", 6)
KafkaIQ Response:
{
  "code": "CONSUMER_LAG_analytics_events",
  "sample_count": 12,
  "min": 500,
  "max": 15000,
  "avg": 7250,
  "current": 12000,
  "trend": "increasing",
  "time_window_hours": 6,
  "spike_analysis": {
    "spikes_detected": 3,
    "average_lag": 7250,
    "spike_threshold": 14500
  }
}
AI → Human: "Yes, consumer lag for analytics/events is INCREASING. It ranges from 500 to 15,000 messages (avg: 7,250). Current lag is 12,000 messages. Detected 3 lag spikes above the threshold of 14,500 messages."
```
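The example response is consistent with a spike threshold of twice the average lag (2 × 7,250 = 14,500). The sketch below assumes that rule and classifies the trend from the sign of a least-squares slope over the samples.

The README does not say how KafkaIQ computes `trend`. The slope method, the 5% "stable" tolerance, and the name `lag_trend` are therefore all assumptions.

```python
from statistics import mean
from typing import List


def lag_trend(samples: List[int], spike_factor: float = 2.0, tolerance: float = 0.05) -> dict:
    """Trend and spike summary for time-ordered lag samples (rule-based, no ML).

    trend: sign of the least-squares slope of lag versus sample index; slopes
    smaller than `tolerance * average` per sample count as "stable".
    spikes: samples strictly above `spike_factor * average`.
    """
    if not samples:
        raise ValueError("need at least one lag sample")
    n = len(samples)
    avg = mean(samples)
    x_bar = (n - 1) / 2
    denom = sum((x - x_bar) ** 2 for x in range(n))
    slope = sum((x - x_bar) * (y - avg) for x, y in enumerate(samples)) / denom if denom else 0.0
    if abs(slope) <= tolerance * avg:
        trend = "stable"
    else:
        trend = "increasing" if slope > 0 else "decreasing"
    threshold = spike_factor * avg
    return {
        "sample_count": n,
        "min": min(samples),
        "max": max(samples),
        "avg": avg,
        "current": samples[-1],
        "trend": trend,
        "spike_analysis": {
            "spikes_detected": sum(1 for s in samples if s > threshold),
            "average_lag": avg,
            "spike_threshold": threshold,
        },
    }
```

A slope over all samples is less sensitive to a single outlier than comparing only the first and last readings. That fits the goal of separating a real trend from one spike.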

Transient vs Persistent Classification:

```
Human: Is this consumer lag issue a temporary spike or a persistent problem?
AI → KafkaIQ: detect_transient_vs_persistent("CONSUMER_LAG", 30)
KafkaIQ Response:
{
  "code": "CONSUMER_LAG",
  "classification": "persistent",
  "confidence": 0.85,
  "occurrences": 8,
  "duration_minutes": 350,
  "avg_interval_minutes": 50,
  "first_seen": 1703835000000,
  "last_seen": 1703856000000
}
AI → Human: "This appears to be a PERSISTENT issue with 85% confidence. It has occurred 8 times over 350 minutes with an average interval of 50 minutes between occurrences. This is not a transient spike - it requires investigation."
```
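The durations in this response are internally consistent: 8 occurrences spanning 350 minutes give 7 gaps averaging 50 minutes. A rule-based sketch of the classification follows.

Two things in the sketch are assumptions. It treats the second argument (30) as a threshold in minutes. It also omits the `confidence` score, because the README does not say how that is computed.

```python
from typing import List


def classify_occurrences(
    timestamps_ms: List[int],
    persistent_after_minutes: float = 30,
    min_occurrences: int = 3,
) -> dict:
    """Rule-based transient/persistent split (hypothetical rule, no ML).

    Persistent: at least `min_occurrences` occurrences spread over at least
    `persistent_after_minutes`. Anything else is treated as a transient burst.
    """
    if not timestamps_ms:
        raise ValueError("need at least one occurrence")
    ts = sorted(timestamps_ms)
    duration = (ts[-1] - ts[0]) / 60_000  # minutes between first and last
    gaps = len(ts) - 1
    persistent = len(ts) >= min_occurrences and duration >= persistent_after_minutes
    return {
        "classification": "persistent" if persistent else "transient",
        "occurrences": len(ts),
        "duration_minutes": duration,
        "avg_interval_minutes": duration / gaps if gaps else 0.0,
        "first_seen": ts[0],
        "last_seen": ts[-1],
    }
```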

Partition Flapping Detection:

```
Human: Are any partitions repeatedly going offline?
AI → KafkaIQ: get_partition_health_history("events", 12)
KafkaIQ Response:
{
  "topic_name": "events",
  "time_window_hours": 12,
  "flapping_partitions_count": 1,
  "flapping_partitions": [
    {
      "topic": "events",
      "partition": 2,
      "offline_count": 5,
      "first_offline": 1703820000000,
      "last_offline": 1703856000000
    }
  ]
}
AI → Human: "Yes! Partition 2 on the 'events' topic has gone offline 5 times in the last 12 hours. This is a flapping partition that needs attention - it may indicate a problematic broker or disk issue."
```
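Flapping detection reduces to counting offline observations per (topic, partition) within the window. The sketch assumes each time a partition goes offline is recorded exactly once, for example when a health check first sees it without a leader. The `min_offline=3` default and the function name are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def flapping_partitions(
    offline_events: Iterable[Tuple[str, int, int]], min_offline: int = 3
) -> List[dict]:
    """Flag partitions seen going offline at least `min_offline` times.

    offline_events: (topic, partition, timestamp_ms) tuples, one per time the
    partition was observed going offline.
    """
    seen: Dict[Tuple[str, int], List[int]] = defaultdict(list)
    for topic, partition, ts in offline_events:
        seen[(topic, partition)].append(ts)

    flapping = []
    for (topic, partition), stamps in sorted(seen.items()):
        if len(stamps) >= min_offline:
            flapping.append(
                {
                    "topic": topic,
                    "partition": partition,
                    "offline_count": len(stamps),
                    "first_offline": min(stamps),
                    "last_offline": max(stamps),
                }
            )
    return flapping
```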

🎯 Use Cases

DevOps & Infrastructure Management

  • Automated Monitoring: "Alert me if any partitions go offline"

  • Health Dashboards: "Generate daily cluster health reports"

  • Incident Response: "What's causing the high consumer lag?"

  • Capacity Planning: "Show me broker resource utilization"

Data Engineering

  • Pipeline Management: "Create topics for new data streams"

  • Performance Tuning: "Which consumers are lagging behind?"

  • Data Quality: "Monitor topic configurations for compliance"

  • Stream Processing: "Set up topics for real-time analytics"

Development Workflows

  • Environment Setup: "Create development topics with proper configs"

  • Testing: "Clean up test topics after integration tests"

  • Debugging: "Why are messages not being consumed?"

  • Configuration Management: "Backup topic configurations"

Business Intelligence

  • Operational Insights: "Generate weekly Kafka usage reports"

  • Cost Optimization: "Identify underutilized topics"

  • SLA Monitoring: "Track consumer lag against SLAs"

  • Trend Analysis: "Show partition distribution trends"

πŸ—οΈ Architecture

Core Components

```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  AI Assistant   │    │     KafkaIQ      │    │  Kafka Cluster  │
│                 │◄──►│    MCP Server    │◄──►│                 │
│  (Claude, GPT)  │    │   (Port 8080)    │    │    (Brokers)    │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                      │                       │
         │                      │                       │
         ▼                      ▼                       ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Natural Language│    │  Tool Execution  │    │   Operations    │
│    Commands     │    │    & Response    │    │  & Monitoring   │
└─────────────────┘    └──────────────────┘    └─────────────────┘
```

Key Technologies

  • FastMCP: Python framework for building Model Context Protocol (MCP) servers

  • kafka-python: Kafka client library

  • Gmail SMTP: Email notification system

  • JSON: Structured data exchange

  • Streamable HTTP: MCP transport (JSON-RPC messages over HTTP)

🔧 Configuration

Environment Variables

```bash
# Email Configuration (Optional)
KAFKAIQ_GMAIL_USER=your-email@gmail.com
KAFKAIQ_GMAIL_PASSWORD=your-app-password

# Temporal Memory Configuration
KAFKAIQ_TEMPORAL_RETENTION_HOURS=24                 # How long to keep events (default: 24 hours)
KAFKAIQ_TEMPORAL_MAX_EVENTS=100000                  # Max events to store (default: 100,000)
KAFKAIQ_TEMPORAL_AUTO_PERSIST=true                  # Auto-save state on exit (default: true)
KAFKAIQ_TEMPORAL_PERSIST_PATH=temporal_state.json   # State file path
```
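Here is one way a server could read these variables with their documented defaults, using only the standard library. Three details are assumptions:

- The `TemporalConfig` and `load_temporal_config` names are hypothetical.
- The set of strings accepted as "true" is a guess.
- `temporal_state.json` comes from the example value above rather than a documented default.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class TemporalConfig:
    retention_hours: float = 24.0
    max_events: int = 100_000
    auto_persist: bool = True
    persist_path: str = "temporal_state.json"  # assumed default


def load_temporal_config(env: Optional[Mapping[str, str]] = None) -> TemporalConfig:
    """Read KAFKAIQ_TEMPORAL_* variables, falling back to the documented defaults."""
    env = os.environ if env is None else env
    return TemporalConfig(
        retention_hours=float(env.get("KAFKAIQ_TEMPORAL_RETENTION_HOURS", 24)),
        max_events=int(env.get("KAFKAIQ_TEMPORAL_MAX_EVENTS", 100_000)),
        # Assumption: these strings (case-insensitive) mean "enabled"
        auto_persist=env.get("KAFKAIQ_TEMPORAL_AUTO_PERSIST", "true").strip().lower()
        in {"1", "true", "yes", "on"},
        persist_path=env.get("KAFKAIQ_TEMPORAL_PERSIST_PATH", "temporal_state.json"),
    )
```

Accepting an explicit mapping keeps the loader testable without touching the real process environment.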

Alert Thresholds

Customize alert thresholds in kafka_inspector.alert_summary():

```python
# Built-in thresholds (edit as needed)
offline_crit = 1                      # Any offline partition = CRITICAL
urp_warn, urp_crit = 1, 5             # Under-replicated partitions
lag_warn, lag_crit = 10_000, 100_000  # Consumer lag thresholds
single_broker_warn = True             # Warn on single broker setup
```
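The thresholds above suggest the following classification. This is a hypothetical sketch, not the body of `kafka_inspector.alert_summary()`.

It makes three assumptions: each threshold is inclusive (`>=`), lag is keyed by `(group, topic)`, and the `OFFLINE_PARTITIONS` and `SINGLE_BROKER` signal codes are invented. The `CONSUMER_LAG` and `UNDER_REPLICATED` codes and the lag message format come from the example responses.

```python
from typing import Dict, List, Tuple


def alert_summary(
    offline_partitions: int,
    under_replicated: int,
    lag: Dict[Tuple[str, str], int],
    broker_count: int,
) -> dict:
    """Hypothetical OK/WARN/CRITICAL classification; lag maps (group, topic) -> total lag."""
    # Same threshold values as the built-in block above
    offline_crit = 1
    urp_warn, urp_crit = 1, 5
    lag_warn, lag_crit = 10_000, 100_000
    single_broker_warn = True

    signals: List[dict] = []

    def signal(level: str, code: str, message: str) -> None:
        signals.append({"level": level, "code": code, "message": message})

    if offline_partitions >= offline_crit:
        signal("CRITICAL", "OFFLINE_PARTITIONS", f"{offline_partitions} offline partition(s)")
    if under_replicated >= urp_crit:
        signal("CRITICAL", "UNDER_REPLICATED", f"{under_replicated} under-replicated partition(s)")
    elif under_replicated >= urp_warn:
        signal("WARN", "UNDER_REPLICATED", f"{under_replicated} under-replicated partition(s)")
    for (group, topic), total in sorted(lag.items()):
        level = "CRITICAL" if total >= lag_crit else "WARN" if total >= lag_warn else None
        if level:
            signal(level, "CONSUMER_LAG", f"High lag group='{group}', topic='{topic}': {total}")
    if single_broker_warn and broker_count == 1:
        signal("WARN", "SINGLE_BROKER", "Cluster runs on a single broker")

    # Overall status is the most severe signal raised
    levels = {s["level"] for s in signals}
    status = "CRITICAL" if "CRITICAL" in levels else "WARN" if "WARN" in levels else "OK"
    return {"status": status, "signals": signals}
```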

Temporal Memory Tuning

Retention Policy:

  • Default: 24 hours of event history

  • Increase for longer trend analysis

  • Decrease to reduce memory usage

Memory Considerations:

  • ~100 bytes per event

  • Default config: ~10MB for 100k events

  • High-frequency monitoring (5-second intervals): ~1,700 events/hour

  • Estimated 24h memory: ~4MB (very reasonable)
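The estimates above are simple arithmetic. Retained events are events per hour × retention hours, capped by KAFKAIQ_TEMPORAL_MAX_EVENTS, and memory is that count times bytes per event.

Two caveats apply. 5-second checks give 3,600 / 5 = 720 checks per hour, so ~1,700 events/hour assumes a few events recorded per check. The 100-byte figure is the README's estimate, and real usage depends on how events are represented in memory. `estimate_memory_bytes` is a hypothetical helper.

```python
def estimate_memory_bytes(
    events_per_hour: float,
    retention_hours: float,
    max_events: int = 100_000,
    bytes_per_event: int = 100,
) -> int:
    """Rough temporal-store footprint: retained events (capped) x bytes per event."""
    retained = min(events_per_hour * retention_hours, max_events)
    return int(retained * bytes_per_event)


print(estimate_memory_bytes(1_700, 24))   # 4080000  (~4 MB, the 24h estimate)
print(estimate_memory_bytes(10_000, 24))  # 10000000 (capped at 100k events, ~10 MB)
```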

Recommended Settings:

```bash
# Production (high-traffic cluster)
KAFKAIQ_TEMPORAL_RETENTION_HOURS=48
KAFKAIQ_TEMPORAL_MAX_EVENTS=200000

# Development (local testing)
KAFKAIQ_TEMPORAL_RETENTION_HOURS=6
KAFKAIQ_TEMPORAL_MAX_EVENTS=10000
```

🔐 Security

Authentication

  • Kafka: Supports SASL/PLAIN, SASL/SCRAM, SSL/TLS

  • Email: Gmail App Passwords (2FA required)

  • MCP: Served over plain HTTP with no built-in authentication; API key support could be added

Best Practices

  • Use environment variables for credentials

  • Enable Kafka SSL/TLS in production

  • Implement network-level access controls

  • Regular credential rotation

  • Monitor access logs
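Combining the first two practices, the client can read credentials from the environment and switch the Kafka connection to SASL over TLS. The sketch below builds keyword arguments using kafka-python's real configuration names: `security_protocol`, `sasl_mechanism`, `sasl_plain_username`, `sasl_plain_password`, and `ssl_cafile`.

The `KAFKAIQ_KAFKA_*` variable names and the SCRAM-SHA-512 default are hypothetical. They are not part of KafkaIQ's documented configuration.

```python
import os
from typing import Mapping, Optional


def kafka_security_config(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build kafka-python client kwargs; SASL_SSL is enabled only when a user is set."""
    env = os.environ if env is None else env
    config = {"bootstrap_servers": env.get("KAFKAIQ_KAFKA_BOOTSTRAP", "localhost:9092")}
    user = env.get("KAFKAIQ_KAFKA_USER")
    if user:
        config.update(
            security_protocol="SASL_SSL",  # SASL authentication over TLS
            sasl_mechanism=env.get("KAFKAIQ_KAFKA_SASL_MECHANISM", "SCRAM-SHA-512"),
            sasl_plain_username=user,
            # KeyError here means the password variable is missing: fail fast
            sasl_plain_password=env["KAFKAIQ_KAFKA_PASSWORD"],
        )
        cafile = env.get("KAFKAIQ_KAFKA_CA_FILE")
        if cafile:
            config["ssl_cafile"] = cafile  # CA bundle used to verify the brokers
    return config
```

You would pass the result straight to a client, for example `KafkaAdminClient(**kafka_security_config())`. No secret then appears in code or shell history.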

📊 Monitoring & Observability

Built-in Metrics

  • Cluster health status (OK/WARN/CRITICAL)

  • Partition leadership distribution

  • Consumer lag across all groups

  • Offline partition detection

  • Under-replicated partition monitoring

Integration Points

  • Prometheus: Metrics export (future enhancement)

  • Grafana: Dashboard integration

  • Slack/Teams: Alert forwarding

  • PagerDuty: Incident management

  • SIEM: Security event logging

🤝 AI Assistant Integration

Supported Platforms

  • Claude (Anthropic) - Primary development platform

  • ChatGPT (OpenAI) - Compatible with MCP

  • Custom AI Systems - Any MCP-compatible assistant

Integration Steps

  1. Start KafkaIQ server

  2. Configure AI assistant with MCP endpoint

  3. Test connection with basic commands

  4. Set up monitoring and alerting workflows

🚀 Advanced Usage

Batch Operations

```
Human: Create 5 topics for microservices: user-service, order-service, payment-service, notification-service, audit-service
AI: I'll create all 5 topics for your microservices architecture...
```

Automated Workflows

```
Human: Set up monitoring that emails me if any consumer lag exceeds 50,000 messages
AI: I'll configure automated monitoring with email alerts for high consumer lag...
```
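One way to implement this workflow is a polling loop. It reads lag and sends a notification when any group/topic pair exceeds the limit.

In the sketch below, `fetch_lag` and `notify` are hypothetical callables. You would wire them to something like `get_consumer_lag` and `send_email_notification`. The 60-second interval is an arbitrary assumption.

```python
import time
from typing import Callable, Dict, Tuple

LagSnapshot = Dict[Tuple[str, str], int]  # (group, topic) -> total lag


def check_lag_and_notify(
    fetch_lag: Callable[[], LagSnapshot],
    notify: Callable[[str, str], None],
    threshold: int = 50_000,
) -> int:
    """One monitoring pass; returns how many (group, topic) pairs exceed threshold."""
    breaches = {key: lag for key, lag in fetch_lag().items() if lag > threshold}
    if breaches:
        lines = [f"{group}/{topic}: {lag:,} messages" for (group, topic), lag in sorted(breaches.items())]
        notify(f"KafkaIQ: consumer lag above {threshold:,}", "\n".join(lines))
    return len(breaches)


def monitor_forever(
    fetch_lag: Callable[[], LagSnapshot],
    notify: Callable[[str, str], None],
    threshold: int = 50_000,
    interval_s: float = 60.0,
) -> None:
    """Repeat the check every interval_s seconds until the process is stopped."""
    while True:
        check_lag_and_notify(fetch_lag, notify, threshold)
        time.sleep(interval_s)
```

As written, it re-sends the alert on every pass while lag stays high. A real deployment would probably add a cooldown or notify only when the breach state changes.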

Custom Configurations

```python
# Custom topic configuration example
topic_configs = {
    "retention.ms": "604800000",     # 7 days
    "compression.type": "lz4",
    "max.message.bytes": "1048576",  # 1MB
}
```
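You can generate the same values from a retention period in days, which avoids hand-computing milliseconds (7 × 24 × 60 × 60 × 1000 = 604,800,000).

The second function shows one way to apply the configs at creation time with kafka-python's `KafkaAdminClient` and `NewTopic(topic_configs=...)`. The README does not say whether KafkaIQ's `create_kafka_topic` accepts configs this way. Both helper names are hypothetical.

```python
def build_topic_configs(
    retention_days: int = 7, compression: str = "lz4", max_message_bytes: int = 1_048_576
) -> dict:
    # Kafka topic config values are passed as strings
    return {
        "retention.ms": str(retention_days * 24 * 60 * 60 * 1000),
        "compression.type": compression,
        "max.message.bytes": str(max_message_bytes),
    }


def create_topic_with_configs(
    bootstrap: str, name: str, partitions: int, replication: int, configs: dict
) -> None:
    # Imported lazily so build_topic_configs works without kafka-python installed
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers=bootstrap)
    try:
        admin.create_topics(
            [
                NewTopic(
                    name=name,
                    num_partitions=partitions,
                    replication_factor=replication,
                    topic_configs=configs,
                )
            ]
        )
    finally:
        admin.close()
```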

πŸ› Troubleshooting

Common Issues

Connection Failed

```
Error: "Failed to connect to Kafka cluster"
Solution: Check that Kafka is running and reachable on the specified host and port
```
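Before digging into client settings, a plain TCP probe shows whether anything is listening on the broker's host and port. This is a stdlib sketch; `broker_reachable` is a hypothetical helper.

```python
import socket


def broker_reachable(host: str, port: int, timeout_s: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout_s."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:  # refused, unreachable, DNS failure, or timeout
        return False
```

A successful probe only proves the port is open. Clients can still fail if the broker's `advertised.listeners` points at an address they cannot reach.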

Email Not Sending

```
Error: "Gmail authentication failed"
Solution: Enable 2-Step Verification on the Gmail account, generate an App Password, and use it as KAFKAIQ_GMAIL_PASSWORD
```
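For reference, this is roughly what authenticating to Gmail's SMTP server with an App Password looks like in the standard library. The sketch assumes implicit TLS on `smtp.gmail.com:465` and reads the `KAFKAIQ_GMAIL_USER` and `KAFKAIQ_GMAIL_PASSWORD` variables from Setup.

KafkaIQ's own sender may use a different port or STARTTLS. The helper names are hypothetical.

```python
import os
import smtplib
import ssl
from email.message import EmailMessage


def build_alert_email(sender: str, recipient: str, subject: str, body: str) -> EmailMessage:
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = subject
    msg.set_content(body)  # plain-text body
    return msg


def send_via_gmail(recipient: str, subject: str, body: str) -> None:
    user = os.environ["KAFKAIQ_GMAIL_USER"]
    # Must be an App Password; a normal account password is rejected
    password = os.environ["KAFKAIQ_GMAIL_PASSWORD"]
    msg = build_alert_email(user, recipient, subject, body)
    context = ssl.create_default_context()  # verify Gmail's TLS certificate
    with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context) as server:
        server.login(user, password)  # raises SMTPAuthenticationError on bad credentials
        server.send_message(msg)
```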

Consumer Lag Timeout

```
Error: "Consumer lag check timed out"
Solution: Increase the request timeout in the KafkaConsumer config (kafka-python: request_timeout_ms)
```

Permission Denied

```
Error: "Not authorized to perform operation"
Solution: Check Kafka ACLs and user permissions
```

🤝 Contributing

We welcome contributions!

Development Setup

```bash
# Clone and setup development environment
git clone https://github.com/yourusername/kafkaiq.git
cd kafkaiq

# Create virtual environment
python -m venv venv
source venv/bin/activate   # Linux/Mac
# or: venv\Scripts\activate  (Windows)

# Install development dependencies
pip install -r requirements.txt

# Run tests
python -m pytest tests/

# Start development server
python kafka_mcp_server.py
```

Contribution Areas

  • 🐛 Bug fixes and stability improvements

  • ✨ New tool implementations

  • 📚 Documentation and examples

  • 🧪 Test coverage expansion

  • 🔧 Performance optimizations

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • FastMCP Team: For the excellent MCP framework

  • Kafka Community: For the robust kafka-python library

  • Anthropic: For Claude and the Model Context Protocol

📞 Support & Community

🏷️ Tags

kafka ai mcp devops monitoring alerts data-engineering stream-processing automation python fastmcp claude chatgpt infrastructure temporal-memory trend-analysis aiops observability time-series anomaly-detection

🎓 Academic Framing

Research Contribution:

"We augment tool-based MCP interaction with temporal operational memory, enabling trend-aware health assessment and longitudinal system reasoning without machine learning."

Key Novelties:

  1. Stateful MCP Architecture: To our knowledge, the first MCP implementation with temporal state tracking

  2. Trend-Aware Reasoning: Distinguish transient vs persistent failures using time-series analysis

  3. Zero-ML Anomaly Detection: Recurring issue detection without machine learning models

  4. Operational Memory: Lightweight temporal store optimized for observability workflows

Applicable Research Areas:

  • AIOps (AI for IT Operations)

  • Systems Conferences (SOSP, OSDI, NSDI)

  • Observability Research

  • Intelligent System Management

Citation Example:

```bibtex
@software{kafkaiq2025,
  title={KafkaIQ: Temporal Operational Memory for MCP-based Kafka Management},
  author={Your Name},
  year={2025},
  description={Stateful MCP server with trend-aware health assessment}
}
```

Made with ❤️ for the AI and Data Engineering communities

Transform your Kafka operations with the power of AI - Experience KafkaIQ today!
