switch_database
Change the active database context in CockroachDB to execute queries and operations on a specific database.
Instructions
Switch the active database context.
Args:
database_name: Database to switch to.
Returns:
Switch status.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| database_name | Yes | Database to switch to. | |
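As a rough illustration of the schema above, the sketch below builds the JSON-RPC `tools/call` request an MCP client might send to invoke this tool. The helper name `build_switch_database_request`, the non-empty check, and the example database name `analytics` are assumptions made for illustration; they are not part of the server.

```python
import json
from typing import Any


def build_switch_database_request(database_name: str, request_id: int = 1) -> dict[str, Any]:
    """Build a JSON-RPC 2.0 'tools/call' request for the switch_database tool.

    Hypothetical helper: the schema only says database_name is required;
    rejecting an empty string is an extra assumption made here.
    """
    if not isinstance(database_name, str) or not database_name:
        raise ValueError("database_name is required and must be a non-empty string")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "switch_database",
            # The only argument listed in the input schema.
            "arguments": {"database_name": database_name},
        },
    }


if __name__ == "__main__":
    # "analytics" is a placeholder database name.
    print(json.dumps(build_switch_database_request("analytics"), indent=2))
```

Most MCP client libraries assemble this envelope for you, so in practice only the `arguments` object usually needs to be supplied.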
Implementation Reference
- src/cockroachdb_mcp/server.py:357-371 (handler) MCP tool handler for 'switch_database', decorated with @mcp.tool(). Delegates to connection_manager.switch_database.

      @mcp.tool()
      async def switch_database(database_name: str) -> dict[str, Any]:
          """Switch the active database context.

          Args:
              database_name: Database to switch to.

          Returns:
              Switch status.
          """
          try:
              return await connection_manager.switch_database(database_name)
          except Exception as e:
              return {"status": "error", "error": str(e)}
- Core logic for switching the active database connection in ConnectionManager, handling reconnection with new dbname.

      async def switch_database(self, database: str) -> dict[str, Any]:
          """Switch to a different database.

          Args:
              database: Database name to switch to.

          Returns:
              Switch status.
          """
          # Check if database is blocked
          if database in settings.blocked_databases_list:
              return {"status": "error", "error": f"Database '{database}' is blocked"}

          async with self._lock:
              if self._state.connection is not None:
                  try:
                      await self._state.connection.close()
                  except Exception:
                      pass

          # Reconnect with new database
          old_database = settings.database

          # Temporarily modify settings for reconnection
          # Note: This is a workaround; in production, use a new connection
          self._state = ConnectionState()

          # Create new connection to the target database
          conn_params: dict[str, Any] = {
              "host": settings.host,
              "port": settings.port,
              "user": settings.user,
              "dbname": database,
              "row_factory": dict_row,
              "autocommit": True,
          }

          if settings.password:
              conn_params["password"] = settings.password

          if settings.sslmode != "disable":
              conn_params["sslmode"] = settings.sslmode

          if settings.sslrootcert:
              conn_params["sslrootcert"] = settings.sslrootcert

          if settings.cluster:
              conn_params["options"] = f"--cluster={settings.cluster}"

          try:
              conn = await asyncio.wait_for(
                  psycopg.AsyncConnection.connect(**conn_params),
                  timeout=settings.timeout,
              )

              async with self._lock:
                  self._state.connection = conn
                  self._state.connected_at = datetime.now()
                  self._state.database = database
                  self._state.in_transaction = False

              return {
                  "status": "switched",
                  "previous_database": old_database,
                  "current_database": database,
              }
          except Exception as e:
              return {"status": "error", "error": str(e)}
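The two result shapes returned by the code above (`"switched"` and `"error"`) can be handled on the caller's side roughly as follows. This is a minimal sketch: `summarize_switch_result` is a hypothetical helper, and the sample dictionaries are hand-written to mirror the return statements in the implementation rather than captured from a live server.

```python
from typing import Any


def summarize_switch_result(result: dict[str, Any]) -> str:
    """Turn a switch_database result dict into a one-line message.

    Hypothetical helper; the keys it reads are the ones returned by the
    implementation shown above.
    """
    status = result.get("status")
    if status == "switched":
        return (
            f"Now using '{result['current_database']}' "
            f"(was '{result['previous_database']}')"
        )
    if status == "error":
        return f"Switch failed: {result.get('error', 'unknown error')}"
    # Anything else is outside the two documented shapes.
    return f"Unexpected response: {result!r}"


if __name__ == "__main__":
    # Hand-written samples; database names are placeholders.
    ok = {"status": "switched", "previous_database": "defaultdb", "current_database": "analytics"}
    blocked = {"status": "error", "error": "Database 'system' is blocked"}
    print(summarize_switch_result(ok))
    print(summarize_switch_result(blocked))
```

Judging from the ConnectionManager code, `previous_database` is read from `settings.database`, so it may report the configured database rather than the one selected by an earlier switch. Callers that need the real prior context may want to track it themselves.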