Skip to main content
Glama
conftest.py7.3 kB
""" Shared fixtures and utilities for MCP server integration tests. """ from __future__ import annotations import asyncio import json import os import sys from collections.abc import AsyncIterator from contextlib import asynccontextmanager from pathlib import Path from typing import Any import pytest from mcp import ClientSession, StdioServerParameters, stdio_client PROJECT_ROOT = Path(__file__).resolve().parents[1] SRC_DIR = PROJECT_ROOT / "src" # Tools we expect to be registered by the server EXPECTED_TOOLS = { "get_buckets_in_cluster", "get_server_configuration_status", "test_cluster_connection", "get_scopes_and_collections_in_bucket", "get_collections_in_scope", "get_scopes_in_bucket", "get_document_by_id", "upsert_document_by_id", "delete_document_by_id", "get_schema_for_collection", "run_sql_plus_plus_query", "get_index_advisor_recommendations", "list_indexes", "get_cluster_health_and_services", # Performance analysis tools "get_longest_running_queries", "get_most_frequent_queries", "get_queries_with_largest_response_sizes", "get_queries_with_large_result_count", "get_queries_using_primary_index", "get_queries_not_using_covering_index", "get_queries_not_selective", } # Tools organized by category for validation TOOLS_BY_CATEGORY = { "server": { "get_server_configuration_status", "test_cluster_connection", "get_buckets_in_cluster", "get_scopes_in_bucket", "get_scopes_and_collections_in_bucket", "get_collections_in_scope", "get_cluster_health_and_services", }, "kv": { "get_document_by_id", "upsert_document_by_id", "delete_document_by_id", }, "query": { "get_schema_for_collection", "run_sql_plus_plus_query", }, "index": { "list_indexes", "get_index_advisor_recommendations", }, "performance": { "get_longest_running_queries", "get_most_frequent_queries", "get_queries_with_largest_response_sizes", "get_queries_with_large_result_count", "get_queries_using_primary_index", "get_queries_not_using_covering_index", "get_queries_not_selective", }, } # Expected required parameters for tools that need them TOOL_REQUIRED_PARAMS = { "get_scopes_in_bucket": ["bucket_name"], "get_scopes_and_collections_in_bucket": ["bucket_name"], "get_collections_in_scope": ["bucket_name", "scope_name"], "get_document_by_id": [ "bucket_name", "scope_name", "collection_name", "document_id", ], "upsert_document_by_id": [ "bucket_name", "scope_name", "collection_name", "document_id", "document_content", ], "delete_document_by_id": [ "bucket_name", "scope_name", "collection_name", "document_id", ], "get_schema_for_collection": ["bucket_name", "scope_name", "collection_name"], "run_sql_plus_plus_query": ["bucket_name", "scope_name", "query"], "get_index_advisor_recommendations": ["bucket_name", "scope_name", "query"], } # Minimum configuration needed to talk to a demo cluster REQUIRED_ENV_VARS = ("CB_CONNECTION_STRING", "CB_USERNAME", "CB_PASSWORD") # Default timeout (seconds) to guard against hangs when the Couchbase cluster # is unreachable or slow. Override with CB_MCP_TEST_TIMEOUT if needed. DEFAULT_TIMEOUT = int(os.getenv("CB_MCP_TEST_TIMEOUT", "120")) def _build_env() -> dict[str, str]: """Build the environment passed to the test server process.""" env = os.environ.copy() missing = [var for var in REQUIRED_ENV_VARS if not env.get(var)] if missing: pytest.skip( "Integration tests require demo cluster credentials. " f"Missing env vars: {', '.join(missing)}" ) # Ensure the server module can be imported from the repo's src/ folder existing_path = env.get("PYTHONPATH") env["PYTHONPATH"] = ( f"{SRC_DIR}{os.pathsep}{existing_path}" if existing_path else str(SRC_DIR) ) # Force stdio transport for the test server to match stdio_client env["CB_MCP_TRANSPORT"] = "stdio" # Ensure unbuffered output to avoid stdout/stderr buffering surprises env.setdefault("PYTHONUNBUFFERED", "1") return env @asynccontextmanager async def create_mcp_session() -> AsyncIterator[ClientSession]: """Create a fresh MCP client session connected to the server over stdio.""" env = _build_env() params = StdioServerParameters( command=sys.executable, args=["-m", "mcp_server"], env=env, ) async with asyncio.timeout(DEFAULT_TIMEOUT): async with stdio_client(params) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() yield session def extract_payload(response: Any) -> Any: """Extract a usable payload from a tool response. MCP tool responses can return data in different formats: - A single content block with JSON-encoded data (dict, list, etc.) - Multiple content blocks, one per list item (for list returns) This function handles both cases. """ content = getattr(response, "content", None) or [] if not content: return None # If there are multiple content blocks, collect them all as a list # (each item in a list return may be a separate content block) if len(content) > 1: items = [] for block in content: text = getattr(block, "text", None) if text is not None: try: items.append(json.loads(text)) except json.JSONDecodeError: items.append(text) return items if items else None # Single content block - try to parse as JSON first = content[0] raw = getattr(first, "text", None) if raw is None and hasattr(first, "data"): raw = first.data if isinstance(raw, str): try: return json.loads(raw) except json.JSONDecodeError: return raw return raw def get_test_bucket() -> str | None: """Get the test bucket name from environment, or None if not set.""" return os.getenv("CB_MCP_TEST_BUCKET") def get_test_scope() -> str: """Get the test scope name from environment, defaults to _default.""" return os.getenv("CB_MCP_TEST_SCOPE", "_default") def get_test_collection() -> str: """Get the test collection name from environment, defaults to _default.""" return os.getenv("CB_MCP_TEST_COLLECTION", "_default") def require_test_bucket() -> str: """Get the test bucket name, skipping test if not set.""" bucket = get_test_bucket() if not bucket: pytest.skip("CB_MCP_TEST_BUCKET not set") return bucket def ensure_list(value: Any) -> list[Any]: """Ensure the value is a list. MCP can return single-item lists as just the item (not wrapped in a list). This helper wraps single non-list values in a list for consistent handling. """ if value is None: return [] if isinstance(value, list): return value return [value]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Couchbase-Ecosystem/mcp-server-couchbase'

If you have feedback or need assistance with the MCP directory API, please join our Discord server