connect
Establish a connection to a CockroachDB cluster using environment variables to access cluster information and enable database operations.
Instructions
Connect to the CockroachDB cluster.
Uses configuration from environment variables (CRDB_HOST, CRDB_USER, etc.).
Returns:
Connection status and cluster information.
Input Schema
Table / JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Implementation Reference
- src/cockroachdb_mcp/server.py:24-37 (handler) — MCP tool handler for 'connect'. Registers the tool and executes by calling connection_manager.connect().

  ```python
  @mcp.tool()
  async def connect() -> dict[str, Any]:
      """Connect to the CockroachDB cluster.

      Uses configuration from environment variables (CRDB_HOST, CRDB_USER, etc.).

      Returns:
          Connection status and cluster information.
      """
      try:
          return await connection_manager.connect()
      except Exception as e:
          return {"status": "error", "error": str(e)}
  ```
- src/cockroachdb_mcp/server.py:24-24 (registration) — Registration of the 'connect' tool via the @mcp.tool() decorator.

  ```python
  @mcp.tool()
  ```
- Core implementation of the connection logic in ConnectionManager.connect(), handling the actual DB connection, cluster info, and state updates.

  ```python
  async def connect(self) -> dict[str, Any]:
      """Connect to CockroachDB cluster.

      Returns:
          Connection status and cluster info.
      """
      async with self._lock:
          if self._state.connection is not None and not self._state.connection.closed:
              return {
                  "status": "already_connected",
                  "cluster_id": self._state.cluster_id,
                  "version": self._state.version,
                  "database": self._state.database,
                  "connected_at": self._state.connected_at.isoformat()
                  if self._state.connected_at
                  else None,
              }

          # Build connection parameters
          conn_params: dict[str, Any] = {
              "host": settings.host,
              "port": settings.port,
              "user": settings.user,
              "dbname": settings.database,
              "row_factory": dict_row,
              "autocommit": True,
          }

          if settings.password:
              conn_params["password"] = settings.password

          # SSL configuration
          if settings.sslmode != "disable":
              conn_params["sslmode"] = settings.sslmode
              if settings.sslrootcert:
                  conn_params["sslrootcert"] = settings.sslrootcert

          # CockroachDB Cloud cluster option
          if settings.cluster:
              conn_params["options"] = f"--cluster={settings.cluster}"

          # Connect with timeout
          try:
              conn = await asyncio.wait_for(
                  psycopg.AsyncConnection.connect(**conn_params),
                  timeout=settings.timeout,
              )

              # Get cluster info
              async with conn.cursor() as cur:
                  await cur.execute("SELECT version()")
                  version_row = await cur.fetchone()
                  version = version_row["version"] if version_row else None

                  await cur.execute("SHOW CLUSTER SETTING cluster.organization")
                  org_row = await cur.fetchone()
                  cluster_id = (
                      org_row["cluster.organization"]
                      if org_row and org_row.get("cluster.organization")
                      else "local"
                  )

              self._state.connection = conn
              self._state.connected_at = datetime.now()
              self._state.cluster_id = cluster_id
              self._state.version = version
              self._state.database = settings.database
              self._state.in_transaction = False

              return {
                  "status": "connected",
                  "cluster_id": self._state.cluster_id,
                  "version": self._state.version,
                  "database": self._state.database,
                  "host": settings.host,
                  "port": settings.port,
                  "connected_at": self._state.connected_at.isoformat(),
              }

          except asyncio.TimeoutError as e:
              raise ConnectionError(
                  f"Connection to CockroachDB timed out after {settings.timeout}s"
              ) from e
          except Exception as e:
              raise ConnectionError(f"Failed to connect to CockroachDB: {e}") from e
  ```