"""Pytest configuration and shared fixtures for ms-fabric-mcp-server tests."""
import asyncio
import json
import os
import sys
import time
import uuid
import pytest
import pytest_asyncio
from pathlib import Path
from unittest.mock import Mock, patch
from typing import Dict, Any
# Ensure local src/ is used instead of any installed package.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
SRC_PATH = PROJECT_ROOT / "src"
if str(SRC_PATH) not in sys.path:
sys.path.insert(0, str(SRC_PATH))
# ============================================================================
# Environment Setup
# ============================================================================
@pytest.fixture(autouse=True)
def mock_environment(monkeypatch, request):
"""Set up test environment variables for all tests."""
if request.node.get_closest_marker("integration"):
return
# Clear any existing environment that might interfere
env_vars_to_clear = [
"APPLICATIONINSIGHTS_CONNECTION_STRING",
"OTEL_SERVICE_NAME",
"FABRIC_BASE_URL",
"FABRIC_API_TIMEOUT",
]
for var in env_vars_to_clear:
monkeypatch.delenv(var, raising=False)
# Set test defaults
monkeypatch.setenv("FABRIC_BASE_URL", "https://api.fabric.microsoft.com/v1")
monkeypatch.setenv("FABRIC_API_TIMEOUT", "30")
@pytest.fixture
def temp_repo_dir(tmp_path: Path) -> Path:
"""Create temporary repository directory for testing."""
repo_dir = tmp_path / "test_repo"
repo_dir.mkdir()
return repo_dir
@pytest.fixture
def sample_llms_txt_content() -> str:
"""Sample llms.txt content for testing."""
return """# Overview
This is the overview section.
## Architecture
This describes the architecture.
### Components
Details about components.
## Getting Started
Installation and setup instructions.
# Reference
API reference documentation.
"""
@pytest.fixture
def sample_llms_txt_file(temp_repo_dir: Path, sample_llms_txt_content: str) -> Path:
"""Create a sample llms.txt file in temp directory."""
llms_file = temp_repo_dir / "llms.txt"
llms_file.write_text(sample_llms_txt_content)
return llms_file
# ============================================================================
# Fabric Client Fixtures
# ============================================================================
@pytest.fixture
def mock_fabric_config():
"""Mock Fabric configuration."""
config = Mock()
config.BASE_URL = "https://api.fabric.microsoft.com/v1"
config.SCOPES = ["https://api.fabric.microsoft.com/.default"]
config.API_CALL_TIMEOUT = 30
config.MAX_RETRIES = 3
config.RETRY_BACKOFF = 1.0
config.LIVY_SESSION_WAIT_TIMEOUT = 300
config.LIVY_STATEMENT_WAIT_TIMEOUT = 300
config.LIVY_POLL_INTERVAL = 2
return config
@pytest.fixture
def mock_azure_credential():
"""Mock Azure DefaultAzureCredential."""
credential = Mock()
token = Mock()
token.token = "mock_access_token_12345"
token.expires_on = 9999999999
credential.get_token.return_value = token
return credential
@pytest.fixture
def mock_fabric_client(mock_fabric_config, mock_azure_credential):
"""Mock FabricClient for testing services."""
with patch("ms_fabric_mcp_server.client.http_client.DefaultAzureCredential", return_value=mock_azure_credential):
from ms_fabric_mcp_server.client.http_client import FabricClient
client = FabricClient(mock_fabric_config)
# Mock the make_api_request method
client.make_api_request = Mock()
return client
@pytest.fixture
def mock_requests_response():
"""Factory for creating mock requests.Response objects."""
    def _create_response(status_code: int = 200, json_data: Dict[str, Any] | None = None, text: str = ""):
response = Mock()
response.status_code = status_code
response.ok = 200 <= status_code < 300
response.json.return_value = json_data or {}
response.text = text
response.headers = {}
return response
return _create_response
# ============================================================================
# Fabric Service Fixtures
# ============================================================================
@pytest.fixture
def sample_workspace_data() -> Dict[str, Any]:
"""Sample workspace data."""
return {
"id": "workspace-123",
"displayName": "Test Workspace",
"description": "A test workspace",
"type": "Workspace",
"capacityId": "capacity-456"
}
@pytest.fixture
def sample_item_data() -> Dict[str, Any]:
"""Sample item data."""
return {
"id": "item-789",
"displayName": "Test Notebook",
"type": "Notebook",
"workspaceId": "workspace-123",
"description": "A test notebook"
}
@pytest.fixture
def sample_notebook_definition() -> Dict[str, Any]:
"""Sample notebook definition."""
return {
"format": "ipynb",
"parts": [
{
"path": "notebook-content.py",
"payload": "eyJjZWxscyI6W119", # Base64 encoded {"cells":[]}
"payloadType": "InlineBase64"
}
]
}
@pytest.fixture
def sample_livy_session() -> Dict[str, Any]:
"""Sample Livy session data."""
return {
"id": 1,
"name": None,
"appId": None,
"owner": None,
"proxyUser": None,
"state": "idle",
"kind": "pyspark",
"appInfo": {
"driverLogUrl": None,
"sparkUiUrl": None
},
"log": []
}
@pytest.fixture
def sample_livy_statement() -> Dict[str, Any]:
"""Sample Livy statement data."""
return {
"id": 0,
"code": "print('hello')",
"state": "available",
"output": {
"status": "ok",
"execution_count": 0,
"data": {
"text/plain": "hello"
}
},
"progress": 1.0
}
@pytest.fixture
def sample_job_instance() -> Dict[str, Any]:
"""Sample job instance data."""
return {
"id": "job-instance-123",
"itemId": "item-789",
"jobType": "RunNotebook",
"invokeType": "Manual",
"status": "Completed",
"startTimeUtc": "2025-10-14T10:00:00Z",
"endTimeUtc": "2025-10-14T10:05:00Z"
}
# ============================================================================
# FastMCP Fixtures
# ============================================================================
@pytest.fixture
def mock_fastmcp():
"""Mock FastMCP server instance."""
mcp = Mock()
mcp.name = "test-server"
mcp.instructions = "Test instructions"
mcp.add_middleware = Mock()
# Mock the tool decorator - it accepts kwargs (like title=) and returns a decorator
def mock_tool_decorator(**kwargs):
def decorator(func):
return func
return decorator
mcp.tool = Mock(side_effect=mock_tool_decorator)
return mcp
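# Example usage (sketch): the mocked decorator returns the function unchanged,
# so tool registration can be asserted without FastMCP internals:
#     @mock_fastmcp.tool(title="Example")
#     def example_tool():
#         ...
#     mock_fastmcp.tool.assert_called_once_with(title="Example")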
# ============================================================================
# Marker Configuration
# ============================================================================
def pytest_configure(config):
"""Register custom markers."""
config.addinivalue_line("markers", "unit: Unit tests (fast, no external dependencies)")
config.addinivalue_line("markers", "integration: Integration tests (require live services)")
config.addinivalue_line("markers", "slow: Slow running tests")
# ============================================================================
# Integration Test Fixtures
# ============================================================================
@pytest.fixture(autouse=True)
def integration_enabled(request):
"""Gate integration tests behind marker and env var."""
if not request.node.get_closest_marker("integration"):
return
if os.getenv("FABRIC_INTEGRATION_TESTS") != "1":
pytest.skip("Integration tests require FABRIC_INTEGRATION_TESTS=1")
def get_env_or_skip(name: str, allow_empty: bool = False) -> str:
"""Return env var value or skip with a clear message."""
value = os.getenv(name)
if value is None or (not allow_empty and not value.strip()):
pytest.skip(f"Missing required environment variable: {name}")
return value
def get_env_optional(name: str) -> str | None:
"""Return env var value or None if missing."""
value = os.getenv(name)
if value is None or not value.strip():
return None
return value
def unique_name(prefix: str) -> str:
"""Generate a unique name for live test items."""
timestamp = time.strftime("%Y%m%d%H%M%S", time.gmtime())
suffix = uuid.uuid4().hex[:8]
return f"{prefix}_{timestamp}_{suffix}"
async def _poll_until(check_fn, timeout_seconds: int = 300, interval_seconds: int = 10):
"""Poll until check_fn returns a truthy value or timeout expires."""
deadline = time.monotonic() + timeout_seconds
last_result = None
while time.monotonic() < deadline:
last_result = await check_fn()
if last_result:
return last_result
await asyncio.sleep(interval_seconds)
return last_result
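# Example usage via the poll_until fixture (sketch): the probe returns None to
# keep polling and a truthy result to stop:
#     async def _probe():
#         result = await call_tool("get_job_status", ...)
#         return result if result.get("job", {}).get("is_terminal") else None
#     final = await poll_until(_probe, timeout_seconds=600, interval_seconds=15)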
@pytest.fixture
def poll_until():
"""Fixture wrapper for poll_until helper."""
return _poll_until
@pytest.fixture(scope="session")
def mcp_server():
"""Create a real FastMCP server with Fabric tools registered."""
from fastmcp import FastMCP
from ms_fabric_mcp_server import register_fabric_tools
mcp = FastMCP("integration-tests")
register_fabric_tools(mcp)
return mcp
@pytest_asyncio.fixture(scope="session")
async def tool_registry(mcp_server):
"""Return the registered tool mapping once per session."""
return await mcp_server.get_tools()
@pytest.fixture(scope="session")
def call_tool_session(tool_registry):
"""Session-scoped tool invoker for shared integration setup."""
async def _call(tool_name: str, **kwargs):
tool = tool_registry.get(tool_name)
if tool is None:
raise AssertionError(f"Tool not registered: {tool_name}")
result = await tool.run(kwargs)
structured = result.structured_content
if structured is None:
raise AssertionError(f"Tool returned no structured content: {tool_name}")
return structured
return _call
@pytest.fixture
def call_tool(tool_registry):
"""Invoke a tool by name and return structured content."""
async def _call(tool_name: str, **kwargs):
tool = tool_registry.get(tool_name)
if tool is None:
raise AssertionError(f"Tool not registered: {tool_name}")
result = await tool.run(kwargs)
structured = result.structured_content
if structured is None:
raise AssertionError(f"Tool returned no structured content: {tool_name}")
return structured
return _call
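# Example usage in an integration test (sketch):
#     result = await call_tool("list_workspaces")
#     assert result.get("status") == "success"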
@pytest.fixture(scope="session")
def workspace_name_session():
"""Configured workspace display name (session scope)."""
return get_env_or_skip("FABRIC_TEST_WORKSPACE_NAME")
@pytest.fixture(scope="session")
def lakehouse_name_session():
"""Configured lakehouse display name (session scope)."""
return get_env_or_skip("FABRIC_TEST_LAKEHOUSE_NAME")
@pytest.fixture
def workspace_name():
"""Configured workspace display name."""
return get_env_or_skip("FABRIC_TEST_WORKSPACE_NAME")
@pytest.fixture
def lakehouse_name():
"""Configured lakehouse display name."""
return get_env_or_skip("FABRIC_TEST_LAKEHOUSE_NAME")
@pytest.fixture
def sql_database():
"""Configured SQL database name."""
return get_env_or_skip("FABRIC_TEST_SQL_DATABASE")
@pytest.fixture
def notebook_fixture_path() -> Path:
"""Path to the minimal notebook fixture."""
return PROJECT_ROOT / "tests" / "fixtures" / "minimal_notebook.ipynb"
@pytest_asyncio.fixture
async def workspace_id(call_tool, workspace_name):
"""Resolve workspace ID from display name."""
result = await call_tool("list_workspaces")
if result.get("status") != "success":
raise AssertionError(f"Failed to list workspaces: {result}")
for workspace in result.get("workspaces", []):
if workspace.get("display_name") == workspace_name:
return workspace.get("id")
pytest.skip(f"Workspace not found: {workspace_name}")
@pytest_asyncio.fixture
async def lakehouse_id(call_tool, workspace_name, lakehouse_name):
"""Resolve lakehouse ID from display name."""
result = await call_tool("list_items", workspace_name=workspace_name, item_type="Lakehouse")
if result.get("status") != "success":
raise AssertionError(f"Failed to list lakehouses: {result}")
for item in result.get("items", []):
if item.get("display_name") == lakehouse_name:
return item.get("id")
pytest.skip(f"Lakehouse not found: {lakehouse_name}")
@pytest.fixture
def pipeline_copy_inputs():
"""Optional pipeline copy inputs from env vars."""
source_connection_id = get_env_optional("FABRIC_TEST_SOURCE_CONNECTION_ID")
source_type = get_env_optional("FABRIC_TEST_SOURCE_TYPE")
source_schema = get_env_optional("FABRIC_TEST_SOURCE_SCHEMA")
source_table = get_env_optional("FABRIC_TEST_SOURCE_TABLE")
dest_connection_id = get_env_optional("FABRIC_TEST_DEST_CONNECTION_ID")
dest_table = get_env_optional("FABRIC_TEST_DEST_TABLE_NAME") or source_table
if not all([
source_connection_id,
source_type,
source_schema,
source_table,
dest_connection_id,
dest_table,
]):
return None
return {
"source_connection_id": source_connection_id,
"source_type": source_type,
"source_schema": source_schema,
"source_table": source_table,
"destination_connection_id": dest_connection_id,
"destination_table": dest_table,
}
@pytest.fixture
def pipeline_copy_sql_inputs():
"""Optional pipeline copy inputs for SQL fallback mode."""
source_connection_id = get_env_optional("FABRIC_TEST_SOURCE_SQL_CONNECTION_ID")
source_schema = get_env_optional("FABRIC_TEST_SOURCE_SCHEMA")
source_table = get_env_optional("FABRIC_TEST_SOURCE_TABLE")
dest_connection_id = get_env_optional("FABRIC_TEST_DEST_CONNECTION_ID")
dest_table = get_env_optional("FABRIC_TEST_DEST_TABLE_NAME") or source_table
source_sql_query = get_env_optional("FABRIC_TEST_SOURCE_SQL_QUERY")
if not all([
source_connection_id,
source_schema,
source_table,
dest_connection_id,
dest_table,
]):
return None
return {
"source_connection_id": source_connection_id,
"source_schema": source_schema,
"source_table": source_table,
"destination_connection_id": dest_connection_id,
"destination_table": dest_table,
"source_sql_query": source_sql_query,
}
@pytest.fixture
def dataflow_name():
"""Optional dataflow name for pipeline integration tests."""
return get_env_optional("FABRIC_TEST_DATAFLOW_NAME")
def _parse_semantic_model_columns(raw: str | None, env_name: str) -> list[dict] | None:
if not raw:
return None
try:
data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise AssertionError(f"{env_name} must be valid JSON: {exc}") from exc
if not isinstance(data, list):
raise AssertionError(f"{env_name} must be a JSON list")
if not data:
raise AssertionError(f"{env_name} must not be empty")
return data
@pytest.fixture
def semantic_model_table():
"""Optional semantic model table name for integration tests."""
return get_env_optional("FABRIC_TEST_SEMANTIC_MODEL_TABLE")
@pytest.fixture
def semantic_model_columns():
"""Optional semantic model columns (JSON) for integration tests."""
return _parse_semantic_model_columns(
get_env_optional("FABRIC_TEST_SEMANTIC_MODEL_COLUMNS"),
"FABRIC_TEST_SEMANTIC_MODEL_COLUMNS",
)
@pytest.fixture
def semantic_model_table_2():
"""Optional second semantic model table name for relationship tests."""
return get_env_optional("FABRIC_TEST_SEMANTIC_MODEL_TABLE_2")
@pytest.fixture
def semantic_model_columns_2():
"""Optional second semantic model columns (JSON) for relationship tests."""
return _parse_semantic_model_columns(
get_env_optional("FABRIC_TEST_SEMANTIC_MODEL_COLUMNS_2"),
"FABRIC_TEST_SEMANTIC_MODEL_COLUMNS_2",
)
@pytest.fixture
def sql_dependencies_available(tool_registry):
"""Skip SQL tests if dependencies or tools are unavailable."""
pyodbc = pytest.importorskip("pyodbc")
drivers = [driver.lower() for driver in pyodbc.drivers()]
if not any("odbc driver" in driver and "sql server" in driver for driver in drivers):
pytest.skip("SQL tests require a SQL Server ODBC driver")
if "get_sql_endpoint" not in tool_registry:
pytest.skip("SQL tools are not registered (pyodbc missing?)")
return True
@pytest.fixture
def delete_item_if_exists(call_tool, workspace_name):
"""Delete an item and ignore not-found errors."""
async def _delete(item_display_name: str, item_type: str):
result = await call_tool(
"delete_item",
workspace_name=workspace_name,
item_display_name=item_display_name,
item_type=item_type,
)
if result.get("status") == "error":
message = (result.get("message") or "").lower()
if "not found" not in message:
raise AssertionError(f"Failed to delete {item_type} '{item_display_name}': {result}")
return result
return _delete
@pytest_asyncio.fixture(scope="session")
async def executed_notebook_context(
call_tool_session,
workspace_name_session,
lakehouse_name_session,
):
"""Provision and execute a notebook once for integration tests."""
notebook_name = unique_name("e2e_notebook_shared")
job_instance_id = None
location_url = None
async def _get_content():
result = await call_tool_session(
"get_notebook_content",
workspace_name=workspace_name_session,
notebook_display_name=notebook_name,
)
if result.get("status") == "success":
return result
message = (result.get("message") or "").lower()
if "not found" in message or "notfound" in message:
return None
return result
def _is_transient_job_error(result: dict) -> bool:
message = (result.get("message") or "").lower()
return any(token in message for token in ("not found", "404", "does not exist", "not yet"))
async def _wait_for_job():
status_result = await call_tool_session(
"get_job_status",
workspace_name=workspace_name_session,
item_name=notebook_name,
item_type="Notebook",
job_instance_id=job_instance_id,
)
if status_result.get("status") != "success":
if _is_transient_job_error(status_result):
return None
return status_result
job = status_result.get("job", {})
if job.get("is_terminal"):
return status_result
return None
try:
import_result = await call_tool_session(
"import_notebook_to_fabric",
workspace_name=workspace_name_session,
notebook_display_name=notebook_name,
local_notebook_path=str(PROJECT_ROOT / "tests" / "fixtures" / "minimal_notebook.ipynb"),
)
assert import_result["status"] == "success"
content_result = await _poll_until(_get_content, timeout_seconds=300, interval_seconds=10)
assert content_result is not None
assert content_result["status"] == "success"
attach_result = await call_tool_session(
"attach_lakehouse_to_notebook",
workspace_name=workspace_name_session,
notebook_name=notebook_name,
lakehouse_name=lakehouse_name_session,
)
assert attach_result["status"] == "success"
run_result = await call_tool_session(
"run_on_demand_job",
workspace_name=workspace_name_session,
item_name=notebook_name,
item_type="Notebook",
job_type="RunNotebook",
)
assert run_result["status"] == "success"
job_instance_id = run_result.get("job_instance_id")
location_url = run_result.get("location_url")
assert job_instance_id
assert location_url
status_result = await _poll_until(_wait_for_job, timeout_seconds=1800, interval_seconds=15)
assert status_result is not None
assert status_result["status"] == "success"
job = status_result.get("job", {})
assert job.get("is_terminal")
assert job.get("is_successful"), f"Job failed: {job.get('failure_reason')}"
yield {
"notebook_name": notebook_name,
"job_instance_id": job_instance_id,
"location_url": location_url,
}
finally:
result = await call_tool_session(
"delete_item",
workspace_name=workspace_name_session,
item_display_name=notebook_name,
item_type="Notebook",
)
if result.get("status") == "error":
message = (result.get("message") or "").lower()
if "not found" not in message:
raise AssertionError(f"Failed to delete Notebook '{notebook_name}': {result}")