Skip to main content
Glama

list_tables

Retrieve available tables or streams from a specified database in Timeplus, enabling quick access to data sources for streaming analytics.

Instructions

List available tables/streams in the given database

Input Schema

NameRequiredDescriptionDefault
databaseNodefault
likeNo

Input Schema (JSON Schema)

{ "properties": { "database": { "default": "default", "title": "Database", "type": "string" }, "like": { "default": null, "title": "Like", "type": "string" } }, "title": "list_tablesArguments", "type": "object" }

Implementation Reference

  • The core handler function for the 'list_tables' tool. Decorated with @mcp.tool() for registration in FastMCP. Lists tables/streams in a Timeplus database, fetches comments, schema details, and CREATE statements.
    @mcp.tool() def list_tables(database: str = 'default', like: str = None): """List available tables/streams in the given database""" logger.info(f"Listing tables in database '{database}'") client = create_timeplus_client() query = f"SHOW STREAMS FROM {quote_identifier(database)}" if like: query += f" LIKE {format_query_value(like)}" result = client.command(query) # Get all table comments in one query table_comments_query = f"SELECT name, comment FROM system.tables WHERE database = {format_query_value(database)}" table_comments_result = client.query(table_comments_query) table_comments = {row[0]: row[1] for row in table_comments_result.result_rows} # Get all column comments in one query column_comments_query = f"SELECT table, name, comment FROM system.columns WHERE database = {format_query_value(database)}" column_comments_result = client.query(column_comments_query) column_comments = {} for row in column_comments_result.result_rows: table, col_name, comment = row if table not in column_comments: column_comments[table] = {} column_comments[table][col_name] = comment def get_table_info(table): logger.info(f"Getting schema info for table {database}.{table}") schema_query = f"DESCRIBE STREAM {quote_identifier(database)}.{quote_identifier(table)}" schema_result = client.query(schema_query) columns = [] column_names = schema_result.column_names for row in schema_result.result_rows: column_dict = {} for i, col_name in enumerate(column_names): column_dict[col_name] = row[i] # Add comment from our pre-fetched comments if table in column_comments and column_dict['name'] in column_comments[table]: column_dict['comment'] = column_comments[table][column_dict['name']] else: column_dict['comment'] = None columns.append(column_dict) create_table_query = f"SHOW CREATE STREAM {database}.`{table}`" create_table_result = client.command(create_table_query) return { "database": database, "name": table, "comment": table_comments.get(table), # "columns": columns, # exclude columns in the output since it's too verbose, the DDL below has enough information "create_table_query": create_table_result, } tables = [] if isinstance(result, str): # Single table result for table in (t.strip() for t in result.split()): if table: tables.append(get_table_info(table)) elif isinstance(result, Sequence): # Multiple table results for table in result: tables.append(get_table_info(table)) logger.info(f"Found {len(tables)} tables") return tables
  • Re-exports the list_tables function from mcp_server.py, making it available for import in tests and elsewhere.
    from .mcp_server import ( create_timeplus_client, list_databases, list_tables, run_sql, list_kafka_topics, explore_kafka_topic, create_kafka_stream, generate_sql, connect_to_apache_iceberg, ) __all__ = [ "list_databases", "list_tables", "run_sql", "create_timeplus_client", "list_kafka_topics", "explore_kafka_topic", "create_kafka_stream", "generate_sql", "connect_to_apache_iceberg", ]
  • Helper function used by list_tables to create Timeplus client connections.
    def create_timeplus_client(): client_config = config.get_client_config() logger.info( f"Creating Timeplus client connection to {client_config['host']}:{client_config['port']} " f"as {client_config['username']} " f"(secure={client_config['secure']}, verify={client_config['verify']}, " f"connect_timeout={client_config['connect_timeout']}s, " f"send_receive_timeout={client_config['send_receive_timeout']}s)" ) try: client = timeplus_connect.get_client(**client_config) # Test the connection version = client.server_version logger.info(f"Successfully connected to Timeplus server version {version}") return client except Exception as e: logger.error(f"Failed to connect to Timeplus: {str(e)}") raise

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timeplus-io/mcp-timeplus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server