Script.py (27.3 kB)
# app.py
import json
import time
from typing import Any, Dict, List, Optional, Union

from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType, NewTopic
from kafka.errors import KafkaError, NoBrokersAvailable, KafkaTimeoutError, TopicAlreadyExistsError


class KafkaInspector:
    """
    Holds broker details (object-level) and exposes single-responsibility utilities.
    """

    def __init__(
        self,
        bootstrap_servers: Union[str, List[str]],
        client_id: str = "kafka-inspector",
        request_timeout_ms: int = 45000,  # keep > session_timeout_ms
        api_version_auto_timeout_ms: int = 8000,
    ):
        self.bootstrap_servers = bootstrap_servers
        self.client_id = client_id
        self.request_timeout_ms = request_timeout_ms
        self.api_version_auto_timeout_ms = api_version_auto_timeout_ms

        self._admin = KafkaAdminClient(
            bootstrap_servers=self.bootstrap_servers,
            client_id=self.client_id,
            request_timeout_ms=self.request_timeout_ms,
            api_version_auto_timeout_ms=self.api_version_auto_timeout_ms,
        )

    def list_topics(self, include_internal: bool = False) -> List[str]:
        """
        Return topic names in the cluster.
        By default skips internal topics (e.g., __consumer_offsets).
        """
        try:
            topics_meta = self._admin.describe_topics()  # all topics
            names: List[str] = []
            for t in topics_meta:
                if not include_internal and t.get("is_internal", False):
                    continue
                names.append(t["topic"])
            return sorted(names)
        except KafkaError as e:
            print(f"[LIST_TOPICS] failed: {e}")
            return []

    def alert_summary(self) -> dict:
        """
        Auto-discovers cluster signals + lag and returns OK/WARN/CRITICAL without external params.

        Built-in thresholds (tune here if needed):
        - offline_crit: any offline partition -> CRITICAL
        - urp_warn/urp_crit: under-replicated partitions thresholds
        - lag_warn/lag_crit: per (group, topic) lag thresholds
        """
        # ---- thresholds (edit here if you want different defaults) ----
        offline_crit = 1
        urp_warn, urp_crit = 1, 5
        lag_warn, lag_crit = 10_000, 100_000
        single_broker_warn = True

        def rank(level: str) -> int:
            return {"OK": 0, "WARN": 1, "CRITICAL": 2}[level]

        overall = "OK"
        signals: list[dict] = []

        # ---- cluster health (uses your existing helper) ----
        cluster = self.get_cluster_details()
        if "error" in cluster:
            return {
                "status": "CRITICAL",
                "signals": [{"level": "CRITICAL", "code": "CLUSTER_UNREACHABLE",
                             "message": cluster["error"], "data": {}}],
                "summary": {"cluster_id": None},
                "timestamp": int(time.time() * 1000),
            }

        cid = cluster.get("cluster_id")
        brokers = cluster.get("broker_count", 0)
        topics_count = cluster.get("topics_count", 0)
        urp = cluster.get("health", {}).get("under_replicated_count", 0)
        offline = cluster.get("health", {}).get("offline_partition_count", 0)
        single_broker = cluster.get("health", {}).get("single_broker", False)

        # offline partitions -> critical
        if offline >= offline_crit:
            signals.append({
                "level": "CRITICAL",
                "code": "OFFLINE_PARTITIONS",
                "message": f"{offline} partition(s) offline (no leader)",
                "data": {"offline_partitions": offline},
            })
            overall = "CRITICAL"

        # URP
        if urp >= urp_crit:
            lvl = "CRITICAL"
        elif urp >= urp_warn:
            lvl = "WARN"
        else:
            lvl = "OK"
        if lvl != "OK":
            signals.append({
                "level": lvl,
                "code": "UNDER_REPLICATED",
                "message": f"{urp} partition(s) under-replicated",
                "data": {"under_replicated": urp},
            })
            if rank(lvl) > rank(overall):
                overall = lvl

        # single broker (optional warning)
        if single_broker_warn and single_broker:
            signals.append({
                "level": "WARN",
                "code": "SINGLE_BROKER",
                "message": "Cluster has a single broker (no replication/failover)",
                "data": {"broker_count": brokers},
            })
            if rank("WARN") > rank(overall):
                overall = "WARN"

        # ---- auto-discover groups & topics they actually consume ----
        lag_summaries: list[dict] = []
        try:
            groups = self._admin.list_consumer_groups()  # [(group_id, protocol_type)]
            group_ids = [g[0] for g in groups]
            for gid in group_ids:
                offsets = self._admin.list_consumer_group_offsets(gid)  # {TopicPartition: OffsetAndMetadata}
                topics_for_group = sorted({tp.topic for tp in offsets.keys()})
                for topic in topics_for_group:
                    lag = self.get_consumer_lag(gid, topic, print_output=False)
                    total = int(lag.get("total_lag", 0))
                    if total >= lag_crit:
                        lvl = "CRITICAL"
                    elif total >= lag_warn:
                        lvl = "WARN"
                    else:
                        lvl = "OK"
                    lag_summaries.append({
                        "group": gid,
                        "topic": topic,
                        "total_lag": total,
                        "level": lvl,
                    })
                    if lvl != "OK":
                        signals.append({
                            "level": lvl,
                            "code": "CONSUMER_LAG",
                            "message": f"High lag group='{gid}', topic='{topic}': {total}",
                            "data": {"group": gid, "topic": topic, "total_lag": total},
                        })
                        if rank(lvl) > rank(overall):
                            overall = lvl
        except KafkaError as e:
            signals.append({
                "level": "WARN",
                "code": "LAG_DISCOVERY_FAILED",
                "message": f"Could not enumerate groups/offsets: {e}",
                "data": {},
            })
            if rank("WARN") > rank(overall):
                overall = "WARN"

        return {
            "status": overall,
            "signals": signals,
            "summary": {
                "cluster_id": cid,
                "broker_count": brokers,
                "topics_count": topics_count,
                "under_replicated": urp,
                "offline_partitions": offline,
                "single_broker": single_broker,
                "lag": lag_summaries,
            },
            "timestamp": int(time.time() * 1000),
        }

    def broker_leadership_distribution(self, include_internal: bool = False) -> dict:
        """
        Return how many partitions each broker is the leader for.

        Example Output:
        {
            "total_partitions": int,
            "distribution": { broker_id: count },
            "by_topic": { topic: { broker_id: [partitions...] } }
        }
        """
        try:
            topics_meta = self._admin.describe_topics()
        except Exception as e:
            return {"error": f"describe_topics failed: {e}"}

        distribution = {}
        by_topic = {}
        total = 0

        for t in topics_meta:
            if not include_internal and t.get("is_internal", False):
                continue
            topic_name = t["topic"]
            by_topic[topic_name] = {}
            for p in t.get("partitions", []) or []:
                leader = p.get("leader", -1)
                if leader == -1:
                    continue  # skip offline partitions
                total += 1
                distribution[leader] = distribution.get(leader, 0) + 1
                by_topic[topic_name].setdefault(leader, []).append(p["partition"])

        return {
            "total_partitions": total,
            "distribution": distribution,
            "by_topic": by_topic,
        }

    def get_offline_partitions(self, include_internal: bool = False) -> dict:
        """
        Returns all offline partitions (leader == -1).

        Example Output:
        {
            "offline_count": int,
            "by_topic": { topic: [partition_ids...] }
        }
        """
        try:
            topics_meta = self._admin.describe_topics()
        except Exception as e:
            return {"error": f"describe_topics failed: {e}"}

        result = {"offline_count": 0, "by_topic": {}}
        for t in topics_meta:
            if not include_internal and t.get("is_internal", False):
                continue
            topic_name = t["topic"]
            partitions = t.get("partitions", []) or []
            for p in partitions:
                if p.get("leader", -1) == -1:
                    result["offline_count"] += 1
                    result["by_topic"].setdefault(topic_name, []).append(p["partition"])
        return result

    def get_consumer_lag(self, group_id: str, topic: str, print_output: bool = True) -> Dict[str, Any]:
        """
        Compute lag for one topic in one consumer group.

        Returns:
        {
            "group_id": str,
            "topic": str,
            "by_partition": { <p>: {"end": int, "committed": int|None, "lag": int} },
            "total_lag": int
        }
        """
        result: Dict[str, Any] = {"group_id": group_id, "topic": topic, "by_partition": {}, "total_lag": 0}

        try:
            probe = KafkaConsumer(
                bootstrap_servers=self.bootstrap_servers,
                group_id=group_id,  # we want committed offsets for this group
                enable_auto_commit=False,
                auto_offset_reset="earliest",
                request_timeout_ms=45000,  # must be > session_timeout_ms
                consumer_timeout_ms=5000,
                client_id=f"{self.client_id}-lag",
            )

            parts = probe.partitions_for_topic(topic)
            if not parts:
                if print_output:
                    print(f"[LAG] topic '{topic}' has no partitions (or not found).")
                probe.close()
                return result

            tps = [TopicPartition(topic, p) for p in sorted(parts)]
            end_offsets = probe.end_offsets(tps)

            total = 0
            for tp in tps:
                end = int(end_offsets.get(tp, 0))
                committed = probe.committed(tp)  # None if group never committed on this partition
                lag = end if committed is None else max(end - int(committed), 0)
                result["by_partition"][tp.partition] = {
                    "end": end,
                    "committed": None if committed is None else int(committed),
                    "lag": int(lag),
                }
                total += int(lag)

            result["total_lag"] = total
            probe.close()

            if print_output:
                print(f"[LAG] group='{group_id}' topic='{topic}' total={total}")
                compact = {p: v["lag"] for p, v in result["by_partition"].items()}
                print(f" by_partition={compact}")

            return result

        except KafkaError as e:
            if print_output:
                print(f"[LAG] failed for group='{group_id}' topic='{topic}': {e}")
            result["error"] = f"KafkaError: {e}"
            return result

    def get_cluster_details(self) -> dict:
        """
        Returns a compact cluster summary useful for analysis:
        - cluster_id, controller_id
        - brokers (node_id, host, port), broker_count
        - topics_count (incl/excl internal), total_partitions
        - consumer_groups_count
        - health signals:
          under_replicated_count, offline_partition_count, topics_with_rf_gt_brokers, single_broker
        """
        summary = {
            "cluster_id": None,
            "controller_id": None,
            "brokers": [],
            "broker_count": 0,
            "topics_count": 0,
            "topics_internal_count": 0,
            "total_partitions": 0,
            "consumer_groups_count": 0,
            "health": {
                "under_replicated_count": 0,
                "offline_partition_count": 0,
                "topics_with_rf_gt_brokers": [],
                "single_broker": False,
            },
        }

        try:
            # --- Core cluster meta
            meta = self._admin.describe_cluster()
            brokers = meta.get("brokers", []) or []
            summary["cluster_id"] = meta.get("cluster_id")
            summary["controller_id"] = meta.get("controller_id")
            summary["brokers"] = [
                {
                    "node_id": b.get("node_id"),
                    "host": b.get("host"),
                    "port": b.get("port"),
                    "rack": b.get("rack"),
                }
                for b in brokers
            ]
            summary["broker_count"] = len(brokers)
            summary["health"]["single_broker"] = (len(brokers) == 1)

            # --- Topics & partitions + health
            topics_meta = self._admin.describe_topics()
            summary["topics_count"] = len(topics_meta)

            internal = 0
            total_partitions = 0
            urp = 0
            offline = 0
            rf_gt_brokers = set()

            for t in topics_meta:
                if t.get("is_internal", False):
                    internal += 1
                parts = t.get("partitions", []) or []
                total_partitions += len(parts)

                # replication factor (assume same across partitions)
                rf = len(parts[0]["replicas"]) if parts else 0
                if summary["broker_count"] and rf > summary["broker_count"]:
                    rf_gt_brokers.add(t["topic"])

                for p in parts:
                    replicas = p.get("replicas", []) or []
                    isr = p.get("isr", []) or []
                    leader = p.get("leader", -1)
                    # Under-replicated when ISR size < replica count
                    if len(isr) < len(replicas):
                        urp += 1
                    # Offline partition when leader is -1
                    if leader == -1:
                        offline += 1

            summary["topics_internal_count"] = internal
            summary["total_partitions"] = total_partitions
            summary["health"]["under_replicated_count"] = urp
            summary["health"]["offline_partition_count"] = offline
            summary["health"]["topics_with_rf_gt_brokers"] = sorted(list(rf_gt_brokers))

            # --- Consumer groups count
            try:
                groups = self._admin.list_consumer_groups()  # [(group_id, protocol_type)]
                summary["consumer_groups_count"] = len(groups)
            except KafkaError:
                # not fatal for cluster summary
                pass

            return summary

        except KafkaError as e:
            return {"error": f"KafkaError: {e}"}
        except Exception as e:
            return {"error": f"Unexpected: {repr(e)}"}

    # ------------------ Public: Topic Details (one-call) ------------------ #
    # Ready
    def get_topic_details(self, topic: str) -> Dict[str, Any]:
        """
        Returns a structured dict with:
        - exists: bool
        - metadata: {partitions: int, replication_factor: int,
                     partitions_detail: [{partition, leader, replicas, isr}]}
        - configs: {name: value}
        - watermarks: {partition: {beginning, end, approx_messages}}
        """
        try:
            meta = self._topic_metadata(topic)
            if not meta["exists"]:
                return {
                    "topic": topic,
                    "exists": False,
                    "metadata": None,
                    "configs": None,
                    "watermarks": None,
                }

            cfg = self._topic_configs(topic)
            wmarks = self._topic_watermarks(topic)

            return {
                "topic": topic,
                "exists": True,
                "metadata": meta["metadata"],
                "configs": cfg,
                "watermarks": wmarks,
            }

        except (NoBrokersAvailable, KafkaTimeoutError, KafkaError) as e:
            return {
                "topic": topic,
                "exists": None,
                "error": f"{type(e).__name__}: {e}",
                "metadata": None,
                "configs": None,
                "watermarks": None,
            }

    # ------------------ Internal: Metadata ------------------ #
    def _topic_metadata(self, topic: str) -> Dict[str, Any]:
        """
        Uses AdminClient.describe_topics() and extracts:
        partitions, replication_factor, per-partition leader/replicas/isr.
        """
        topics_meta = self._admin.describe_topics()  # all topics
        chosen: Optional[Dict[str, Any]] = None
        for t in topics_meta:
            if t.get("topic") == topic:
                chosen = t
                break

        if not chosen:
            return {"exists": False, "metadata": None}

        parts = chosen.get("partitions", []) or []
        rf = len(parts[0].get("replicas", [])) if parts else 0

        partitions_detail = []
        for p in parts:
            partitions_detail.append(
                {
                    "partition": p.get("partition"),
                    "leader": p.get("leader", -1),
                    "replicas": p.get("replicas", []),
                    "isr": p.get("isr", []),
                }
            )

        return {
            "exists": True,
            "metadata": {
                "partitions": len(parts),
                "replication_factor": rf,
                "partitions_detail": partitions_detail,
            },
        }

    # ------------------ Internal: Configs (robust to protocol tuple shapes) ------------------ #
    def _topic_configs(self, topic: str) -> Dict[str, str]:
        """
        Uses AdminClient.describe_configs([TOPIC]) and normalizes output to {name: value}.
        Handles both 4-tuple and 5-tuple resource responses.
        """
        try:
            responses = self._admin.describe_configs(
                [ConfigResource(ConfigResourceType.TOPIC, topic)],
                include_synonyms=False,
            )
        except KafkaError:
            return {}

        cfg: Dict[str, str] = {}
        resp_list = responses if isinstance(responses, list) else [responses]

        for resp in resp_list:
            resources = getattr(resp, "resources", []) or []
            for res in resources:
                # res is often a tuple:
                # (err_code, rtype, rname, configs) OR (err_code, err_msg, rtype, rname, configs)
                err, rname, configs = 0, topic, None
                if isinstance(res, (list, tuple)):
                    if len(res) >= 5:
                        err, _err_msg, _rtype, rname, configs = res[:5]
                    elif len(res) >= 4:
                        err, _rtype, rname, configs = res[:4]
                    else:
                        configs = res[-1] if res else None
                else:
                    err = getattr(res, "error_code", 0)
                    rname = getattr(res, "resource_name", topic)
                    configs = getattr(res, "configs", None)

                if err:
                    continue
                if not configs:
                    continue

                for entry in configs:
                    if isinstance(entry, (list, tuple)) and len(entry) >= 2:
                        name, value = entry[0], entry[1]
                    else:
                        name = getattr(entry, "name", None)
                        value = getattr(entry, "value", None)
                    if name is not None:
                        cfg[str(name)] = None if value is None else str(value)

        return cfg

    # ------------------ Internal: Watermarks ------------------ #
    def _topic_watermarks(self, topic: str) -> Dict[int, Dict[str, Optional[int]]]:
        """
        Uses a lightweight KafkaConsumer to fetch beginning/end offsets per partition.
        Returns {partition: {beginning, end, approx_messages}}
        """
        c = KafkaConsumer(
            bootstrap_servers=self.bootstrap_servers,
            group_id=None,
            client_id=f"{self.client_id}-wmarks",
            enable_auto_commit=False,
            request_timeout_ms=20000,
            consumer_timeout_ms=5000,
        )

        parts = c.partitions_for_topic(topic)
        if not parts:
            c.close()
            return {}

        tps = [TopicPartition(topic, p) for p in sorted(parts)]
        beginnings = c.beginning_offsets(tps)
        ends = c.end_offsets(tps)

        summary: Dict[int, Dict[str, Optional[int]]] = {}
        for tp in tps:
            b = beginnings.get(tp)
            e = ends.get(tp)
            summary[tp.partition] = {
                "beginning": b,
                "end": e,
                "approx_messages": (e - b) if (b is not None and e is not None) else None,
            }

        c.close()
        return summary

    # 2 Ready
    def create_topic(
        self,
        name: str,
        num_partitions: int = 1,
        replication_factor: int = 1,
        configs: Optional[Dict[str, str]] = None,
        if_not_exists: bool = True,
        timeout_ms: int = 15000,
    ) -> Dict[str, Any]:
        """
        Create a topic. On your single-broker setup, replication_factor must be 1.
        Returns a structured result with 'created': True/False and 'error' if any.
        """
        try:
            new_topic = NewTopic(
                name=name,
                num_partitions=num_partitions,
                replication_factor=replication_factor,
                topic_configs=configs or {},
            )
            self._admin.create_topics([new_topic], timeout_ms=timeout_ms, validate_only=False)
            result = {"topic": name, "created": True, "error": None}
            print(f"[CREATE] topic='{name}' P={num_partitions} RF={replication_factor}")
            return result
        except TopicAlreadyExistsError:
            if if_not_exists:
                msg = "already exists"
                result = {"topic": name, "created": False, "error": msg}
                print(f"[CREATE] SKIP topic='{name}' ({msg})")
                return result
            raise
        except KafkaError as e:
            result = {"topic": name, "created": False, "error": f"KafkaError: {e}"}
            print(f"[CREATE] FAIL topic='{name}' -> {e}")
            return result


# ------------------ Test in main ------------------ #
if __name__ == "__main__":
    # Your Docker Compose exposes OUTSIDE on localhost:9092
    inspector = KafkaInspector(bootstrap_servers="localhost:9092")

    # 1 topic metadata -----------
    # topic_name = "my-topic"  # change if needed
    # details = inspector.get_topic_details(topic_name)
    # # Human-readable line
    # if details.get("exists"):
    #     meta = details["metadata"]
    #     print(f"[Topic] {topic_name} P={meta['partitions']} RF={meta['replication_factor']}")
    # else:
    #     print(f"[Topic] {topic_name} does not exist or cluster unreachable.")
    # # Full JSON (easy to inspect/parse)
    # print(json.dumps(details, indent=2))
    # 1 topic metadata -----------

    # 2 Create topic -----
    # # Choose a topic that is NOT auto-created by compose to test creation
    # topic_to_create = "demo-topic"
    # # On a single broker, RF must be 1. You can pick multiple partitions if you want.
    # create_result = inspector.create_topic(
    #     name=topic_to_create,
    #     num_partitions=3,
    #     replication_factor=1,
    #     configs={"retention.ms": "600000"},  # 10 minutes (example)
    #     if_not_exists=True,
    # )
    # # Verify by fetching details
    # details = inspector.get_topic_details(topic_to_create)
    # # Print verification succinctly + full JSON
    # if details.get("exists"):
    #     meta = details["metadata"]
    #     print(f"[VERIFY] {topic_to_create} P={meta['partitions']} RF={meta['replication_factor']}")
    # else:
    #     print(f"[VERIFY] topic '{topic_to_create}' not found")
    # print(json.dumps({"create_result": create_result, "details": details}, indent=2))
    # Create topic -----

    # 3 Cluster details most useful for analysis (controller, brokers, topic/partition stats,
    #   consumer groups count, and health signals like offline/under-replicated partitions
    #   and RF > brokers) ---------------------------------------------------
    # cluster = inspector.get_cluster_details()
    # # Human line
    # if "error" not in cluster:
    #     print(f"[CLUSTER] id={cluster['cluster_id']} brokers={cluster['broker_count']} "
    #           f"controller={cluster['controller_id']} topics={cluster['topics_count']} "
    #           f"parts={cluster['total_partitions']} groups={cluster['consumer_groups_count']}")
    # else:
    #     print(cluster["error"])
    # # Full JSON
    # print(json.dumps(cluster, indent=2))
    # --------------------------------------------------------

    # 4 get_consumer_lag ---------
    # group_id = "lag-demo"
    # topic = "my-topic"
    # lag = inspector.get_consumer_lag(group_id=group_id, topic=topic, print_output=True)
    # # Also dump full JSON (handy for verification)
    # print(json.dumps(lag, indent=2))
    # get_consumer_lag ---------

    # 5 get_offline_partitions ----
    # offline = inspector.get_offline_partitions()
    # print(f"[OFFLINE] count={offline.get('offline_count')}")
    # print(json.dumps(offline, indent=2))
    # ----------------

    # 6 broker leadership distribution ----
    dist = inspector.broker_leadership_distribution()
    # print("[LEADERSHIP DISTRIBUTION]")
    # print(json.dumps(dist, indent=2))
    # ---------

    # 7 alert summaries -----------------------------
    # summary = inspector.alert_summary()
    # print(f"[ALERT] status={summary['status']}")
    # print(json.dumps(summary, indent=2))
    # -----------------------------

    # topics = inspector.list_topics()  # default: exclude internal
    # print("[TOPICS] (exclude internal):", topics)
    # topics_all = inspector.list_topics(include_internal=True)
    # print("[TOPICS] (include internal):", topics_all)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ojhaayush03/kafka_mcp'
