Skip to main content
Glama
topics.py (16.1 kB)
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode

from clients.http_client import api_client
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError

"""
Registers all topic and dataset operations with the MCP server.
"""


def register_topics(mcp: FastMCP):
    """Register all topic, topic-metadata and dataset tools on *mcp*.

    Every tool proxies a Lenses-style REST endpoint through
    ``api_client._make_request``; the tools themselves hold no state.
    """

    # ================
    # TOPIC OPERATIONS
    # ================

    @mcp.tool()
    async def list_topics(environment: str) -> List[Dict[str, Any]]:
        """
        Retrieve information about all topics.

        Args:
            environment: The environment name.

        Returns:
            List of all topics with detailed information.
        """
        endpoint = f"/api/v1/environments/{environment}/proxy/api/topics"
        return await api_client._make_request("GET", endpoint)

    @mcp.tool()
    async def get_topic(environment: str, topic_name: str) -> Dict[str, Any]:
        """
        Retrieve information about a specific topic.

        Args:
            environment: The environment name.
            topic_name: Name of the topic.

        Returns:
            Detailed topic information including partitions, consumers, config, etc.
        """
        endpoint = f"/api/v1/environments/{environment}/proxy/api/topics/{topic_name}"
        return await api_client._make_request("GET", endpoint)

    @mcp.tool()
    async def get_topic_partitions(environment: str, topic_name: str) -> Dict[str, Any]:
        """
        Retrieve detailed partition information including messages and bytes (v2 endpoint).

        Args:
            environment: The environment name.
            topic_name: Name of the topic.

        Returns:
            Partition details with message counts, bytes, and JMX timestamp.
        """
        endpoint = f"/api/v1/environments/{environment}/proxy/api/v2/topics/{topic_name}/partitions"
        return await api_client._make_request("GET", endpoint)

    @mcp.tool()
    async def create_topic(
        environment: str,
        topic_name: str,
        partitions: int = 1,
        replication: int = 1,
        configs: Optional[Dict[str, str]] = None
    ) -> str:
        """
        Creates a new Kafka topic with optional configuration.

        Args:
            environment: The environment name.
            topic_name: Topic name.
            partitions: Number of partitions (default: 1).
            replication: Replication factor (default: 1).
            configs: Topic configurations.

        Returns:
            Creation result.

        Raises:
            ToolError: If the underlying request fails for any reason.
        """
        payload = {
            "topicName": topic_name,
            "partitions": partitions,
            "replication": replication,
            # API expects the key to be present even when no configs are given.
            "configs": configs if configs else {},
        }
        endpoint = f"/api/v1/environments/{environment}/proxy/api/topics"
        try:
            return await api_client._make_request("POST", endpoint, payload)
        except Exception as e:
            raise ToolError(f"Topic creation failed: {e}")

    @mcp.tool()
    async def create_topic_with_schema(
        environment: str,
        name: str,
        partitions: int = 1,
        replication: int = 1,
        configs: Optional[Dict[str, str]] = None,
        key_format: Optional[str] = None,
        key_schema: Optional[str] = None,
        value_format: Optional[str] = None,
        value_schema: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Creates a new Kafka topic with optional format and schema configuration.

        Args:
            environment: The environment name.
            name: Topic name.
            partitions: Number of partitions (default: 1).
            replication: Replication factor (default: 1).
            configs: Topic configurations.
            key_format: Key format (AVRO, JSON, CSV, XML, INT, LONG, STRING, BYTES, etc.).
            key_schema: Key schema (required for AVRO, JSON, CSV, XML).
            value_format: Value format.
            value_schema: Value schema.

        Returns:
            Creation result.

        Raises:
            ToolError: If the underlying request fails for any reason.
        """
        payload = {
            "name": name,
            "partitions": partitions,
            "replication": replication,
            "configs": configs if configs else {},
        }
        if key_format or value_format:
            format_config = {}
            # NOTE: a schema is only attached when its matching format is set;
            # a schema passed without a format is silently ignored.
            if key_format:
                format_config["key"] = {"format": key_format}
                if key_schema:
                    format_config["key"]["schema"] = key_schema
            if value_format:
                format_config["value"] = {"format": value_format}
                if value_schema:
                    format_config["value"]["schema"] = value_schema
            payload["format"] = format_config
        endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/kafka/topic"
        try:
            return await api_client._make_request("POST", endpoint, payload)
        except Exception as e:
            raise ToolError(f"Topic creation failed: {e}")

    @mcp.tool()
    async def update_topic_config(
        environment: str,
        topic_name: str,
        configs: List[Dict[str, str]]
    ) -> str:
        """
        Update topic configuration.

        Args:
            environment: The environment name.
            topic_name: Name of the topic.
            configs: List of config key-value pairs [{"key": "retention.ms", "value": "86400000"}].

        Returns:
            Success message.
        """
        payload = {"configs": configs}
        endpoint = f"/api/v1/environments/{environment}/proxy/api/configs/topics/{topic_name}"
        return await api_client._make_request("PUT", endpoint, payload)

    @mcp.tool()
    async def get_topic_broker_configs(environment: str, topic_name: str) -> List[Dict[str, Any]]:
        """
        Get broker configurations for a topic.

        Args:
            environment: The environment name.
            topic_name: Name of the topic.

        Returns:
            List of broker configuration details.
        """
        endpoint = f"/api/v1/environments/{environment}/proxy/api/topics/{topic_name}/brokerConfigs"
        return await api_client._make_request("GET", endpoint)

    @mcp.tool()
    async def add_topic_partitions(
        environment: str,
        topic_name: str,
        partitions: int
    ) -> Dict[str, Any]:
        """
        Add partitions to an existing topic.

        Args:
            environment: The environment name.
            topic_name: Name of the topic.
            partitions: New total number of partitions.

        Returns:
            Updated partition count.
        """
        payload = {"partitions": partitions}
        endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/kafka/topics/{topic_name}/partitions"
        return await api_client._make_request("PUT", endpoint, payload)

    @mcp.tool()
    async def resend_message(
        environment: str,
        topic_name: str,
        partition: int,
        offset: int
    ) -> Dict[str, Any]:
        """
        Resend a Kafka message.

        Args:
            environment: The environment name.
            topic_name: Name of the topic.
            partition: Kafka partition number.
            offset: Kafka offset.

        Returns:
            Resend operation result with partition and offset.
        """
        endpoint = f"/api/v1/environments/{environment}/proxy/api/topics/{topic_name}/{partition}/{offset}/resend"
        return await api_client._make_request("PUT", endpoint)

    # =========================
    # TOPIC METADATA OPERATIONS
    # =========================

    @mcp.tool()
    async def list_topic_metadata(environment: str) -> List[Dict[str, Any]]:
        """
        List all topic metadata.

        Args:
            environment: The environment name.

        Returns:
            List of topic metadata including schemas and descriptions.
        """
        endpoint = f"/api/v1/environments/{environment}/proxy/api/metadata/topics"
        return await api_client._make_request("GET", endpoint)

    @mcp.tool()
    async def get_topic_metadata(environment: str, topic_name: str) -> Dict[str, Any]:
        """
        Get metadata for a specific topic.

        Args:
            environment: The environment name.
            topic_name: Name of the topic.

        Returns:
            Topic metadata including schema information and tags.
        """
        endpoint = f"/api/v1/environments/{environment}/proxy/api/metadata/topics/{topic_name}"
        return await api_client._make_request("GET", endpoint)

    @mcp.tool()
    async def update_topic_metadata(
        environment: str,
        metadata: Dict[str, Any]
    ) -> str:
        # FIX: the original declared this docstring as an f-string (f"""...""")
        # containing unescaped "{...}" examples — an f-string is not a docstring,
        # is re-evaluated on every call, and its braces are parsed as replacement
        # fields, raising at runtime. A plain string literal is used instead, and
        # the Args section now documents the actual `metadata` parameter.
        """
        Update topic metadata. The required parameters are: topicName, keyType
        and valueType. When updating tags, it is not a list of strings; it is a
        list of objects with parameter 'name', e.g. [{'name': 'tag1'}, {'name': 'tag2'}]

        Args:
            environment: The environment name.
            metadata: Metadata key-value pairs with the following value types:
                {
                    "topicName": "text",
                    "keyType": "text",
                    "valueType": "text",
                    "keySchema": "text",
                    "keySchemaVersion": 1,
                    "keySchemaInlined": "text",
                    "valueSchema": "text",
                    "valueSchemaVersion": 1,
                    "valueSchemaInlined": "text",
                    "description": "text",
                    "tags": [{"name": "text"}],
                    "additionalInfo": null
                }

        Returns:
            Success message.
        """
        endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/metadata/topics"
        return await api_client._make_request("POST", endpoint, metadata)

    # ========================
    # KAFKA DATASET OPERATIONS
    # ========================

    @mcp.tool()
    async def list_datasets(
        environment: str,
        page: int = 1,
        page_size: int = 25,
        search: Optional[str] = None,
        connections: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        sort_field: Optional[str] = None,
        sort_order: str = "asc",
        include_system: bool = False,
        search_fields: bool = True,
        schema_format: Optional[str] = None,
        has_records: Optional[bool] = None,
        is_compacted: Optional[bool] = None
    ) -> Dict[str, Any]:
        """
        Retrieves a paginated list of datasets (topics and other data sources).

        Args:
            environment: The environment name.
            page: Page number (default: 1).
            page_size: Items per page (default: 25).
            search: Search keyword for dataset, fields and description.
            connections: List of connection names to filter by.
            tags: List of tag names to filter by.
            sort_field: Field to sort results by.
            sort_order: Sorting order - "asc" or "desc" (default: "asc").
            include_system: Include system entities (default: False).
            search_fields: Search field names/documentation (default: True).
            schema_format: Schema format filter for SchemaRegistrySubject.
            has_records: Filter based on whether dataset has records.
            is_compacted: Filter based on compacted status (Kafka only).

        Returns:
            Paginated list of datasets with source types.
        """
        params = {
            "page": page,
            "pageSize": page_size,
            "sortOrder": sort_order,
            "includeSystemEntities": include_system,
            "searchFields": search_fields,
        }
        if search:
            params["search"] = search
        if connections:
            params["connections"] = connections
        if tags:
            params["tags"] = tags
        if sort_field:
            params["sortField"] = sort_field
        if schema_format:
            params["schemaFormat"] = schema_format
        if has_records is not None:
            params["hasRecords"] = has_records
        if is_compacted is not None:
            params["isCompacted"] = is_compacted
        # FIX: the original concatenated raw f-strings, so a search term
        # containing '&', '=' or spaces corrupted the query. urlencode with
        # doseq=True percent-encodes values and still repeats list-valued
        # keys (connections, tags) one "key=item" pair per element.
        query_string = urlencode(params, doseq=True)
        endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/datasets?{query_string}"
        return await api_client._make_request("GET", endpoint)

    @mcp.tool()
    async def get_dataset(environment: str, connection: str, dataset: str) -> Dict[str, Any]:
        """
        Get a single dataset by connection/name.

        Args:
            environment: The environment name.
            connection: The connection name (e.g., "kafka").
            dataset: The dataset name.

        Returns:
            Dataset details including fields, policies, permissions, and metadata.
        """
        endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/datasets/{connection}/{dataset}"
        return await api_client._make_request("GET", endpoint)

    @mcp.tool()
    async def get_dataset_message_metrics(environment: str, entity_name: str) -> List[Dict[str, Any]]:
        """
        Get ranged metrics for a dataset's messages.

        Args:
            environment: The environment name.
            entity_name: The dataset's entity name.

        Returns:
            List of message metrics with date and message count.
        """
        endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/datasets/kafka/{entity_name}/messages/metrics"
        return await api_client._make_request("GET", endpoint)

    @mcp.tool()
    async def update_dataset_topic_description(
        environment: str,
        topic_name: str,
        description: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Update topic description (in metadata).

        Args:
            environment: The environment name.
            topic_name: Name of the topic.
            description: The description of the topic.

        Returns:
            Success message.
        """
        # The description cannot be an empty string, so replace falsy values
        # (including "") with an explicit null.
        description_payload = {
            "description": description if description else None
        }
        endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/datasets/kafka/{topic_name}/description"
        return await api_client._make_request("PUT", endpoint, description_payload)

    @mcp.tool()
    async def update_dataset_topic_tags(
        environment: str,
        topic_name: str,
        tags: List[str]
    ) -> Dict[str, Any]:
        """
        Update topic tags (in metadata).

        Args:
            environment: The environment name.
            topic_name: Name of the topic.
            tags: List of tag names.

        Returns:
            Success message.
        """
        # The API expects tag objects ({"name": ...}), not bare strings.
        tags_payload = {
            "tags": [{"name": tag_name} for tag_name in tags]
        }
        endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/datasets/kafka/{topic_name}/tags"
        return await api_client._make_request("PUT", endpoint, tags_payload)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stereosky/lenses-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.