list_databases
Retrieve a list of available databases in Timeplus for streaming data platforms like Apache Kafka and Pulsar.
Instructions
List available Timeplus databases
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- mcp_timeplus/mcp_server.py:42-49 (handler)The `list_databases` function is the core handler. It is decorated with @mcp.tool(), creates a Timeplus client, runs 'SHOW DATABASES', and returns the result.
@mcp.tool() def list_databases(): """List available Timeplus databases""" logger.info("Listing all databases") client = create_timeplus_client() result = client.command("SHOW DATABASES") logger.info(f"Found {len(result) if isinstance(result, list) else 1} databases") return result - mcp_timeplus/mcp_server.py:42-42 (registration)The `@mcp.tool()` decorator registers `list_databases` as an MCP tool on the FastMCP server instance.
@mcp.tool() - mcp_timeplus/mcp_server.py:222-240 (helper)The `create_timeplus_client` helper function is called by `list_databases` to establish a connection to Timeplus.
def create_timeplus_client(): client_config = config.get_client_config() logger.info( f"Creating Timeplus client connection to {client_config['host']}:{client_config['port']} " f"as {client_config['username']} " f"(secure={client_config['secure']}, verify={client_config['verify']}, " f"connect_timeout={client_config['connect_timeout']}s, " f"send_receive_timeout={client_config['send_receive_timeout']}s)" ) try: client = timeplus_connect.get_client(**client_config) # Test the connection version = client.server_version logger.info(f"Successfully connected to Timeplus server version {version}") return client except Exception as e: logger.error(f"Failed to connect to Timeplus: {str(e)}") raise - mcp_timeplus/mcp_server.py:42-49 (schema)The function has no parameters (no input schema needed) and returns the raw result (a list of database names) from the SHOW DATABASES command.
@mcp.tool() def list_databases(): """List available Timeplus databases""" logger.info("Listing all databases") client = create_timeplus_client() result = client.command("SHOW DATABASES") logger.info(f"Found {len(result) if isinstance(result, list) else 1} databases") return result