Skip to main content
Glama

mcp-timeplus

by timeplus-io
mcp_server.py10.8 kB
import logging from typing import Sequence import concurrent.futures import atexit import timeplus_connect from timeplus_connect.driver.binding import quote_identifier, format_query_value from dotenv import load_dotenv from mcp.server.fastmcp import FastMCP MCP_SERVER_NAME = "mcp-timeplus" from mcp_timeplus.mcp_env import config from mcp_timeplus.prompt_template import TEMPLATE import json, os, time from confluent_kafka.admin import (AdminClient) from confluent_kafka import Consumer # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(MCP_SERVER_NAME) QUERY_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=10) atexit.register(lambda: QUERY_EXECUTOR.shutdown(wait=True)) SELECT_QUERY_TIMEOUT_SECS = 30 load_dotenv() deps = [ "timeplus-connect", "python-dotenv", "uvicorn", "confluent-kafka", "pip-system-certs", ] mcp = FastMCP(MCP_SERVER_NAME, dependencies=deps) @mcp.tool() def list_databases(): """List available Timeplus databases""" logger.info("Listing all databases") client = create_timeplus_client() result = client.command("SHOW DATABASES") logger.info(f"Found {len(result) if isinstance(result, list) else 1} databases") return result @mcp.tool() def list_tables(database: str = 'default', like: str = None): """List available tables/streams in the given database""" logger.info(f"Listing tables in database '{database}'") client = create_timeplus_client() query = f"SHOW STREAMS FROM {quote_identifier(database)}" if like: query += f" LIKE {format_query_value(like)}" result = client.command(query) # Get all table comments in one query table_comments_query = f"SELECT name, comment FROM system.tables WHERE database = {format_query_value(database)}" table_comments_result = client.query(table_comments_query) table_comments = {row[0]: row[1] for row in table_comments_result.result_rows} # Get all column comments in one query column_comments_query = f"SELECT table, name, comment FROM system.columns WHERE database = {format_query_value(database)}" column_comments_result = client.query(column_comments_query) column_comments = {} for row in column_comments_result.result_rows: table, col_name, comment = row if table not in column_comments: column_comments[table] = {} column_comments[table][col_name] = comment def get_table_info(table): logger.info(f"Getting schema info for table {database}.{table}") schema_query = f"DESCRIBE STREAM {quote_identifier(database)}.{quote_identifier(table)}" schema_result = client.query(schema_query) columns = [] column_names = schema_result.column_names for row in schema_result.result_rows: column_dict = {} for i, col_name in enumerate(column_names): column_dict[col_name] = row[i] # Add comment from our pre-fetched comments if table in column_comments and column_dict['name'] in column_comments[table]: column_dict['comment'] = column_comments[table][column_dict['name']] else: column_dict['comment'] = None columns.append(column_dict) create_table_query = f"SHOW CREATE STREAM {database}.`{table}`" create_table_result = client.command(create_table_query) return { "database": database, "name": table, "comment": table_comments.get(table), # "columns": columns, # exclude columns in the output since it's too verbose, the DDL below has enough information "create_table_query": create_table_result, } tables = [] if isinstance(result, str): # Single table result for table in (t.strip() for t in result.split()): if table: tables.append(get_table_info(table)) elif isinstance(result, Sequence): # Multiple table results for table in result: tables.append(get_table_info(table)) logger.info(f"Found {len(tables)} tables") return tables def execute_query(query: str): client = create_timeplus_client() try: readonly = 1 if config.readonly else 0 res = client.query(query, settings={"readonly": readonly}) column_names = res.column_names rows = [] for row in res.result_rows: row_dict = {} for i, col_name in enumerate(column_names): row_dict[col_name] = row[i] rows.append(row_dict) logger.info(f"Query returned {len(rows)} rows") return rows except Exception as err: logger.error(f"Error executing query: {err}") # Return a structured dictionary rather than a string to ensure proper serialization # by the MCP protocol. String responses for errors can cause BrokenResourceError. return {"error": str(err)} @mcp.tool() def run_sql(query: str): """Run a query in a Timeplus database""" logger.info(f"Executing query: {query}") try: future = QUERY_EXECUTOR.submit(execute_query, query) try: result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS) # Check if we received an error structure from execute_query if isinstance(result, dict) and "error" in result: logger.warning(f"Query failed: {result['error']}") # MCP requires structured responses; string error messages can cause # serialization issues leading to BrokenResourceError return {"status": "error", "message": f"Query failed: {result['error']}"} return result except concurrent.futures.TimeoutError: logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}") future.cancel() # Return a properly structured response for timeout errors return {"status": "error", "message": f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds"} except Exception as e: logger.error(f"Unexpected error in run_select_query: {str(e)}") # Catch all other exceptions and return them in a structured format # to prevent MCP serialization failures return {"status": "error", "message": f"Unexpected error: {str(e)}"} @mcp.prompt() def generate_sql(requirements: str) -> str: return f"Please generate Timeplus SQL for the requirement:\n\n{requirements}\n\nMake sure following the guide {TEMPLATE}" @mcp.tool() def list_kafka_topics(): logger.info("Listing all topics in the Kafka cluster") admin_client = AdminClient(json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG'])) topics = admin_client.list_topics(timeout=10).topics topics_array = [] for topic, detail in topics.items(): topic_info = {"topic": topic, "partitions": len(detail.partitions)} topics_array.append(topic_info) return topics_array @mcp.tool() def explore_kafka_topic(topic: str, message_count: int = 1): logger.info(f"Consuming topic {topic}") conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']) conf['group.id'] = f"mcp-{time.time()}" client = Consumer(conf) client.subscribe([topic]) messages = [] for i in range(message_count): logger.info(f"Consuming message {i+1}") message = client.poll() if message is None: logger.info("No message received") continue if message.error(): logger.error(f"Error consuming message: {message.error()}") continue else: logger.info(f"Received message {i+1}") messages.append(json.loads(message.value())) client.close() return messages @mcp.tool() def create_kafka_stream(topic: str): logger.info(f"Creating Kafka externalstream for topic {topic}") conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']) ext_stream=f"ext_stream_{topic}" sql=f"""CREATE EXTERNAL STREAM {ext_stream} (raw string) SETTINGS type='kafka',brokers='{conf['bootstrap.servers']}',topic='{topic}',security_protocol='{conf['security.protocol']}',sasl_mechanism='{conf['sasl.mechanism']}',username='{conf['sasl.username']}',password='{conf['sasl.password']}',skip_ssl_cert_check=true """ run_sql(sql) logger.info("External Stream created") sql=f"CREATE MATERIALIZED VIEW {topic} AS SELECT raw from {ext_stream}" run_sql(sql) logger.info("MATERIALIZED VIEW created") return f"Materialized the Kafka data as {topic}" def create_timeplus_client(): client_config = config.get_client_config() logger.info( f"Creating Timeplus client connection to {client_config['host']}:{client_config['port']} " f"as {client_config['username']} " f"(secure={client_config['secure']}, verify={client_config['verify']}, " f"connect_timeout={client_config['connect_timeout']}s, " f"send_receive_timeout={client_config['send_receive_timeout']}s)" ) try: client = timeplus_connect.get_client(**client_config) # Test the connection version = client.server_version logger.info(f"Successfully connected to Timeplus server version {version}") return client except Exception as e: logger.error(f"Failed to connect to Timeplus: {str(e)}") raise @mcp.tool() def connect_to_apache_iceberg(iceberg_db: str,aws_account_id: int, s3_bucket: str, aws_region: str="us-west-2",is_s3_table_bucket: bool=False): """Create a Timeplus database in iceberg type to connect to Iceberg""" if iceberg_db is None or aws_account_id is None or s3_bucket is None: return {"error": "iceberg_db, aws_account_id, and s3_bucket are required"} logger.info("Creating Iceberg database") warehouse=aws_account_id storage_endpoint=f"https://{s3_bucket}.s3.{aws_region}.amazonaws.com" if is_s3_table_bucket: warehouse=f"{aws_account_id}:s3tablescatalog/{s3_bucket}" storage_endpoint=f"https://s3tables.{aws_region}.amazonaws.com/{s3_bucket}" sql=f"""CREATE DATABASE {iceberg_db} SETTINGS type='iceberg',catalog_uri='https://glue.{aws_region}.amazonaws.com/iceberg',catalog_type='rest', warehouse='{warehouse}',storage_endpoint='{storage_endpoint}',use_environment_credentials=true, rest_catalog_sigv4_enabled=true,rest_catalog_signing_region='{aws_region}',rest_catalog_signing_name='glue' """ run_sql(sql) logger.info("Iceberg database created") sql=f"SHOW STREAMS FROM {iceberg_db}" return run_sql(sql)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timeplus-io/mcp-timeplus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server