from typing import Any, Dict, List, Optional
from clients.http_client import api_client
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
"""
Registers all topic and dataset operations with the MCP server.
"""
def register_topics(mcp: FastMCP):
# ================
# TOPIC OPERATIONS
# ================
@mcp.tool()
async def list_topics(environment: str) -> List[Dict[str, Any]]:
"""
Retrieve information about all topics.
Args:
environment: The environment name.
Returns:
List of all topics with detailed information.
"""
endpoint = f"/api/v1/environments/{environment}/proxy/api/topics"
return await api_client._make_request("GET", endpoint)
@mcp.tool()
async def get_topic(environment: str, topic_name: str) -> Dict[str, Any]:
"""
Retrieve information about a specific topic.
Args:
environment: The environment name.
topic_name: Name of the topic.
Returns:
Detailed topic information including partitions, consumers, config, etc.
"""
endpoint = f"/api/v1/environments/{environment}/proxy/api/topics/{topic_name}"
return await api_client._make_request("GET", endpoint)
@mcp.tool()
async def get_topic_partitions(environment: str, topic_name: str) -> Dict[str, Any]:
"""
Retrieve detailed partition information including messages and bytes (v2 endpoint).
Args:
environment: The environment name.
topic_name: Name of the topic.
Returns:
Partition details with message counts, bytes, and JMX timestamp.
"""
endpoint = f"/api/v1/environments/{environment}/proxy/api/v2/topics/{topic_name}/partitions"
return await api_client._make_request("GET", endpoint)
@mcp.tool()
async def create_topic(
environment: str,
topic_name: str,
partitions: int = 1,
replication: int = 1,
configs: Optional[Dict[str, str]] = None
) -> str:
"""
Creates a new Kafka topic with optional configuration.
Args:
environment: The environment name.
topic_name: Topic name.
partitions: Number of partitions (default: 1).
replication: Replication factor (default: 1).
configs: Topic configurations.
Returns:
Creation result.
"""
payload = {
"topicName": topic_name,
"partitions": partitions,
"replication": replication
}
if configs:
payload["configs"] = configs
else:
payload["configs"] = {}
endpoint = f"/api/v1/environments/{environment}/proxy/api/topics"
try:
return await api_client._make_request("POST", endpoint, payload)
except Exception as e:
raise ToolError(f"Topic creation failed: {e}")
@mcp.tool()
async def create_topic_with_schema(
environment: str,
name: str,
partitions: int = 1,
replication: int = 1,
configs: Optional[Dict[str, str]] = None,
key_format: Optional[str] = None,
key_schema: Optional[str] = None,
value_format: Optional[str] = None,
value_schema: Optional[str] = None
) -> Dict[str, Any]:
"""
Creates a new Kafka topic with optional format and schema configuration.
Args:
environment: The environment name.
name: Topic name.
partitions: Number of partitions (default: 1).
replication: Replication factor (default: 1).
configs: Topic configurations.
key_format: Key format (AVRO, JSON, CSV, XML, INT, LONG, STRING, BYTES, etc.).
key_schema: Key schema (required for AVRO, JSON, CSV, XML).
value_format: Value format.
value_schema: Value schema.
Returns:
Creation result.
"""
payload = {
"name": name,
"partitions": partitions,
"replication": replication
}
if configs:
payload["configs"] = configs
else:
payload["configs"] = {}
if key_format or value_format:
format_config = {}
if key_format:
format_config["key"] = {"format": key_format}
if key_schema:
format_config["key"]["schema"] = key_schema
if value_format:
format_config["value"] = {"format": value_format}
if value_schema:
format_config["value"]["schema"] = value_schema
payload["format"] = format_config
endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/kafka/topic"
try:
return await api_client._make_request("POST", endpoint, payload)
except Exception as e:
raise ToolError(f"Topic creation failed: {e}")
@mcp.tool()
async def update_topic_config(
environment: str,
topic_name: str,
configs: List[Dict[str, str]]
) -> str:
"""
Update topic configuration.
Args:
environment: The environment name.
topic_name: Name of the topic.
configs: List of config key-value pairs [{"key": "retention.ms", "value": "86400000"}].
Returns:
Success message.
"""
payload = {"configs": configs}
endpoint = f"/api/v1/environments/{environment}/proxy/api/configs/topics/{topic_name}"
return await api_client._make_request("PUT", endpoint, payload)
@mcp.tool()
async def get_topic_broker_configs(environment: str, topic_name: str) -> List[Dict[str, Any]]:
"""
Get broker configurations for a topic.
Args:
environment: The environment name.
topic_name: Name of the topic.
Returns:
List of broker configuration details.
"""
endpoint = f"/api/v1/environments/{environment}/proxy/api/topics/{topic_name}/brokerConfigs"
return await api_client._make_request("GET", endpoint)
@mcp.tool()
async def add_topic_partitions(
environment: str,
topic_name: str,
partitions: int
) -> Dict[str, Any]:
"""
Add partitions to an existing topic.
Args:
environment: The environment name.
topic_name: Name of the topic.
partitions: New total number of partitions.
Returns:
Updated partition count.
"""
payload = {"partitions": partitions}
endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/kafka/topics/{topic_name}/partitions"
return await api_client._make_request("PUT", endpoint, payload)
@mcp.tool()
async def resend_message(
environment: str,
topic_name: str,
partition: int,
offset: int
) -> Dict[str, Any]:
"""
Resend a Kafka message.
Args:
environment: The environment name.
topic_name: Name of the topic.
partition: Kafka partition number.
offset: Kafka offset.
Returns:
Resend operation result with partition and offset.
"""
endpoint = f"/api/v1/environments/{environment}/proxy/api/topics/{topic_name}/{partition}/{offset}/resend"
return await api_client._make_request("PUT", endpoint)
# =========================
# TOPIC METADATA OPERATIONS
# =========================
@mcp.tool()
async def list_topic_metadata(environment: str) -> List[Dict[str, Any]]:
"""
List all topic metadata.
Args:
environment: The environment name.
Returns:
List of topic metadata including schemas and descriptions.
"""
endpoint = f"/api/v1/environments/{environment}/proxy/api/metadata/topics"
return await api_client._make_request("GET", endpoint)
@mcp.tool()
async def get_topic_metadata(environment: str, topic_name: str) -> Dict[str, Any]:
"""
Get metadata for a specific topic.
Args:
environment: The environment name.
topic_name: Name of the topic.
Returns:
Topic metadata including schema information and tags.
"""
endpoint = f"/api/v1/environments/{environment}/proxy/api/metadata/topics/{topic_name}"
return await api_client._make_request("GET", endpoint)
@mcp.tool()
async def update_topic_metadata(
environment: str,
metadata: Dict[str, Any]
) -> str:
f"""
Update topic metadata. The required parameters are: topicName, keyType and valueType.
When updating tags, it is not a list of strings.
It is a list of objects with parameter 'name', e.g. [{{'name':'tag1'}},{{'name':'tag2'}}]
Args:
environment: The environment name.
topic_name: Name of the topic.
configs: Metadata key-value pairs with the following value types:
{
"topicName": "text",
"keyType": "text",
"valueType": "text",
"keySchema": "text",
"keySchemaVersion": 1,
"keySchemaInlined": "text",
"valueSchema": "text",
"valueSchemaVersion": 1,
"valueSchemaInlined": "text",
"description": "text",
"tags": [
{{
"name": "text"
}}
],
"additionalInfo": null
}
Returns:
Success message.
"""
endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/metadata/topics"
return await api_client._make_request("POST", endpoint, metadata)
# ========================
# KAFKA DATASET OPERATIONS
# ========================
@mcp.tool()
async def list_datasets(
environment: str,
page: int = 1,
page_size: int = 25,
search: Optional[str] = None,
connections: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
sort_field: Optional[str] = None,
sort_order: str = "asc",
include_system: bool = False,
search_fields: bool = True,
schema_format: Optional[str] = None,
has_records: Optional[bool] = None,
is_compacted: Optional[bool] = None
) -> Dict[str, Any]:
"""
Retrieves a paginated list of datasets (topics and other data sources).
Args:
environment: The environment name.
page: Page number (default: 1).
page_size: Items per page (default: 25).
search: Search keyword for dataset, fields and description.
connections: List of connection names to filter by.
tags: List of tag names to filter by.
sort_field: Field to sort results by.
sort_order: Sorting order - "asc" or "desc" (default: "asc").
include_system: Include system entities (default: False).
search_fields: Search field names/documentation (default: True).
schema_format: Schema format filter for SchemaRegistrySubject.
has_records: Filter based on whether dataset has records.
is_compacted: Filter based on compacted status (Kafka only).
Returns:
Paginated list of datasets with source types.
"""
params = {
"page": page,
"pageSize": page_size,
"sortOrder": sort_order,
"includeSystemEntities": include_system,
"searchFields": search_fields
}
if search:
params["search"] = search
if connections:
params["connections"] = connections
if tags:
params["tags"] = tags
if sort_field:
params["sortField"] = sort_field
if schema_format:
params["schemaFormat"] = schema_format
if has_records is not None:
params["hasRecords"] = has_records
if is_compacted is not None:
params["isCompacted"] = is_compacted
# Build query string
query_params = []
for key, value in params.items():
if isinstance(value, list):
for item in value:
query_params.append(f"{key}={item}")
else:
query_params.append(f"{key}={value}")
query_string = "&".join(query_params)
endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/datasets?{query_string}"
return await api_client._make_request("GET", endpoint)
@mcp.tool()
async def get_dataset(environment: str, connection: str, dataset: str) -> Dict[str, Any]:
"""
Get a single dataset by connection/name.
Args:
environment: The environment name.
connection: The connection name (e.g., "kafka").
dataset: The dataset name.
Returns:
Dataset details including fields, policies, permissions, and metadata.
"""
endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/datasets/{connection}/{dataset}"
return await api_client._make_request("GET", endpoint)
@mcp.tool()
async def get_dataset_message_metrics(environment: str, entity_name: str) -> List[Dict[str, Any]]:
"""
Get ranged metrics for a dataset's messages.
Args:
environment: The environment name.
entity_name: The dataset's entity name.
Returns:
List of message metrics with date and message count.
"""
endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/datasets/kafka/{entity_name}/messages/metrics"
return await api_client._make_request("GET", endpoint)
@mcp.tool()
async def update_dataset_topic_description(
environment: str,
topic_name: str,
description: Optional[str] = None
) -> Dict[str, Any]:
"""
Update topic description (in metadata).
Args:
environment: The environment name.
topic_name: Name of the topic.
description: The description of the topic.
Returns:
Success message.
"""
# The description cannot be an empty string so if it is, replace with a null value
description_payload = { "description": description if description else None }
endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/datasets/kafka/{topic_name}/description"
return await api_client._make_request("PUT", endpoint, description_payload)
@mcp.tool()
async def update_dataset_topic_tags(
environment: str,
topic_name: str,
tags: List[str]
) -> Dict[str, Any]:
"""
Update topic tags (in metadata).
Args:
environment: The environment name.
topic_name: Name of the topic.
tags: List of tag names.
Returns:
Success message.
"""
tags_payload = {
"tags": [{"name": tag_name} for tag_name in tags]
}
endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/datasets/kafka/{topic_name}/tags"
return await api_client._make_request("PUT", endpoint, tags_payload)