update_consumer_group_topic_partition_offset
Modify consumer group offsets for specific Kafka topic partitions to control data consumption or recover from processing issues.
Instructions
Update the offset for a topic-partition for a given group.
Args:
- environment: The environment name.
- group_id: The ID of the consumer group.
- topic: The topic name.
- partition: The partition number.
- offset: The new offset value.

Returns: The result of the update operation.
Input Schema
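| Name | Required | Description | Default |
|---|---|---|---|
| environment | Yes | The environment name. | |
| group_id | Yes | The ID of the consumer group. | |
| topic | Yes | The topic name. | |
| partition | Yes | The partition number. | |
| offset | Yes | The new offset value. | |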
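
The JSON Schema view of these inputs is not reproduced here. As a rough sketch, assuming the types follow the handler's type hints in the Implementation Reference (`str` for `environment`, `group_id`, and `topic`; `int` for `partition` and `offset`), the schema and a minimal client-side check might look like the following. `TOOL_INPUT_SCHEMA` and `find_argument_errors` are hypothetical names introduced for illustration, not part of the tool.

```python
from typing import Any, Dict, List

# Assumed schema: property types are inferred from the handler's type hints,
# not copied from the server's published schema.
TOOL_INPUT_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "environment": {"type": "string"},
        "group_id": {"type": "string"},
        "topic": {"type": "string"},
        "partition": {"type": "integer"},
        "offset": {"type": "integer"},
    },
    "required": ["environment", "group_id", "topic", "partition", "offset"],
}

_PYTHON_TYPES = {"string": str, "integer": int}


def find_argument_errors(arguments: Dict[str, Any]) -> List[str]:
    """Return a list of problems with the arguments (empty if none were found)."""
    errors: List[str] = []
    for name in TOOL_INPUT_SCHEMA["required"]:
        if name not in arguments:
            errors.append(f"missing required argument: {name}")
    for name, value in arguments.items():
        spec = TOOL_INPUT_SCHEMA["properties"].get(name)
        if spec is None:
            errors.append(f"unexpected argument: {name}")
            continue
        expected = _PYTHON_TYPES[spec["type"]]
        # bool is a subclass of int in Python, so reject it explicitly.
        if not isinstance(value, expected) or isinstance(value, bool):
            errors.append(f"{name} must be of type {spec['type']}")
    return errors
```

Checking arguments before the call keeps a mistyped offset (for example the string `"1500"`) from reaching the PUT request at all, which matters because an offset update changes where the group resumes consuming.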
Implementation Reference
- The async handler function that implements the tool logic by constructing the API endpoint and making a PUT request to update the Kafka consumer group topic partition offset using the api_client.

      async def update_consumer_group_topic_partition_offset(
          environment: str,
          group_id: str,
          topic: str,
          partition: int,
          offset: int
      ) -> Dict[str, Any]:
          """
          Update the offset for a topic-partition for a given group.

          Args:
              environment: The environment name.
              group_id: The ID of the consumer group.
              topic: The topic name.
              partition: The partition number.
              offset: The new offset value.

          Returns:
              The result of the update operation.
          """
          endpoint = f"/api/v1/environments/{environment}/proxy/api/consumers/{group_id}/offsets/topics/{topic}/partitions/{partition}"
          payload = {"offset": offset}
          return await api_client._make_request("PUT", endpoint, json=payload)

- src/lenses_mcp/server.py:30-30 (registration): Registers all Kafka consumer group tools, including the update_consumer_group_topic_partition_offset tool, by calling the register_kafka_consumer_groups function on the MCP instance.

      register_kafka_consumer_groups(mcp)
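
To see which request the handler produces without contacting a real server, the handler logic can be exercised against a stub. The sketch below is illustrative only: `RecordingClient` is a hypothetical stand-in for the project's `api_client`, the response it returns is invented, and the environment, group, and topic names are made-up sample values. Only the endpoint template, the payload shape, and the PUT method are taken from the handler above.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class RecordingClient:
    """Hypothetical stand-in for api_client: records requests instead of sending them."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, Dict[str, Any]]] = []

    async def _make_request(
        self, method: str, endpoint: str, json: Dict[str, Any]
    ) -> Dict[str, Any]:
        self.calls.append((method, endpoint, json))
        # Invented response shape, for illustration only.
        return {"status": "recorded"}


api_client = RecordingClient()


async def update_consumer_group_topic_partition_offset(
    environment: str, group_id: str, topic: str, partition: int, offset: int
) -> Dict[str, Any]:
    # Same endpoint template and payload as the handler shown above.
    endpoint = (
        f"/api/v1/environments/{environment}/proxy/api/consumers/{group_id}"
        f"/offsets/topics/{topic}/partitions/{partition}"
    )
    payload = {"offset": offset}
    return await api_client._make_request("PUT", endpoint, json=payload)


def demo() -> Tuple[str, str, Dict[str, Any]]:
    """Run the handler once with sample values and return the recorded request."""
    asyncio.run(
        update_consumer_group_topic_partition_offset(
            "dev", "billing-consumers", "payments", 0, 1500
        )
    )
    return api_client.calls[-1]


if __name__ == "__main__":
    method, endpoint, payload = demo()
    print(method, endpoint, payload)
```

Note that the handler interpolates its arguments into the URL path as-is, so a caller whose group ID contains characters that are not URL-safe may want to encode them first; whether the real client already does this is not shown in the excerpt.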