Skip to main content
Glama
kafka_consumer_groups.py5.12 kB
from typing import Any, Dict, List from clients.http_client import api_client from fastmcp import FastMCP """ Registers all Kafka consumer group operations with the MCP server. """ def register_kafka_consumer_groups(mcp: FastMCP): @mcp.tool() async def list_consumer_groups(environment: str) -> List[Dict[str, Any]]: """ Retrieve a list of all Kafka consumer groups. Args: environment: The environment name. Returns: A list of consumer group objects. """ endpoint = f"/api/v1/environments/{environment}/proxy/api/consumers" return await api_client._make_request("GET", endpoint) @mcp.tool() async def list_consumer_groups_by_topic(environment: str, topic: str) -> List[Dict[str, Any]]: """ Retrieve a list of consumer groups by a specific topic. Args: environment: The environment name. topic: The name of the topic. Returns: A list of consumer group objects. """ endpoint = f"/api/v1/environments/{environment}/proxy/api/consumers/{topic}" return await api_client._make_request("GET", endpoint) @mcp.tool() async def update_consumer_group_offsets( environment: str, group_id: str, offsets: List[Dict[str, Any]] ) -> Dict[str, Any]: """ Update the offset for a consumer group topic-partition tuples. Args: environment: The environment name. group_id: The ID of the consumer group. offsets: A list of topic-partition offset objects. Returns: The result of the update operation. """ endpoint = f"/api/v1/environments/{environment}/proxy/api/consumers/{group_id}/offsets" return await api_client._make_request("PUT", endpoint, json=offsets) @mcp.tool() async def delete_consumer_group_offsets( environment: str, group_id: str, offsets: List[Dict[str, Any]] ) -> Dict[str, Any]: """ Delete offsets for a consumer group topic-partition tuples. Args: environment: The environment name. group_id: The ID of the consumer group. offsets: A list of topic-partition objects. Returns: The result of the delete operation. """ endpoint = f"/api/v1/environments/{environment}/proxy/api/consumers/{group_id}/offsets/delete" return await api_client._make_request("POST", endpoint, json=offsets) @mcp.tool() async def update_consumer_group_topic_partition_offset( environment: str, group_id: str, topic: str, partition: int, offset: int ) -> Dict[str, Any]: """ Update the offset for a topic-partition for a given group. Args: environment: The environment name. group_id: The ID of the consumer group. topic: The topic name. partition: The partition number. offset: The new offset value. Returns: The result of the update operation. """ endpoint = f"/api/v1/environments/{environment}/proxy/api/consumers/{group_id}/offsets/topics/{topic}/partitions/{partition}" payload = {"offset": offset} return await api_client._make_request("PUT", endpoint, json=payload) @mcp.tool() async def delete_consumer_group_topic_partition_offset( environment: str, group_id: str, topic: str, partition: int ) -> Dict[str, Any]: """ Delete the offset for a topic-partition for a given group. Args: environment: The environment name. group_id: The ID of the consumer group. topic: The topic name. partition: The partition number. Returns: The result of the delete operation. """ endpoint = f"/api/v1/environments/{environment}/proxy/api/consumers/{group_id}/topics/{topic}/partitions/{partition}/offsets" return await api_client._make_request("DELETE", endpoint) @mcp.tool() async def delete_consumer_group(environment: str, group_id: str) -> Dict[str, Any]: """ Delete a consumer group. Args: environment: The environment name. group_id: The ID of the consumer group to delete. Returns: The result of the delete operation. """ endpoint = f"/api/v1/environments/{environment}/proxy/api/consumers/{group_id}" return await api_client._make_request("DELETE", endpoint) @mcp.prompt() def list_consumer_groups_for_topic(topic: str, environment: str) -> List[str]: """List consumer groups for a specified topic in a specified environment""" return f""" Please list consumer groups for the '{topic}' topic in the '{environment}' environment """

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stereosky/lenses-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server