
run_sql

Execute SQL queries on Timeplus, a database optimized for streaming data from sources like Apache Kafka or Pulsar, to analyze and process real-time data efficiently.

Instructions

Run a query in a Timeplus database

Input Schema

Name    Required    Description    Default
query   Yes

Input Schema (JSON Schema)

{
  "properties": {
    "query": {
      "title": "Query",
      "type": "string"
    }
  },
  "required": [
    "query"
  ],
  "title": "run_sqlArguments",
  "type": "object"
}
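
As a rough illustration of the schema above, the following sketch builds the arguments object for a 'run_sql' call and wraps it in a JSON-RPC 'tools/call' request, the message shape MCP clients use to invoke a tool. The helper names (build_run_sql_arguments, build_tool_call) and the sample query are hypothetical, and the type check is hand-written rather than a full JSON Schema validator.

```python
import json


def build_run_sql_arguments(query):
    """Return an arguments object matching the run_sqlArguments schema."""
    # The schema requires a single string property named "query".
    if not isinstance(query, str):
        raise TypeError("query must be a string")
    return {"query": query}


def build_tool_call(request_id, query):
    """Wrap the arguments in a JSON-RPC 2.0 'tools/call' request (hypothetical helper)."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "run_sql",
            "arguments": build_run_sql_arguments(query),
        },
    }


if __name__ == "__main__":
    # Print the request an MCP client might send; the query is only an example.
    print(json.dumps(build_tool_call(1, "SELECT 1"), indent=2))
```

In practice an MCP client library normally assembles this envelope itself, so only the arguments object would be supplied by the caller.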

Implementation Reference

  • The primary handler function for the 'run_sql' tool. It submits the SQL query to a thread pool executor and handles timeouts and errors by returning structured responses compatible with the MCP protocol.
    @mcp.tool()
    def run_sql(query: str):
        """Run a query in a Timeplus database"""
        logger.info(f"Executing query: {query}")
        try:
            future = QUERY_EXECUTOR.submit(execute_query, query)
            try:
                result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS)
                # Check if we received an error structure from execute_query
                if isinstance(result, dict) and "error" in result:
                    logger.warning(f"Query failed: {result['error']}")
                    # MCP requires structured responses; string error messages can cause
                    # serialization issues leading to BrokenResourceError
                    return {"status": "error", "message": f"Query failed: {result['error']}"}
                return result
            except concurrent.futures.TimeoutError:
                logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}")
                future.cancel()
                # Return a properly structured response for timeout errors
                return {"status": "error", "message": f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds"}
        except Exception as e:
            logger.error(f"Unexpected error in run_select_query: {str(e)}")
            # Catch all other exceptions and return them in a structured format
            # to prevent MCP serialization failures
            return {"status": "error", "message": f"Unexpected error: {str(e)}"}
  • Supporting helper function that performs the actual query execution using the Timeplus client, formats the results into a list of dicts, and returns a structured error on failure.
    def execute_query(query: str):
        client = create_timeplus_client()
        try:
            readonly = 1 if config.readonly else 0
            res = client.query(query, settings={"readonly": readonly})
            column_names = res.column_names
            rows = []
            for row in res.result_rows:
                row_dict = {}
                for i, col_name in enumerate(column_names):
                    row_dict[col_name] = row[i]
                rows.append(row_dict)
            logger.info(f"Query returned {len(rows)} rows")
            return rows
        except Exception as err:
            logger.error(f"Error executing query: {err}")
            # Return a structured dictionary rather than a string to ensure proper serialization
            # by the MCP protocol. String responses for errors can cause BrokenResourceError.
            return {"error": str(err)}
  • Exports the run_sql function (imported from mcp_server.py) as part of the package's public API, which makes it available for use and registration in the MCP server.
    from .mcp_server import (
        create_timeplus_client,
        list_databases,
        list_tables,
        run_sql,
        list_kafka_topics,
        explore_kafka_topic,
        create_kafka_stream,
        generate_sql,
        connect_to_apache_iceberg,
    )

    __all__ = [
        "list_databases",
        "list_tables",
        "run_sql",
        "create_timeplus_client",
        "list_kafka_topics",
        "explore_kafka_topic",
        "create_kafka_stream",
        "generate_sql",
        "connect_to_apache_iceberg",
    ]
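
The handler and helper above combine three ideas: run the query in a worker thread, bound the wait with a timeout, and always return a structured value. The sketch below reproduces that pattern in isolation so it can be run without a Timeplus server. Everything in it is an assumption made for illustration: fake_execute_query stands in for execute_query, run_with_timeout stands in for run_sql, and TIMEOUT_SECS and EXECUTOR are placeholder values rather than the server's real configuration.

```python
import concurrent.futures
import time

# Assumed values for illustration; the real server defines its own.
TIMEOUT_SECS = 0.2
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def fake_execute_query(query):
    """Hypothetical stand-in for execute_query; no database is contacted."""
    if query == "slow":
        time.sleep(1.0)  # longer than TIMEOUT_SECS, so the caller times out
        return []
    if query == "bad":
        return {"error": "syntax error"}  # same error shape as execute_query
    # Pair column names with row values, as the helper's nested loop does.
    column_names = ["id", "name"]
    result_rows = [(1, "a"), (2, "b")]
    return [dict(zip(column_names, row)) for row in result_rows]


def run_with_timeout(query):
    """Submit the query to the pool and always return a structured value."""
    future = EXECUTOR.submit(fake_execute_query, query)
    try:
        result = future.result(timeout=TIMEOUT_SECS)
    except concurrent.futures.TimeoutError:
        # cancel() only stops a task that has not started yet; a task that
        # is already running continues in its worker thread.
        future.cancel()
        return {"status": "error",
                "message": f"Query timed out after {TIMEOUT_SECS} seconds"}
    if isinstance(result, dict) and "error" in result:
        return {"status": "error",
                "message": f"Query failed: {result['error']}"}
    return result


if __name__ == "__main__":
    print(run_with_timeout("ok"))
    print(run_with_timeout("bad"))
    print(run_with_timeout("slow"))
```

One consequence worth noting: because Future.cancel() cannot interrupt a task that has already started, a timed-out query may keep occupying a worker thread until it finishes on its own.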

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timeplus-io/mcp-timeplus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.