list_databases
Retrieve available Timeplus databases to identify and select streaming data sources for querying and analysis.
Instructions
List available Timeplus databases
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
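
Because the tool takes no arguments, a client invokes it with an empty `arguments` object. The sketch below builds such a request in the JSON-RPC 2.0 shape that MCP uses for `tools/call`. The helper name `build_tool_call` and the request id are illustrative assumptions, not part of this server, and the transport (stdio, HTTP) is left out.

```python
import json
from typing import Any, Dict, Optional


def build_tool_call(request_id: int, name: str,
                    arguments: Optional[Dict[str, Any]] = None) -> str:
    """Serialize an MCP tools/call request (hypothetical helper for illustration)."""
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": name,
            # list_databases declares no inputs, so this stays an empty object.
            "arguments": arguments or {},
        },
    }
    return json.dumps(payload)


request = build_tool_call(1, "list_databases")
```

In practice an MCP client library would normally build and send this message for you; the point here is only that the argument object is empty.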
Implementation Reference
- mcp_timeplus/mcp_server.py:42-49 (handler) The handler function for the 'list_databases' tool. It is decorated with @mcp.tool(), which registers it with the FastMCP server. The function connects to Timeplus using create_timeplus_client() and executes 'SHOW DATABASES' to list available databases.

      @mcp.tool()
      def list_databases():
          """List available Timeplus databases"""
          logger.info("Listing all databases")
          client = create_timeplus_client()
          result = client.command("SHOW DATABASES")
          logger.info(f"Found {len(result) if isinstance(result, list) else 1} databases")
          return result
- mcp_timeplus/mcp_server.py:222-241 (helper) Helper function used by the list_databases tool (and others) to create a configured Timeplus client connection from environment variables.

      def create_timeplus_client():
          client_config = config.get_client_config()
          logger.info(
              f"Creating Timeplus client connection to {client_config['host']}:{client_config['port']} "
              f"as {client_config['username']} "
              f"(secure={client_config['secure']}, verify={client_config['verify']}, "
              f"connect_timeout={client_config['connect_timeout']}s, "
              f"send_receive_timeout={client_config['send_receive_timeout']}s)"
          )

          try:
              client = timeplus_connect.get_client(**client_config)
              # Test the connection
              version = client.server_version
              logger.info(f"Successfully connected to Timeplus server version {version}")
              return client
          except Exception as e:
              logger.error(f"Failed to connect to Timeplus: {str(e)}")
              raise
- mcp_timeplus/mcp_server.py:42-49 (registration) The @mcp.tool() decorator registers the list_databases function as an MCP tool with the FastMCP server instance.

      @mcp.tool()
      def list_databases():
          """List available Timeplus databases"""
          logger.info("Listing all databases")
          client = create_timeplus_client()
          result = client.command("SHOW DATABASES")
          logger.info(f"Found {len(result) if isinstance(result, list) else 1} databases")
          return result
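
The handler's log line checks `isinstance(result, list)`, which suggests that `client.command` may return either a list or a single value. The sketch below shows one way a caller could normalize both shapes into a list of names and exercise the logic without a live server. `normalize_databases`, `StubClient`, and `list_databases_with` are hypothetical names introduced for illustration, and the newline-separated string case is an assumption about driver behavior, not something this page confirms.

```python
from typing import List, Sequence, Union

# Assumed result shapes: a single scalar or a sequence of names.
CommandResult = Union[str, int, Sequence[str]]


def normalize_databases(result: CommandResult) -> List[str]:
    """Return database names as a list regardless of the raw result shape."""
    if isinstance(result, (list, tuple)):
        return [str(name) for name in result]
    # Scalar result: split on newlines in case several names arrive as one string.
    return [line for line in str(result).splitlines() if line.strip()]


class StubClient:
    """Hypothetical stand-in for the Timeplus client; performs no network access."""

    def __init__(self, canned: CommandResult):
        self._canned = canned

    def command(self, query: str) -> CommandResult:
        if query != "SHOW DATABASES":
            raise ValueError(f"unexpected query: {query}")
        return self._canned


def list_databases_with(client) -> List[str]:
    """Mirror the handler's query, then normalize the result."""
    return normalize_databases(client.command("SHOW DATABASES"))
```

Passing the client in as a parameter, rather than creating it inside the function as the real handler does, is a choice made here only to keep the example testable offline.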