"""Tests for the Fastn MCP server tool handlers.
Each tool handler is tested by mocking the SDK client methods to verify:
1. Correct SDK method is called with correct arguments
2. Success responses are properly formatted
3. SDK exceptions are mapped to correct MCP error codes
"""
from __future__ import annotations
import json
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from fastn.exceptions import (
APIError,
AuthError,
ConfigError,
ConnectorNotFoundError,
FastnError,
FlowNotFoundError,
)
import fastn_mcp.server as _server_mod
from fastn_mcp.server import (
_handle_create_flow as create_flow,
_handle_delete_flow as delete_flow,
_handle_update_flow as update_flow,
_handle_run_flow as run_flow,
_handle_execute_tool as execute_tool,
_handle_list_flows as list_flows,
_handle_find_tools as find_tools,
_handle_list_connectors as list_connectors,
_handle_configure_custom_auth as configure_custom_auth,
_handle_list_skills as list_skills,
_handle_list_projects as list_projects,
handle_call_tool,
handle_list_tools,
UCL_TOOL_NAMES,
TOOLS,
)
# Mode is set at startup via CLI flags. We test the filtering logic
# indirectly through handle_list_tools + module-level variables.
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_result(result) -> dict:
"""Parse a TextContent result list into a dict."""
assert len(result) == 1
return json.loads(result[0].text)
def _mock_client(**namespace_mocks):
"""Create a mock AsyncFastnClient with configurable namespace mocks.
Usage:
client = _mock_client(
flows_create=AsyncMock(return_value={"flow_id": "flow_abc"}),
flows_delete=AsyncMock(side_effect=FlowNotFoundError("xyz")),
)
"""
client = MagicMock()
client.close = AsyncMock()
# Flows namespace
client.flows = MagicMock()
client.flows.create = namespace_mocks.get("flows_create", AsyncMock(return_value={}))
client.flows.delete = namespace_mocks.get("flows_delete", AsyncMock(return_value={}))
client.flows.run = namespace_mocks.get("flows_run", AsyncMock(return_value={}))
client.flows.get_run = namespace_mocks.get("flows_get_run", AsyncMock(return_value={}))
client.flows.list = namespace_mocks.get("flows_list", AsyncMock(return_value=[]))
client.flows.update = namespace_mocks.get("flows_update", AsyncMock(return_value={}))
# Connections namespace
client.connections = MagicMock()
client.connections.initiate = namespace_mocks.get("connections_initiate", AsyncMock(return_value={}))
client.connections.status = namespace_mocks.get("connections_status", AsyncMock(return_value={}))
# Connectors catalog (async — calls GraphQL API)
client.connectors = MagicMock()
client.connectors.list = namespace_mocks.get("connectors_list", AsyncMock(return_value=[]))
# Skills namespace
client.skills = MagicMock()
client.skills.list = namespace_mocks.get("skills_list", AsyncMock(return_value=[]))
# Projects namespace
client.projects = MagicMock()
client.projects.list = namespace_mocks.get("projects_list", AsyncMock(return_value=[]))
# configure_custom_auth
client.configure_custom_auth = namespace_mocks.get("configure_custom_auth", AsyncMock(return_value={}))
# execute (used by execute_tool and run_flow)
client.execute = namespace_mocks.get("execute", AsyncMock(return_value={}))
# Registry (used by get_tool_actions to look up connector IDs)
client._registry = namespace_mocks.get("registry", {"connectors": {}})
# Config (used by delete_flow for project_id)
client._config = namespace_mocks.get("config", MagicMock())
client._config.resolve_project_id = MagicMock(return_value="workspace_123")
return client
# ---------------------------------------------------------------------------
# create_flow tests
# ---------------------------------------------------------------------------
class TestCreateFlow:
@pytest.mark.asyncio
async def test_returns_under_development(self):
"""create_flow returns UNDER_DEVELOPMENT status."""
result = await create_flow({"prompt": "Sync HubSpot contacts"})
data = _parse_result(result)
assert data["error"] == "UNDER_DEVELOPMENT"
assert "execute_tool" in data["message"]
# ---------------------------------------------------------------------------
# delete_flow tests
# ---------------------------------------------------------------------------
class TestDeleteFlow:
@pytest.mark.asyncio
async def test_success(self):
"""delete_flow calls GraphQL deleteModelSchema and returns confirmation."""
client = _mock_client()
mock_gql_response = {"deleteModelSchema": True}
with patch("fastn_mcp.server._get_client", return_value=client), \
patch("fastn._http._gql_call_async", new_callable=AsyncMock,
return_value=mock_gql_response):
result = await delete_flow({"flow_id": "flow_abc"})
data = _parse_result(result)
assert data["deleted"] is True
assert data["flow_id"] == "flow_abc"
@pytest.mark.asyncio
async def test_missing_flow_id(self):
"""delete_flow returns MISSING_PARAM when flow_id not provided."""
result = await delete_flow({})
data = _parse_result(result)
assert data["error"] == "MISSING_PARAM"
@pytest.mark.asyncio
async def test_auth_error_propagates(self):
"""delete_flow propagates AuthError to centralized handler."""
client = _mock_client()
with patch("fastn_mcp.server._get_client", return_value=client), \
patch("fastn._http._gql_call_async", new_callable=AsyncMock,
side_effect=AuthError("Token expired")), \
pytest.raises(AuthError):
await delete_flow({"flow_id": "flow_abc"})
@pytest.mark.asyncio
async def test_closes_client(self):
"""delete_flow always closes the client even on error."""
client = _mock_client()
with patch("fastn_mcp.server._get_client", return_value=client), \
patch("fastn._http._gql_call_async", new_callable=AsyncMock,
side_effect=FastnError("boom")), \
pytest.raises(FastnError):
await delete_flow({"flow_id": "flow_abc"})
client.close.assert_called_once()
# ---------------------------------------------------------------------------
# update_flow tests
# ---------------------------------------------------------------------------
class TestUpdateFlow:
@pytest.mark.asyncio
async def test_returns_under_development(self):
"""update_flow returns UNDER_DEVELOPMENT status."""
result = await update_flow({"flow_id": "flow_abc", "prompt": "change something"})
data = _parse_result(result)
assert data["error"] == "UNDER_DEVELOPMENT"
# ---------------------------------------------------------------------------
# run_flow tests
# ---------------------------------------------------------------------------
class TestRunFlow:
@pytest.mark.asyncio
async def test_success(self):
"""run_flow executes via client.execute() and returns result."""
client = _mock_client(
execute=AsyncMock(return_value={"status": "ok", "data": [1, 2, 3]}),
)
with patch("fastn_mcp.server._get_client", return_value=client):
result = await run_flow({"flow_id": "flow_abc"})
data = _parse_result(result)
assert data["status"] == "ok"
client.execute.assert_called_once_with(
tool="flow_abc",
params={},
)
@pytest.mark.asyncio
async def test_with_parameters(self):
"""run_flow passes parameters to execute()."""
client = _mock_client(
execute=AsyncMock(return_value={"status": "ok"}),
)
with patch("fastn_mcp.server._get_client", return_value=client):
await run_flow({"flow_id": "flow_abc", "parameters": {"key": "value"}})
client.execute.assert_called_once_with(
tool="flow_abc",
params={"key": "value"},
)
@pytest.mark.asyncio
async def test_missing_flow_id(self):
"""run_flow returns MISSING_PARAM when flow_id not provided."""
result = await run_flow({})
data = _parse_result(result)
assert data["error"] == "MISSING_PARAM"
@pytest.mark.asyncio
async def test_auth_error_propagates(self):
"""run_flow propagates AuthError to centralized handler."""
client = _mock_client(
execute=AsyncMock(side_effect=AuthError("Token expired")),
)
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(AuthError):
await run_flow({"flow_id": "flow_abc"})
@pytest.mark.asyncio
async def test_closes_client(self):
"""run_flow always closes the client even on error."""
client = _mock_client(
execute=AsyncMock(side_effect=FastnError("boom")),
)
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(FastnError):
await run_flow({"flow_id": "flow_abc"})
client.close.assert_called_once()
# ---------------------------------------------------------------------------
# execute_tool tests
# ---------------------------------------------------------------------------
class TestExecuteAction:
@pytest.mark.asyncio
async def test_success(self):
"""execute_tool calls client.execute() and returns result."""
client = _mock_client(
execute=AsyncMock(return_value={"message_id": "msg_123", "ok": True}),
)
with patch("fastn_mcp.server._get_client", return_value=client):
result = await execute_tool({
"action_id": "act_slack_send",
"parameters": {"channel": "general", "text": "Hello"},
})
data = _parse_result(result)
assert data["ok"] is True
assert data["message_id"] == "msg_123"
client.execute.assert_called_once_with(
tool="act_slack_send",
params={"channel": "general", "text": "Hello"},
connection_id=None,
)
@pytest.mark.asyncio
async def test_with_connection_id(self):
"""execute_tool passes connection_id to client.execute()."""
client = _mock_client(
execute=AsyncMock(return_value={"ok": True}),
)
with patch("fastn_mcp.server._get_client", return_value=client):
await execute_tool({
"action_id": "act_slack_send",
"parameters": {"channel": "general"},
"connection_id": "conn_abc",
})
client.execute.assert_called_once_with(
tool="act_slack_send",
params={"channel": "general"},
connection_id="conn_abc",
)
@pytest.mark.asyncio
async def test_missing_action_id(self):
"""execute_tool returns MISSING_PARAM when action_id not provided."""
result = await execute_tool({"parameters": {"key": "value"}})
data = _parse_result(result)
assert data["error"] == "MISSING_PARAM"
@pytest.mark.asyncio
async def test_invalid_parameters(self):
"""execute_tool returns INVALID_PARAM when parameters is not a dict."""
result = await execute_tool({
"action_id": "act_test",
"parameters": "not_a_dict",
})
data = _parse_result(result)
assert data["error"] == "INVALID_PARAM"
@pytest.mark.asyncio
async def test_auth_error_propagates(self):
"""execute_tool propagates AuthError to centralized handler."""
client = _mock_client(
execute=AsyncMock(side_effect=AuthError("Token expired")),
)
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(AuthError):
await execute_tool({"action_id": "act_test", "parameters": {}})
@pytest.mark.asyncio
async def test_api_error_propagates(self):
"""execute_tool propagates APIError to centralized handler."""
client = _mock_client(
execute=AsyncMock(side_effect=APIError("API failed", status_code=500)),
)
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(APIError):
await execute_tool({"action_id": "act_test", "parameters": {}})
@pytest.mark.asyncio
async def test_closes_client(self):
"""execute_tool always closes the client even on error."""
client = _mock_client(
execute=AsyncMock(side_effect=FastnError("boom")),
)
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(FastnError):
await execute_tool({
"action_id": "act_test",
"parameters": {},
})
client.close.assert_called_once()
# ---------------------------------------------------------------------------
# list_flows tests
# ---------------------------------------------------------------------------
class TestListFlows:
@pytest.mark.asyncio
async def test_success(self):
"""list_flows returns flows with total count."""
client = _mock_client(
flows_list=AsyncMock(return_value=[
{"flow_id": "flow_1", "status": "active"},
{"flow_id": "flow_2", "status": "paused"},
]),
)
with patch("fastn_mcp.server._get_client", return_value=client):
result = await list_flows({})
data = _parse_result(result)
assert data["total"] == 2
assert len(data["flows"]) == 2
@pytest.mark.asyncio
async def test_with_status_filter(self):
"""list_flows passes status filter to SDK."""
client = _mock_client(
flows_list=AsyncMock(return_value=[
{"flow_id": "flow_1", "status": "active"},
]),
)
with patch("fastn_mcp.server._get_client", return_value=client):
result = await list_flows({"status": "active"})
client.flows.list.assert_called_once_with(status="active")
@pytest.mark.asyncio
async def test_config_error_propagates(self):
"""list_flows propagates ConfigError to centralized handler."""
client = _mock_client(
flows_list=AsyncMock(side_effect=ConfigError("Missing project_id")),
)
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(ConfigError):
await list_flows({})
# ---------------------------------------------------------------------------
# find_tools tests
# ---------------------------------------------------------------------------
class TestFindTools:
@pytest.mark.asyncio
async def test_success(self):
"""find_tools returns relevant tools via getTools API."""
mock_tools = [
{"name": "send_message", "description": "Send a Slack message", "actionId": "act_1"},
{"name": "list_channels", "description": "List Slack channels", "actionId": "act_2"},
]
client = _mock_client()
client.get_tools_for = AsyncMock(return_value=mock_tools)
with patch("fastn_mcp.server._get_client", return_value=client):
result = await find_tools({"prompt": "send a slack message"})
data = _parse_result(result)
assert data["total"] == 2
assert len(data["tools"]) == 2
client.get_tools_for.assert_called_once_with(prompt="send a slack message", format="raw", limit=5)
@pytest.mark.asyncio
async def test_missing_prompt(self):
"""find_tools returns MISSING_PARAM when prompt not provided."""
result = await find_tools({})
data = _parse_result(result)
assert data["error"] == "MISSING_PARAM"
@pytest.mark.asyncio
async def test_auth_error_propagates(self):
"""find_tools propagates AuthError to centralized handler."""
client = _mock_client()
client.get_tools_for = AsyncMock(side_effect=AuthError("Token expired"))
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(AuthError):
await find_tools({"prompt": "send email"})
@pytest.mark.asyncio
async def test_closes_client(self):
"""find_tools always closes the client even on error."""
client = _mock_client()
client.get_tools_for = AsyncMock(side_effect=FastnError("boom"))
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(FastnError):
await find_tools({"prompt": "test"})
client.close.assert_called_once()
# ---------------------------------------------------------------------------
# list_connectors tests
# ---------------------------------------------------------------------------
class TestListConnectors:
@pytest.mark.asyncio
async def test_success_returns_all(self):
"""list_connectors returns all connectors with connect_url."""
mock_connectors = [
{"name": "slack", "display_name": "Slack", "category": "messaging", "tool_count": 12},
{"name": "jira", "display_name": "Jira", "category": "project-management", "tool_count": 8},
]
client = _mock_client(connectors_list=AsyncMock(return_value=mock_connectors))
with patch("fastn_mcp.server._get_client", return_value=client):
result = await list_connectors({})
data = _parse_result(result)
assert data["total"] == 2
assert len(data["connectors"]) == 2
assert data["connectors"][0]["name"] == "slack"
assert data["connectors"][1]["name"] == "jira"
# Each connector has a deep link with query param for the connector name
assert "app.ucl.dev" in data["connectors"][0]["connect_url"]
assert "workspace_123" in data["connectors"][0]["connect_url"]
assert "open=managetools" in data["connectors"][0]["connect_url"]
assert "query=Slack" in data["connectors"][0]["connect_url"]
assert "query=Jira" in data["connectors"][1]["connect_url"]
# Only name + connect_url — no extra fields
assert set(data["connectors"][0].keys()) == {"name", "connect_url"}
@pytest.mark.asyncio
async def test_filter_by_query(self):
"""list_connectors filters connectors by query."""
mock_connectors = [
{"name": "slack", "display_name": "Slack", "category": "messaging", "tool_count": 12},
{"name": "jira", "display_name": "Jira", "category": "project-management", "tool_count": 8},
{"name": "gmail", "display_name": "Gmail", "category": "email", "tool_count": 5},
]
client = _mock_client(connectors_list=AsyncMock(return_value=mock_connectors))
with patch("fastn_mcp.server._get_client", return_value=client):
result = await list_connectors({"query": "jira"})
data = _parse_result(result)
assert data["total"] == 1
assert data["connectors"][0]["name"] == "jira"
@pytest.mark.asyncio
async def test_filter_by_display_name(self):
"""list_connectors matches query against display_name too."""
mock_connectors = [
{"name": "ms_teams", "display_name": "Microsoft Teams", "category": "messaging", "tool_count": 6},
]
client = _mock_client(connectors_list=AsyncMock(return_value=mock_connectors))
with patch("fastn_mcp.server._get_client", return_value=client):
result = await list_connectors({"query": "Microsoft"})
data = _parse_result(result)
assert data["total"] == 1
assert data["connectors"][0]["name"] == "ms_teams"
@pytest.mark.asyncio
async def test_auth_error_propagates(self):
"""list_connectors propagates AuthError to centralized handler."""
client = _mock_client(connectors_list=AsyncMock(side_effect=AuthError("Token expired")))
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(AuthError):
await list_connectors({})
@pytest.mark.asyncio
async def test_closes_client(self):
"""list_connectors always closes the client even on error."""
client = _mock_client(connectors_list=AsyncMock(side_effect=FastnError("boom")))
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(FastnError):
await list_connectors({})
client.close.assert_called_once()
# ---------------------------------------------------------------------------
# list_skills tests
# ---------------------------------------------------------------------------
class TestListSkills:
@pytest.mark.asyncio
async def test_success_returns_all(self):
"""list_skills returns all skills with id, name, description."""
mock_skills = [
{"id": "sk_001", "projectId": "proj_abc", "name": "Onboarding", "description": "Customer onboarding workflow", "createdAt": "2025-01-15", "updatedAt": "2025-01-20"},
{"id": "sk_002", "projectId": "proj_abc", "name": "Incident Response", "description": "Handle production incidents", "createdAt": "2025-02-01", "updatedAt": "2025-02-05"},
]
client = _mock_client(skills_list=AsyncMock(return_value=mock_skills))
with patch("fastn_mcp.server._get_client", return_value=client):
result = await list_skills({})
data = _parse_result(result)
assert data["total"] == 2
assert len(data["skills"]) == 2
assert data["skills"][0]["id"] == "sk_001"
assert data["skills"][0]["name"] == "Onboarding"
assert data["skills"][0]["description"] == "Customer onboarding workflow"
assert data["skills"][1]["name"] == "Incident Response"
# Only id, name, description — no extra fields
assert set(data["skills"][0].keys()) == {"id", "name", "description"}
@pytest.mark.asyncio
async def test_empty(self):
"""list_skills returns empty list when no skills exist."""
client = _mock_client(skills_list=AsyncMock(return_value=[]))
with patch("fastn_mcp.server._get_client", return_value=client):
result = await list_skills({})
data = _parse_result(result)
assert data["total"] == 0
assert data["skills"] == []
@pytest.mark.asyncio
async def test_auth_error_propagates(self):
"""list_skills propagates AuthError to centralized handler."""
client = _mock_client(skills_list=AsyncMock(side_effect=AuthError("Token expired")))
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(AuthError):
await list_skills({})
@pytest.mark.asyncio
async def test_closes_client(self):
"""list_skills always closes the client even on error."""
client = _mock_client(skills_list=AsyncMock(side_effect=FastnError("boom")))
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(FastnError):
await list_skills({})
client.close.assert_called_once()
# ---------------------------------------------------------------------------
# configure_custom_auth tests
# ---------------------------------------------------------------------------
class TestConfigureCustomAuth:
@pytest.mark.asyncio
async def test_success(self):
"""configure_custom_auth sends issuer config and returns confirmation."""
client = _mock_client(
configure_custom_auth=AsyncMock(return_value={"configured": True}),
)
with patch("fastn_mcp.server._get_client", return_value=client):
result = await configure_custom_auth({
"issuer_url": "https://myapp.auth0.com/",
"jwks_uri": "https://myapp.auth0.com/.well-known/jwks.json",
"user_id_claim": "sub",
})
data = _parse_result(result)
assert data["configured"] is True
client.configure_custom_auth.assert_called_once_with(
issuer_url="https://myapp.auth0.com/",
jwks_uri="https://myapp.auth0.com/.well-known/jwks.json",
user_id_claim="sub",
)
@pytest.mark.asyncio
async def test_auth_error_propagates(self):
"""configure_custom_auth propagates AuthError to centralized handler."""
client = _mock_client(
configure_custom_auth=AsyncMock(side_effect=AuthError("Token expired")),
)
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(AuthError):
await configure_custom_auth({
"issuer_url": "https://example.com/",
"jwks_uri": "https://example.com/.well-known/jwks.json",
})
@pytest.mark.asyncio
async def test_default_user_id_claim(self):
"""configure_custom_auth defaults user_id_claim to 'sub'."""
client = _mock_client(
configure_custom_auth=AsyncMock(return_value={"configured": True}),
)
with patch("fastn_mcp.server._get_client", return_value=client):
await configure_custom_auth({
"issuer_url": "https://example.com/",
"jwks_uri": "https://example.com/.well-known/jwks.json",
})
client.configure_custom_auth.assert_called_once_with(
issuer_url="https://example.com/",
jwks_uri="https://example.com/.well-known/jwks.json",
user_id_claim="sub",
)
# ---------------------------------------------------------------------------
# _get_client token resolution tests
# ---------------------------------------------------------------------------
class TestGetClientTokenResolution:
"""Tests for _get_client Bearer token fallback."""
@pytest.mark.asyncio
async def test_bearer_header_fallback_no_auth(self):
"""_get_client extracts Bearer token from HTTP header when no OAuth provider."""
from fastn_mcp.server import _get_client, server
from unittest.mock import MagicMock, PropertyMock, patch
mock_request = MagicMock()
mock_request.headers = {"authorization": "Bearer test-keycloak-jwt"}
mock_ctx = MagicMock()
mock_ctx.request = mock_request
with patch.object(type(server), "request_context", new_callable=PropertyMock, return_value=mock_ctx), \
patch("fastn_mcp.server._oauth_provider", None):
client = _get_client({})
assert client._config.auth_token == "test-keycloak-jwt"
await client.close()
@pytest.mark.asyncio
async def test_bearer_header_ignored_when_args_present(self):
"""_get_client prefers access_token from tool args over header."""
from fastn_mcp.server import _get_client, server
from unittest.mock import MagicMock, PropertyMock, patch
mock_request = MagicMock()
mock_request.headers = {"authorization": "Bearer header-token"}
mock_ctx = MagicMock()
mock_ctx.request = mock_request
with patch.object(type(server), "request_context", new_callable=PropertyMock, return_value=mock_ctx), \
patch("fastn_mcp.server._oauth_provider", None):
client = _get_client({"access_token": "args-token"})
assert client._config.auth_token == "args-token"
await client.close()
@pytest.mark.asyncio
async def test_no_token_falls_through(self):
"""_get_client doesn't crash when no token is available anywhere."""
from fastn_mcp.server import _get_client, server
from unittest.mock import MagicMock, PropertyMock, patch
mock_request = MagicMock()
mock_request.headers = {}
mock_ctx = MagicMock()
mock_ctx.request = mock_request
with patch.object(type(server), "request_context", new_callable=PropertyMock, return_value=mock_ctx), \
patch("fastn_mcp.server._oauth_provider", None):
# Should not raise — SDK may pick up token from config file
client = _get_client({})
await client.close()
@pytest.mark.asyncio
async def test_stdio_no_request_context(self):
"""_get_client handles stdio transport (no request context) gracefully."""
from fastn_mcp.server import _get_client, server
from unittest.mock import PropertyMock, patch
with patch.object(type(server), "request_context", new_callable=PropertyMock, side_effect=LookupError), \
patch("fastn_mcp.server._oauth_provider", None):
# Should not raise even without request context
client = _get_client({})
await client.close()
# ---------------------------------------------------------------------------
# Top-level error handling tests (errors that escape individual handlers)
# ---------------------------------------------------------------------------
class TestTopLevelErrorHandling:
@pytest.mark.asyncio
async def test_config_error_returns_workspace_not_set(self):
"""ConfigError from _get_client returns WORKSPACE_NOT_SET."""
with patch("fastn_mcp.server._get_client", side_effect=ConfigError("Missing api_key")):
result = await handle_call_tool("list_flows", {})
data = _parse_result(result)
assert data["error"] == "WORKSPACE_NOT_SET"
@pytest.mark.asyncio
async def test_auth_error_returns_invalid_token(self):
"""AuthError escaping a handler returns INVALID_TOKEN."""
with patch("fastn_mcp.server._get_client", side_effect=AuthError("Token expired")):
result = await handle_call_tool("execute_tool", {
"action_id": "act_test", "parameters": {},
})
data = _parse_result(result)
assert data["error"] == "INVALID_TOKEN"
@pytest.mark.asyncio
async def test_fastn_error_returns_fastn_error(self):
"""FastnError escaping a handler returns FASTN_ERROR."""
with patch("fastn_mcp.server._get_client", side_effect=FastnError("SDK crash")):
result = await handle_call_tool("delete_flow", {"flow_id": "x"})
data = _parse_result(result)
assert data["error"] == "FASTN_ERROR"
@pytest.mark.asyncio
async def test_unexpected_error_returns_internal_error(self):
"""Non-SDK exceptions still return INTERNAL_ERROR."""
with patch("fastn_mcp.server._get_client", side_effect=RuntimeError("boom")):
result = await handle_call_tool("list_flows", {})
data = _parse_result(result)
assert data["error"] == "INTERNAL_ERROR"
@pytest.mark.asyncio
async def test_unknown_tool_returns_unknown_tool(self):
"""Calling a non-existent tool returns UNKNOWN_TOOL."""
result = await handle_call_tool("nonexistent_tool", {})
data = _parse_result(result)
assert data["error"] == "UNKNOWN_TOOL"
# Auth module tests
# ---------------------------------------------------------------------------
class TestPKCEAuth:
def test_generate_pkce_challenge(self):
"""PKCE challenge generation produces valid verifier and challenge."""
from fastn_mcp.auth import generate_pkce_challenge
pkce = generate_pkce_challenge()
assert len(pkce.code_verifier) >= 43
assert len(pkce.code_challenge) >= 43
assert pkce.code_verifier != pkce.code_challenge
def test_build_authorize_url(self):
"""Authorize URL contains required PKCE parameters."""
from fastn_mcp.auth import build_authorize_url
url = build_authorize_url(
code_challenge="test_challenge",
redirect_uri="https://example.com/callback",
state="csrf_token",
)
assert "live.fastn.ai" in url
assert "code_challenge=test_challenge" in url
assert "code_challenge_method=S256" in url
assert "redirect_uri=https://example.com/callback" in url
assert "state=csrf_token" in url
assert "response_type=code" in url
def test_build_connect_url(self):
"""Connect URL contains access token and redirect."""
from fastn_mcp.auth import build_connect_url
url = build_connect_url(
access_token="my_token",
redirect_uri="https://example.com/done",
)
assert "app.fastn.ai/connect" in url
assert "access_token=my_token" in url
assert "redirect_uri=https://example.com/done" in url
# ---------------------------------------------------------------------------
# Remote transport tests
# ---------------------------------------------------------------------------
class TestCreateStarletteApp:
def test_sse_app_creation(self):
"""create_starlette_app('sse') returns a Starlette app with SSE and shttp routes."""
from fastn_mcp.server import create_starlette_app
app = create_starlette_app("sse", auth_enabled=False)
# Should be a Starlette application
from starlette.applications import Starlette
assert isinstance(app, Starlette)
# Should have routes for /sse, /messages/, and /shttp
route_paths = []
for route in app.routes:
if hasattr(route, "path"):
route_paths.append(route.path)
assert "/sse" in route_paths
assert "/shttp" in route_paths
def test_shttp_only_app_creation(self):
"""create_starlette_app('shttp-only') returns app with /shttp only, no /sse."""
from fastn_mcp.server import create_starlette_app
app = create_starlette_app("shttp-only", auth_enabled=False)
from starlette.applications import Starlette
assert isinstance(app, Starlette)
route_paths = []
for route in app.routes:
if hasattr(route, "path"):
route_paths.append(route.path)
assert "/shttp" in route_paths
assert "/sse" not in route_paths
def test_sse_only_app_creation(self):
"""create_starlette_app('sse-only') returns app with /sse only, no /shttp."""
from fastn_mcp.server import create_starlette_app
app = create_starlette_app("sse-only", auth_enabled=False)
from starlette.applications import Starlette
assert isinstance(app, Starlette)
route_paths = []
for route in app.routes:
if hasattr(route, "path"):
route_paths.append(route.path)
assert "/sse" in route_paths
assert "/shttp" not in route_paths
def test_unknown_transport_raises(self):
"""create_starlette_app raises ValueError for unknown transport."""
from fastn_mcp.server import create_starlette_app
with pytest.raises(ValueError, match="Unknown transport"):
create_starlette_app("grpc")
def test_root_route_exists(self):
"""create_starlette_app always adds a GET / route."""
from fastn_mcp.server import create_starlette_app
app = create_starlette_app("sse", auth_enabled=False)
route_paths = [r.path for r in app.routes if hasattr(r, "path")]
assert "/" in route_paths
class TestPerModeRouting:
"""Tests that Starlette routes dispatch to the correct per-mode handler.
Each URL path should reach the Route with the correct tool set:
/shttp → Route("/shttp") → all tools (10)
/shttp/ucl → Route("/shttp/ucl") → discovery tools (4)
/shttp/ucl/{project_id} → Route("/shttp/ucl/{project_id}") → discovery tools (3)
NOTE: We use Route (exact match), not Mount (prefix match), because
Starlette's Mount appends /{path:path} to the regex — so Mount("/shttp")
only matches /shttp/<something>, never /shttp alone.
"""
def _find_first_match(self, app, path, method="POST"):
"""Find the first route that matches the given path/method."""
from starlette.routing import Match
scope = {"type": "http", "path": path, "method": method}
for route in app.routes:
match, child_scope = route.matches(scope)
if match == Match.FULL:
return route, child_scope
return None, None
def test_shttp_ucl_matches_ucl_route(self):
"""POST /shttp/ucl should match Route('/shttp/ucl')."""
from fastn_mcp.server import create_starlette_app
app = create_starlette_app("shttp-only", auth_enabled=False)
route, child_scope = self._find_first_match(app, "/shttp/ucl")
assert route is not None, "No route matched /shttp/ucl"
assert route.path == "/shttp/ucl", f"Expected /shttp/ucl but matched {route.path}"
def test_shttp_ucl_project_matches_project_route(self):
"""POST /shttp/ucl/<uuid> should match Route('/shttp/ucl/{project_id}')."""
from fastn_mcp.server import create_starlette_app
app = create_starlette_app("shttp-only", auth_enabled=False)
uuid = "12345678-1234-1234-1234-123456789abc"
route, child_scope = self._find_first_match(app, f"/shttp/ucl/{uuid}")
assert route is not None, f"No route matched /shttp/ucl/{uuid}"
assert route.path == "/shttp/ucl/{project_id:path}", \
f"Expected /shttp/ucl/{{project_id:path}} but matched {route.path}"
def test_shttp_all_matches_shttp_route(self):
"""POST /shttp should match Route('/shttp')."""
from fastn_mcp.server import create_starlette_app
app = create_starlette_app("shttp-only", auth_enabled=False)
route, child_scope = self._find_first_match(app, "/shttp")
assert route is not None, "No route matched /shttp"
assert route.path == "/shttp", f"Expected /shttp but matched {route.path}"
def test_shttp_routes_dont_cross_match(self):
"""/shttp/ucl should NOT match Route('/shttp') — only its own route."""
from fastn_mcp.server import create_starlette_app
from starlette.routing import Match
app = create_starlette_app("shttp-only", auth_enabled=False)
scope = {"type": "http", "path": "/shttp/ucl", "method": "POST"}
shttp_route = None
for route in app.routes:
if hasattr(route, "path") and route.path == "/shttp":
shttp_route = route
break
if shttp_route:
match, _ = shttp_route.matches(scope)
assert match != Match.FULL, "/shttp Route should NOT match /shttp/ucl"
def test_per_mode_server_tool_counts(self):
"""Verify that _create_mcp_server is called with correct tool counts."""
from fastn_mcp.server import create_starlette_app, _create_mcp_server, TOOLS, UCL_TOOL_NAMES
server_tool_counts = []
original = _create_mcp_server
def tracking_create(tools):
server_tool_counts.append(len(tools))
return original(tools)
with patch("fastn_mcp.server._create_mcp_server", side_effect=tracking_create):
app = create_starlette_app("shttp-only", auth_enabled=False)
# 3 servers: all tools, ucl tools, ucl tools without list_projects
assert len(server_tool_counts) == 4
assert server_tool_counts[0] == len(TOOLS) # all: 11
assert server_tool_counts[1] == len(UCL_TOOL_NAMES) # ucl: 5
assert server_tool_counts[2] == len(UCL_TOOL_NAMES) - 1 # ucl-proj: 4
assert server_tool_counts[3] == len(UCL_TOOL_NAMES) - 2 # ucl-proj-skill: 3
def test_sse_routes_have_correct_endpoints(self):
"""SSE mode creates 3 per-mode SSE routes."""
from fastn_mcp.server import create_starlette_app
from starlette.routing import Route
app = create_starlette_app("sse", auth_enabled=False)
sse_routes = [r for r in app.routes if isinstance(r, Route) and r.path.startswith("/sse")]
sse_paths = [r.path for r in sse_routes]
assert "/sse" in sse_paths
assert "/sse/ucl" in sse_paths
assert "/sse/ucl/{project_id}" in sse_paths
def test_shttp_routes_have_all_three(self):
"""shttp-only mode creates 3 Route entries for /shttp paths."""
from fastn_mcp.server import create_starlette_app
from starlette.routing import Route
app = create_starlette_app("shttp-only", auth_enabled=False)
shttp_routes = [r for r in app.routes if isinstance(r, Route) and
hasattr(r, "path") and "/shttp" in r.path]
shttp_paths = sorted(r.path for r in shttp_routes)
assert "/shttp" in shttp_paths
assert "/shttp/ucl" in shttp_paths
class TestRootEndpoint:
"""Tests for the dynamic GET / server info endpoint."""
@pytest.mark.asyncio
async def test_root_sse_shows_all_mode_combinations(self):
"""GET / with SSE transport lists all mode combinations for SSE + shttp."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app("sse", auth_enabled=False, server_url="http://localhost:8000")
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/")
assert resp.status_code == 200
data = resp.json()
paths = [e["path"] for e in data["endpoints"]]
# shttp mode combinations
assert "/shttp" in paths
assert "/shttp/ucl" in paths
assert "/shttp/ucl/{project_id}" in paths
# SSE mode combinations
assert "/sse" in paths
assert "/sse/ucl" in paths
assert "/sse/ucl/{project_id}" in paths
assert "/messages/" in paths
assert data["auth"] == "disabled"
@pytest.mark.asyncio
async def test_root_shttp_only(self):
"""GET / with shttp-only transport lists only shttp endpoints."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app("shttp-only", auth_enabled=False, server_url="http://localhost:8000")
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/")
data = resp.json()
paths = [e["path"] for e in data["endpoints"]]
assert "/shttp" in paths
assert "/shttp/ucl" in paths
assert "/sse" not in paths
assert "/messages/" not in paths
@pytest.mark.asyncio
async def test_root_sse_only(self):
"""GET / with sse-only transport lists only SSE endpoints."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app("sse-only", auth_enabled=False, server_url="http://localhost:8000")
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/")
data = resp.json()
paths = [e["path"] for e in data["endpoints"]]
assert "/sse" in paths
assert "/sse/ucl" in paths
assert "/messages/" in paths
assert "/shttp" not in paths
@pytest.mark.asyncio
async def test_root_auth_enabled_shows_auth_endpoints(self):
"""GET / with auth enabled lists OAuth endpoints."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app("sse", auth_enabled=True, server_url="http://localhost:8000")
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/")
data = resp.json()
assert data["auth"] == "enabled"
paths = [e["path"] for e in data["endpoints"]]
assert "/.well-known/oauth-authorization-server" in paths
assert "/token" in paths
assert "/register" in paths
@pytest.mark.asyncio
async def test_root_shows_modes_with_tool_lists(self):
"""GET / includes modes dict with tool lists for each mode."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app("sse", auth_enabled=False, server_url="http://localhost:8000")
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/")
data = resp.json()
# modes dict with agent and ucl
assert "modes" in data
assert "agent" in data["modes"]
assert "ucl" in data["modes"]
all_tools = data["modes"]["agent"]["tools"]
ucl_tools = data["modes"]["ucl"]["tools"]
assert len(all_tools) == len(TOOLS)
assert set(ucl_tools) == UCL_TOOL_NAMES
@pytest.mark.asyncio
async def test_root_uses_server_url_for_endpoint_urls(self):
"""GET / builds endpoint URLs from server_url, not request host."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app(
"sse", auth_enabled=False,
server_url="https://mcp.example.com",
)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/")
data = resp.json()
for endpoint in data["endpoints"]:
assert endpoint["url"].startswith("https://mcp.example.com/")
class TestCLI:
@staticmethod
def _make_parser():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--stdio", action="store_true", default=False)
parser.add_argument("--sse", action="store_true", default=False)
parser.add_argument("--shttp", action="store_true", default=False)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8000)
parser.add_argument("--no-auth", action="store_true", default=False)
parser.add_argument("--server-url", default=None)
parser.add_argument("-v", "--verbose", action="store_true", default=False)
return parser
@staticmethod
def _resolve_transport(args):
if args.stdio:
return "stdio"
elif args.sse and not args.shttp:
return "sse-only"
elif args.shttp and not args.sse:
return "shttp-only"
else:
return "sse"
def test_default_args(self):
"""CLI defaults to SSE + shttp (both transports)."""
args = self._make_parser().parse_args([])
transport = self._resolve_transport(args)
assert transport == "sse" # both
assert args.host == "0.0.0.0"
assert args.port == 8000
assert args.no_auth is False
assert args.verbose is False
def test_sse_only(self):
"""--sse alone gives sse-only."""
args = self._make_parser().parse_args(["--sse"])
assert self._resolve_transport(args) == "sse-only"
def test_shttp_only(self):
"""--shttp alone gives shttp-only."""
args = self._make_parser().parse_args(["--shttp"])
assert self._resolve_transport(args) == "shttp-only"
def test_both_flags(self):
"""--sse --shttp gives both (same as default)."""
args = self._make_parser().parse_args(["--sse", "--shttp"])
assert self._resolve_transport(args) == "sse"
def test_stdio_flag(self):
"""--stdio gives stdio transport."""
args = self._make_parser().parse_args(["--stdio"])
assert self._resolve_transport(args) == "stdio"
def test_no_auth_flag(self):
"""CLI parses --no-auth flag."""
args = self._make_parser().parse_args(["--no-auth"])
assert args.no_auth is True
def test_server_url_flag(self):
"""CLI parses --server-url flag."""
args = self._make_parser().parse_args(["--server-url", "https://mcp.ucl.dev"])
assert args.server_url == "https://mcp.ucl.dev"
def test_verbose_flag(self):
"""CLI parses --verbose / -v flag."""
args = self._make_parser().parse_args(["--verbose"])
assert args.verbose is True
args_short = self._make_parser().parse_args(["-v"])
assert args_short.verbose is True
class TestSSESessionCleanup:
"""Tests for SSE session tracking and dead-session detection."""
@pytest.mark.asyncio
async def test_post_to_dead_session_returns_404(self):
"""POST /messages/?session_id=<dead> returns 404, not 202."""
from fastn_mcp.server import create_starlette_app, _active_sse_sessions
from httpx import ASGITransport, AsyncClient
app = create_starlette_app("sse", auth_enabled=False)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
# POST to a session that was never created
response = await client.post(
"/messages/?session_id=deadbeefdeadbeefdeadbeefdeadbeef",
json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
)
assert response.status_code == 404
@pytest.mark.asyncio
async def test_active_session_tracking(self):
"""_active_sse_sessions set is maintained properly."""
from fastn_mcp.server import _active_sse_sessions
# The set should be empty when no SSE connections are active
# (cleanup from previous tests may leave it empty)
# Just verify it's a set
assert isinstance(_active_sse_sessions, set)
class TestProtectedResourceMetadata:
"""Tests for RFC 9728 Protected Resource Metadata endpoints."""
@pytest.mark.asyncio
async def test_root_protected_resource_metadata(self):
"""/.well-known/oauth-protected-resource returns resource metadata."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app(
"sse",
auth_enabled=True,
server_url="http://localhost:8000",
)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://localhost:8000") as client:
response = await client.get("/.well-known/oauth-protected-resource")
assert response.status_code == 200
data = response.json()
assert "resource" in data
assert "authorization_servers" in data
assert len(data["authorization_servers"]) >= 1
assert data["scopes_supported"] == ["read", "write"]
assert data["resource_name"] == "Fastn MCP Server"
@pytest.mark.asyncio
async def test_sse_path_protected_resource_metadata(self):
"""/.well-known/oauth-protected-resource/sse returns resource metadata."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app(
"sse",
auth_enabled=True,
server_url="http://localhost:8000",
)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://localhost:8000") as client:
response = await client.get("/.well-known/oauth-protected-resource/sse")
assert response.status_code == 200
data = response.json()
assert "resource" in data
assert "authorization_servers" in data
@pytest.mark.asyncio
async def test_shttp_path_protected_resource_metadata(self):
"""/.well-known/oauth-protected-resource/shttp returns resource metadata."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app(
"shttp-only",
auth_enabled=True,
server_url="http://localhost:8000",
)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://localhost:8000") as client:
response = await client.get("/.well-known/oauth-protected-resource/shttp")
assert response.status_code == 200
data = response.json()
assert "resource" in data
@pytest.mark.asyncio
async def test_no_protected_resource_without_auth(self):
"""No protected resource metadata when auth is disabled."""
from fastn_mcp.server import create_starlette_app
app = create_starlette_app("sse", auth_enabled=False)
route_paths = []
for route in app.routes:
if hasattr(route, "path"):
route_paths.append(route.path)
assert "/.well-known/oauth-protected-resource" not in route_paths
assert "/.well-known/oauth-protected-resource/sse" not in route_paths
class TestSSETransportIntegration:
"""Integration tests for the SSE transport using httpx TestClient."""
@pytest.mark.asyncio
async def test_sse_endpoint_exists(self):
"""SSE app responds to GET /sse."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app("sse", auth_enabled=False)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
# The /sse endpoint should accept connections (we just verify it doesn't 404)
# SSE is a streaming response, so we use a short timeout
import anyio
try:
with anyio.fail_after(1):
response = await client.get("/sse")
# If we get here, the connection was accepted
assert response.status_code == 200
except TimeoutError:
# Expected — SSE connections are long-lived
pass
except Exception:
# Connection accepted but may timeout — that's fine
pass
@pytest.mark.asyncio
async def test_messages_endpoint_requires_session(self):
"""SSE app rejects POST /messages/ without a valid session."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app("sse", auth_enabled=False)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.post(
"/messages/",
json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
)
# Should fail without a valid session
assert response.status_code >= 400
@pytest.mark.asyncio
async def test_sse_graceful_disconnect(self):
"""SSE handler doesn't raise when the client disconnects abruptly."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
import anyio
app = create_starlette_app("sse", auth_enabled=False)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
# Connect and immediately disconnect — should not raise
try:
with anyio.fail_after(0.5):
await client.get("/sse")
except (TimeoutError, Exception):
pass # Expected — client times out / disconnects
# If we reach here without an unhandled exception, the test passes
class TestStreamableHTTPTransportIntegration:
"""Integration tests for the Streamable HTTP transport."""
@pytest.mark.asyncio
async def test_shttp_endpoint_routed(self):
"""shttp-only app routes POST /shttp to the session manager."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app("shttp-only", auth_enabled=False)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
try:
response = await client.post(
"/shttp",
json={
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"protocolVersion": "2025-03-26",
"capabilities": {},
"clientInfo": {"name": "test", "version": "1.0"},
},
"id": 1,
},
headers={"Content-Type": "application/json"},
)
# 200 = fully working, 500 = route works but lifespan not started
assert response.status_code in (200, 500)
except RuntimeError as e:
# "Task group is not initialized" — confirms the route reached
# the session manager (which is what we want to verify)
assert "Task group" in str(e)
# ---------------------------------------------------------------------------
# OAuth provider tests
# ---------------------------------------------------------------------------
class TestFastnOAuthProvider:
"""Tests for the FastnOAuthProvider (MCP OAuth ↔ Keycloak bridge)."""
def _make_provider(self):
from fastn_mcp.auth import FastnOAuthProvider
return FastnOAuthProvider(server_url="http://localhost:8000")
def _make_client_info(self, client_id="test-client"):
from mcp.shared.auth import OAuthClientInformationFull
from pydantic import AnyUrl
return OAuthClientInformationFull(
client_id=client_id,
client_secret=None,
redirect_uris=[AnyUrl("http://localhost:3000/callback")],
)
@pytest.mark.asyncio
async def test_register_and_get_client(self):
"""register_client stores client, get_client retrieves it."""
provider = self._make_provider()
client_info = self._make_client_info("my-client")
await provider.register_client(client_info)
result = await provider.get_client("my-client")
assert result is not None
assert result.client_id == "my-client"
@pytest.mark.asyncio
async def test_get_client_not_found(self):
"""get_client returns None for unknown client_id."""
provider = self._make_provider()
result = await provider.get_client("nonexistent")
assert result is None
@pytest.mark.asyncio
async def test_authorize_returns_keycloak_url(self):
"""authorize() returns a Keycloak URL with PKCE."""
from mcp.server.auth.provider import AuthorizationParams
from pydantic import AnyUrl
provider = self._make_provider()
client_info = self._make_client_info()
params = AuthorizationParams(
state="client-state",
scopes=["read"],
code_challenge="test_challenge_abc",
redirect_uri=AnyUrl("http://localhost:3000/callback"),
redirect_uri_provided_explicitly=True,
)
url = await provider.authorize(client_info, params)
assert "live.fastn.ai" in url
assert "code_challenge=" in url
assert "code_challenge_method=S256" in url
assert "redirect_uri=" in url
# The redirect URI to Keycloak should be our /callback endpoint
assert "localhost%3A8000%2Fcallback" in url or "localhost:8000/callback" in url
@pytest.mark.asyncio
async def test_authorize_stores_pending_auth(self):
"""authorize() stores pending auth state for the callback."""
from mcp.server.auth.provider import AuthorizationParams
from pydantic import AnyUrl
provider = self._make_provider()
client_info = self._make_client_info()
params = AuthorizationParams(
state="client-state",
scopes=["read"],
code_challenge="test_challenge_abc",
redirect_uri=AnyUrl("http://localhost:3000/callback"),
redirect_uri_provided_explicitly=True,
)
await provider.authorize(client_info, params)
# Should have one pending auth
assert len(provider._pending_auths) == 1
@pytest.mark.asyncio
async def test_load_authorization_code_not_found(self):
"""load_authorization_code returns None for unknown code."""
provider = self._make_provider()
client_info = self._make_client_info()
result = await provider.load_authorization_code(client_info, "bogus-code")
assert result is None
@pytest.mark.asyncio
async def test_load_authorization_code_expired(self):
"""load_authorization_code returns None for expired code."""
import time
from mcp.server.auth.provider import AuthorizationCode
from pydantic import AnyUrl
provider = self._make_provider()
client_info = self._make_client_info()
# Manually store an expired code
provider._auth_codes["expired-code"] = AuthorizationCode(
code="expired-code",
scopes=["read"],
expires_at=time.time() - 100, # expired 100s ago
client_id="test-client",
code_challenge="challenge",
redirect_uri=AnyUrl("http://localhost:3000/callback"),
redirect_uri_provided_explicitly=True,
)
result = await provider.load_authorization_code(client_info, "expired-code")
assert result is None
@pytest.mark.asyncio
async def test_load_authorization_code_wrong_client(self):
"""load_authorization_code returns None for mismatched client."""
import time
from mcp.server.auth.provider import AuthorizationCode
from pydantic import AnyUrl
provider = self._make_provider()
client_info = self._make_client_info("different-client")
provider._auth_codes["my-code"] = AuthorizationCode(
code="my-code",
scopes=["read"],
expires_at=time.time() + 300,
client_id="test-client",
code_challenge="challenge",
redirect_uri=AnyUrl("http://localhost:3000/callback"),
redirect_uri_provided_explicitly=True,
)
result = await provider.load_authorization_code(client_info, "my-code")
assert result is None
@pytest.mark.asyncio
async def test_exchange_authorization_code_issues_tokens(self):
"""exchange_authorization_code returns MCP tokens."""
import time
from mcp.server.auth.provider import AuthorizationCode
from pydantic import AnyUrl
provider = self._make_provider()
client_info = self._make_client_info()
auth_code = AuthorizationCode(
code="valid-code",
scopes=["read"],
expires_at=time.time() + 300,
client_id="test-client",
code_challenge="challenge",
redirect_uri=AnyUrl("http://localhost:3000/callback"),
redirect_uri_provided_explicitly=True,
)
provider._auth_codes["valid-code"] = auth_code
# Store a keycloak token for this code
provider._keycloak_tokens["code:valid-code"] = "kc-access-token"
token = await provider.exchange_authorization_code(client_info, auth_code)
assert token.access_token is not None
assert token.refresh_token is not None
assert token.token_type == "Bearer"
assert token.expires_in == 3600
# Auth code should be consumed
assert "valid-code" not in provider._auth_codes
# MCP token should be mapped to Keycloak token
assert provider._keycloak_tokens.get(token.access_token) == "kc-access-token"
@pytest.mark.asyncio
async def test_load_access_token_valid(self):
"""load_access_token returns token for valid non-expired token."""
import time
from mcp.server.auth.provider import AccessToken
provider = self._make_provider()
provider._access_tokens["valid-token"] = AccessToken(
token="valid-token",
client_id="test-client",
scopes=["read"],
expires_at=int(time.time()) + 3600,
)
result = await provider.load_access_token("valid-token")
assert result is not None
assert result.token == "valid-token"
@pytest.mark.asyncio
async def test_load_access_token_expired(self):
"""load_access_token returns None for expired token."""
import time
from mcp.server.auth.provider import AccessToken
provider = self._make_provider()
provider._access_tokens["expired-token"] = AccessToken(
token="expired-token",
client_id="test-client",
scopes=["read"],
expires_at=int(time.time()) - 100,
)
result = await provider.load_access_token("expired-token")
assert result is None
@pytest.mark.asyncio
async def test_load_access_token_not_found(self):
"""load_access_token returns None for unknown token."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(return_value=None)
with patch("fastn_mcp.auth.refresh_tokens", new_callable=AsyncMock, side_effect=httpx.HTTPStatusError("Bad Request", request=MagicMock(), response=MagicMock(status_code=400))):
result = await provider.load_access_token("bogus")
assert result is None
@pytest.mark.asyncio
async def test_revoke_access_token(self):
"""revoke_token removes access token and associated refresh tokens."""
import time
from mcp.server.auth.provider import AccessToken, RefreshToken
provider = self._make_provider()
provider._access_tokens["at"] = AccessToken(
token="at",
client_id="test-client",
scopes=["read"],
expires_at=int(time.time()) + 3600,
)
provider._refresh_tokens["rt"] = RefreshToken(
token="rt",
client_id="test-client",
scopes=["read"],
expires_at=int(time.time()) + 86400,
)
provider._keycloak_tokens["at"] = "kc-token"
await provider.revoke_token(provider._access_tokens["at"])
assert "at" not in provider._access_tokens
assert "rt" not in provider._refresh_tokens
assert "at" not in provider._keycloak_tokens
@pytest.mark.asyncio
async def test_load_refresh_token_valid(self):
"""load_refresh_token returns token for valid non-expired refresh token."""
import time
from mcp.server.auth.provider import RefreshToken
provider = self._make_provider()
client_info = self._make_client_info()
provider._refresh_tokens["valid-rt"] = RefreshToken(
token="valid-rt",
client_id="test-client",
scopes=["read"],
expires_at=int(time.time()) + 86400,
)
result = await provider.load_refresh_token(client_info, "valid-rt")
assert result is not None
assert result.token == "valid-rt"
@pytest.mark.asyncio
async def test_load_refresh_token_expired(self):
"""load_refresh_token returns None for expired refresh token."""
import time
from mcp.server.auth.provider import RefreshToken
provider = self._make_provider()
client_info = self._make_client_info()
provider._refresh_tokens["expired-rt"] = RefreshToken(
token="expired-rt",
client_id="test-client",
scopes=["read"],
expires_at=int(time.time()) - 100,
)
result = await provider.load_refresh_token(client_info, "expired-rt")
assert result is None
@pytest.mark.asyncio
async def test_exchange_refresh_token_rotates(self):
"""exchange_refresh_token issues new tokens and removes old ones."""
import time
from mcp.server.auth.provider import AccessToken, RefreshToken
provider = self._make_provider()
client_info = self._make_client_info()
# Set up existing tokens
provider._access_tokens["old-at"] = AccessToken(
token="old-at",
client_id="test-client",
scopes=["read"],
expires_at=int(time.time()) + 3600,
)
provider._keycloak_tokens["old-at"] = "kc-token"
rt = RefreshToken(
token="old-rt",
client_id="test-client",
scopes=["read"],
expires_at=int(time.time()) + 86400,
)
provider._refresh_tokens["old-rt"] = rt
new_token = await provider.exchange_refresh_token(client_info, rt, ["read"])
assert new_token.access_token != "old-at"
assert new_token.refresh_token != "old-rt"
assert new_token.token_type == "Bearer"
# Old tokens should be removed
assert "old-at" not in provider._access_tokens
assert "old-rt" not in provider._refresh_tokens
# Keycloak token should be migrated to new access token
assert provider._keycloak_tokens.get(new_token.access_token) == "kc-token"
def test_get_keycloak_token(self):
"""get_keycloak_token returns mapped Keycloak token."""
provider = self._make_provider()
provider._keycloak_tokens["mcp-token"] = "kc-token-123"
assert provider.get_keycloak_token("mcp-token") == "kc-token-123"
assert provider.get_keycloak_token("nonexistent") is None
@pytest.mark.asyncio
async def test_handle_keycloak_callback_invalid_state(self):
"""handle_keycloak_callback raises ValueError for unknown state."""
provider = self._make_provider()
with pytest.raises(ValueError, match="Invalid or expired"):
await provider.handle_keycloak_callback("bogus-state", "code")
# ---------------------------------------------------------------------------
# Client registration validation tests
# ---------------------------------------------------------------------------
class TestRegisterClientValidation:
"""Tests for redirect_uri validation, rate limiting, and client cap."""
def _make_provider(self):
from fastn_mcp.auth import FastnOAuthProvider
return FastnOAuthProvider(server_url="http://localhost:8000")
def _make_client_info(self, client_id="test-client", redirect_uris=None):
from mcp.shared.auth import OAuthClientInformationFull
from pydantic import AnyUrl
if redirect_uris is None:
redirect_uris = [AnyUrl("http://localhost:3000/callback")]
return OAuthClientInformationFull(
client_id=client_id,
client_secret=None,
redirect_uris=redirect_uris,
)
# -- redirect_uri validation --
@pytest.mark.asyncio
async def test_localhost_http_allowed(self):
"""localhost with http is allowed (desktop MCP clients)."""
from pydantic import AnyUrl
provider = self._make_provider()
info = self._make_client_info(redirect_uris=[AnyUrl("http://localhost:9999/cb")])
await provider.register_client(info)
assert await provider.get_client("test-client") is not None
@pytest.mark.asyncio
async def test_localhost_https_allowed(self):
"""localhost with https is also allowed."""
from pydantic import AnyUrl
provider = self._make_provider()
info = self._make_client_info(redirect_uris=[AnyUrl("https://localhost:3000/cb")])
await provider.register_client(info)
assert await provider.get_client("test-client") is not None
@pytest.mark.asyncio
async def test_127_0_0_1_allowed(self):
"""127.0.0.1 is allowed."""
from pydantic import AnyUrl
provider = self._make_provider()
info = self._make_client_info(redirect_uris=[AnyUrl("http://127.0.0.1:5000/cb")])
await provider.register_client(info)
assert await provider.get_client("test-client") is not None
@pytest.mark.asyncio
async def test_allowed_https_domain(self):
"""HTTPS domain in the allowlist is accepted."""
from pydantic import AnyUrl
provider = self._make_provider()
info = self._make_client_info(redirect_uris=[AnyUrl("https://lovable.dev/api/mcp/callback")])
await provider.register_client(info)
assert await provider.get_client("test-client") is not None
@pytest.mark.asyncio
async def test_allowed_subdomain(self):
"""Subdomain of allowed domain is accepted."""
from pydantic import AnyUrl
provider = self._make_provider()
info = self._make_client_info(redirect_uris=[AnyUrl("https://api.lovable.dev/callback")])
await provider.register_client(info)
assert await provider.get_client("test-client") is not None
@pytest.mark.asyncio
async def test_rejected_unknown_domain(self):
"""Unknown domain is rejected."""
from pydantic import AnyUrl
from fastn_mcp.auth import RegistrationError
provider = self._make_provider()
info = self._make_client_info(redirect_uris=[AnyUrl("https://evil.com/steal")])
with pytest.raises(RegistrationError, match="not allowed"):
await provider.register_client(info)
assert await provider.get_client("test-client") is None
@pytest.mark.asyncio
async def test_rejected_http_non_localhost(self):
"""HTTP with non-localhost domain is rejected."""
from pydantic import AnyUrl
from fastn_mcp.auth import RegistrationError
provider = self._make_provider()
info = self._make_client_info(redirect_uris=[AnyUrl("http://lovable.dev/callback")])
with pytest.raises(RegistrationError, match="not allowed"):
await provider.register_client(info)
@pytest.mark.asyncio
async def test_rejected_mixed_uris_one_bad(self):
"""If any redirect_uri is invalid, the whole registration fails."""
from pydantic import AnyUrl
from fastn_mcp.auth import RegistrationError
provider = self._make_provider()
info = self._make_client_info(redirect_uris=[
AnyUrl("http://localhost:3000/cb"),
AnyUrl("https://evil.com/steal"),
])
with pytest.raises(RegistrationError, match="not allowed"):
await provider.register_client(info)
# -- rate limiting --
@pytest.mark.asyncio
async def test_rate_limit_exceeded(self):
"""Exceeding REGISTER_RATE_LIMIT raises RegistrationError."""
from fastn_mcp.auth import RegistrationError, REGISTER_RATE_LIMIT
provider = self._make_provider()
for i in range(REGISTER_RATE_LIMIT):
info = self._make_client_info(client_id=f"client-{i}")
await provider.register_client(info)
info = self._make_client_info(client_id=f"client-{REGISTER_RATE_LIMIT}")
with pytest.raises(RegistrationError, match="Too many"):
await provider.register_client(info)
# -- client cap --
@pytest.mark.asyncio
async def test_max_clients_reached(self):
"""Exceeding MAX_REGISTERED_CLIENTS raises RegistrationError."""
from fastn_mcp.auth import RegistrationError
import fastn_mcp.auth as auth_mod
provider = self._make_provider()
# Fill up to the cap by directly injecting into _clients
for i in range(auth_mod.MAX_REGISTERED_CLIENTS):
provider._clients[f"fake-{i}"] = "placeholder"
info = self._make_client_info(client_id="one-too-many")
with pytest.raises(RegistrationError, match="maximum client capacity"):
await provider.register_client(info)
# -- cleanup --
def test_cleanup_expired_tokens(self):
"""cleanup_expired removes expired access tokens."""
import time as _time
from mcp.server.auth.provider import AccessToken
provider = self._make_provider()
# Insert an expired access token
provider._access_tokens["expired-tok"] = AccessToken(
token="expired-tok",
client_id="c1",
scopes=[],
expires_at=int(_time.time()) - 100,
)
provider._keycloak_tokens["expired-tok"] = "kc-tok"
removed = provider.cleanup_expired()
assert removed >= 1
assert "expired-tok" not in provider._access_tokens
assert "expired-tok" not in provider._keycloak_tokens
def test_cleanup_expired_auth_codes(self):
"""cleanup_expired removes expired auth codes."""
import time as _time
from mcp.server.auth.provider import AuthorizationCode
from pydantic import AnyUrl
provider = self._make_provider()
provider._auth_codes["old-code"] = AuthorizationCode(
code="old-code",
client_id="c1",
scopes=[],
expires_at=_time.time() - 100,
code_challenge="ch",
redirect_uri=AnyUrl("http://localhost:3000/cb"),
redirect_uri_provided_explicitly=True,
)
removed = provider.cleanup_expired()
assert removed >= 1
assert "old-code" not in provider._auth_codes
# ---------------------------------------------------------------------------
# Redirect URI validation unit tests
# ---------------------------------------------------------------------------
class TestValidateRedirectUri:
"""Unit tests for _validate_redirect_uri."""
def test_localhost_http(self):
from fastn_mcp.auth import _validate_redirect_uri
assert _validate_redirect_uri("http://localhost:3000/callback") is True
def test_localhost_https(self):
from fastn_mcp.auth import _validate_redirect_uri
assert _validate_redirect_uri("https://localhost:3000/callback") is True
def test_127_0_0_1(self):
from fastn_mcp.auth import _validate_redirect_uri
assert _validate_redirect_uri("http://127.0.0.1:5000/cb") is True
def test_allowed_https(self):
from fastn_mcp.auth import _validate_redirect_uri
assert _validate_redirect_uri("https://lovable.dev/api/callback") is True
def test_allowed_subdomain(self):
from fastn_mcp.auth import _validate_redirect_uri
assert _validate_redirect_uri("https://api.bolt.new/callback") is True
def test_mcp_ucl_dev(self):
from fastn_mcp.auth import _validate_redirect_uri
assert _validate_redirect_uri("https://mcp.ucl.dev/callback") is True
def test_rejected_unknown_domain(self):
from fastn_mcp.auth import _validate_redirect_uri
assert _validate_redirect_uri("https://evil.com/steal") is False
def test_rejected_http_non_localhost(self):
from fastn_mcp.auth import _validate_redirect_uri
assert _validate_redirect_uri("http://lovable.dev/callback") is False
def test_rejected_fragment(self):
from fastn_mcp.auth import _validate_redirect_uri
assert _validate_redirect_uri("http://localhost:3000/cb#frag") is False
def test_rejected_no_scheme(self):
from fastn_mcp.auth import _validate_redirect_uri
assert _validate_redirect_uri("localhost:3000/cb") is False
def test_rejected_userinfo(self):
from fastn_mcp.auth import _validate_redirect_uri
assert _validate_redirect_uri("https://user:pass@lovable.dev/cb") is False
def test_empty_string(self):
from fastn_mcp.auth import _validate_redirect_uri
assert _validate_redirect_uri("") is False
# ---------------------------------------------------------------------------
# OAuth app integration tests
# ---------------------------------------------------------------------------
class TestOAuthAppCreation:
"""Tests for OAuth-enabled Starlette app creation."""
def test_auth_enabled_adds_oauth_routes(self):
"""create_starlette_app with auth_enabled=True adds OAuth routes."""
from fastn_mcp.server import create_starlette_app
from starlette.applications import Starlette
app = create_starlette_app(
"sse",
auth_enabled=True,
server_url="http://localhost:8000",
)
assert isinstance(app, Starlette)
route_paths = []
for route in app.routes:
if hasattr(route, "path"):
route_paths.append(route.path)
# OAuth routes should be present
assert "/.well-known/oauth-authorization-server" in route_paths
assert "/authorize" in route_paths
assert "/token" in route_paths
assert "/register" in route_paths
assert "/revoke" in route_paths
assert "/callback" in route_paths
# MCP transport routes should still be present
assert "/sse" in route_paths
def test_auth_disabled_no_oauth_routes(self):
"""create_starlette_app with auth_enabled=False has no OAuth routes."""
from fastn_mcp.server import create_starlette_app
app = create_starlette_app("sse", auth_enabled=False)
route_paths = []
for route in app.routes:
if hasattr(route, "path"):
route_paths.append(route.path)
assert "/.well-known/oauth-authorization-server" not in route_paths
assert "/authorize" not in route_paths
assert "/token" not in route_paths
assert "/callback" not in route_paths
def test_shttp_only_auth_enabled(self):
"""create_starlette_app with shttp-only + auth adds OAuth routes."""
from fastn_mcp.server import create_starlette_app
app = create_starlette_app(
"shttp-only",
auth_enabled=True,
server_url="http://localhost:8000",
)
route_paths = []
for route in app.routes:
if hasattr(route, "path"):
route_paths.append(route.path)
assert "/.well-known/oauth-authorization-server" in route_paths
assert "/shttp" in route_paths
class TestOAuthEndpoints:
"""Integration tests for OAuth HTTP endpoints."""
@pytest.mark.asyncio
async def test_metadata_endpoint(self):
"""/.well-known/oauth-authorization-server returns JSON metadata."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app(
"sse",
auth_enabled=True,
server_url="http://localhost:8000",
)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://localhost:8000") as client:
response = await client.get("/.well-known/oauth-authorization-server")
assert response.status_code == 200
data = response.json()
assert "authorization_endpoint" in data
assert "token_endpoint" in data
# Pydantic's AnyHttpUrl normalizes to include trailing slash
assert data["issuer"].rstrip("/") == "http://localhost:8000"
@pytest.mark.asyncio
async def test_sse_requires_auth(self):
"""SSE endpoint returns 401 without Bearer token when auth is enabled."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app(
"sse",
auth_enabled=True,
server_url="http://localhost:8000",
)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://localhost:8000") as client:
import anyio
try:
with anyio.fail_after(2):
response = await client.get("/sse")
assert response.status_code == 401
except TimeoutError:
# SSE may timeout before returning — if we get here without
# 401 being returned, the test should fail, but SSE auth
# middleware may respond with 401 before streaming starts
pass
@pytest.mark.asyncio
async def test_callback_missing_params(self):
"""/callback returns 400 when state or code is missing."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app(
"sse",
auth_enabled=True,
server_url="http://localhost:8000",
)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://localhost:8000") as client:
response = await client.get("/callback")
assert response.status_code == 400
data = response.json()
assert data["error"] == "invalid_request"
@pytest.mark.asyncio
async def test_callback_invalid_state(self):
"""/callback returns 400 for unknown state parameter."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app(
"sse",
auth_enabled=True,
server_url="http://localhost:8000",
)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://localhost:8000") as client:
response = await client.get("/callback?state=bogus&code=some-code")
assert response.status_code == 400
data = response.json()
assert data["error"] == "invalid_request"
@pytest.mark.asyncio
async def test_callback_keycloak_error(self):
"""/callback returns 400 when Keycloak sends an error."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app(
"sse",
auth_enabled=True,
server_url="http://localhost:8000",
)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://localhost:8000") as client:
response = await client.get(
"/callback?error=access_denied&error_description=User+denied+access"
)
assert response.status_code == 400
data = response.json()
assert data["error"] == "access_denied"
@pytest.mark.asyncio
async def test_register_endpoint_accepts_clients(self):
"""/register endpoint accepts Dynamic Client Registration."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app(
"sse",
auth_enabled=True,
server_url="http://localhost:8000",
)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://localhost:8000") as client:
response = await client.post(
"/register",
json={
"redirect_uris": ["http://localhost:3000/callback"],
"client_name": "Test MCP Client",
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
"token_endpoint_auth_method": "none",
},
)
assert response.status_code == 201
data = response.json()
assert "client_id" in data
@pytest.mark.asyncio
async def test_messages_requires_auth(self):
"""POST /messages/ returns 401 without Bearer token when auth is enabled."""
from fastn_mcp.server import create_starlette_app
from httpx import ASGITransport, AsyncClient
app = create_starlette_app(
"sse",
auth_enabled=True,
server_url="http://localhost:8000",
)
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://localhost:8000") as client:
response = await client.post(
"/messages/",
json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
)
assert response.status_code == 401
# ---------------------------------------------------------------------------
# Direct Keycloak JWT tests
# ---------------------------------------------------------------------------
import time
import jwt as pyjwt
from jwt.exceptions import PyJWKClientError
class TestDirectKeycloakJWT:
"""Tests for direct Keycloak Bearer token validation in load_access_token()."""
def _make_provider(self):
from fastn_mcp.auth import FastnOAuthProvider
return FastnOAuthProvider(server_url="http://localhost:8000")
@pytest.mark.asyncio
async def test_valid_jwt_returns_access_token(self):
"""Valid Keycloak JWT returns an AccessToken with correct claims."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(return_value={
"azp": "fastn-sdk",
"sub": "user-123",
"scope": "openid profile email",
"exp": int(time.time()) + 3600,
})
result = await provider.load_access_token("valid-keycloak-jwt")
assert result is not None
assert result.token == "valid-keycloak-jwt"
assert result.client_id == "fastn-sdk"
assert "openid" in result.scopes
assert "profile" in result.scopes
assert "email" in result.scopes
@pytest.mark.asyncio
async def test_valid_jwt_is_cached(self):
"""Valid JWT is cached in _access_tokens — second call skips validation."""
provider = self._make_provider()
mock_validator = MagicMock()
mock_validator.validate = MagicMock(return_value={
"azp": "fastn-sdk",
"sub": "user-123",
"scope": "openid",
"exp": int(time.time()) + 3600,
})
provider._kc_jwt_validator = mock_validator
# First call — validates JWT
result1 = await provider.load_access_token("jwt-token")
assert result1 is not None
assert mock_validator.validate.call_count == 1
# Second call — should use cached value, no validation
result2 = await provider.load_access_token("jwt-token")
assert result2 is not None
assert result2.token == "jwt-token"
# Validator should NOT be called again (cached in _access_tokens)
assert mock_validator.validate.call_count == 1
@pytest.mark.asyncio
async def test_jwt_identity_mapping(self):
"""Direct JWT maps _keycloak_tokens[token] = token for _get_client()."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(return_value={
"azp": "fastn-sdk",
"sub": "user-123",
"exp": int(time.time()) + 3600,
})
await provider.load_access_token("my-jwt-token")
# Identity mapping: the JWT IS the Keycloak token
assert provider._keycloak_tokens.get("my-jwt-token") == "my-jwt-token"
assert provider.get_keycloak_token("my-jwt-token") == "my-jwt-token"
@pytest.mark.asyncio
async def test_invalid_jwt_returns_none(self):
"""Invalid JWT returns None (validator returns None, refresh also fails)."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(return_value=None)
with patch("fastn_mcp.auth.refresh_tokens", new_callable=AsyncMock, side_effect=httpx.HTTPStatusError("Bad Request", request=MagicMock(), response=MagicMock(status_code=400))):
result = await provider.load_access_token("garbage-token")
assert result is None
@pytest.mark.asyncio
async def test_mcp_token_takes_priority(self):
"""MCP OAuth token is returned without calling JWT validator."""
import time as time_mod
from mcp.server.auth.provider import AccessToken
provider = self._make_provider()
mock_validator = MagicMock()
mock_validator.validate = MagicMock(return_value=None)
provider._kc_jwt_validator = mock_validator
# Pre-store an MCP token
provider._access_tokens["mcp-token"] = AccessToken(
token="mcp-token",
client_id="test-client",
scopes=["read"],
expires_at=int(time_mod.time()) + 3600,
)
result = await provider.load_access_token("mcp-token")
assert result is not None
assert result.token == "mcp-token"
# JWT validator should NOT be called
mock_validator.validate.assert_not_called()
@pytest.mark.asyncio
async def test_jwt_without_azp_falls_back_to_sub(self):
"""JWT without azp claim falls back to sub for client_id."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(return_value={
"sub": "user-456",
"scope": "openid",
"exp": int(time.time()) + 3600,
})
result = await provider.load_access_token("jwt-no-azp")
assert result is not None
assert result.client_id == "user-456"
@pytest.mark.asyncio
async def test_jwt_without_azp_or_sub_uses_default(self):
"""JWT without azp or sub uses 'keycloak-direct' as client_id."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(return_value={
"scope": "openid",
"exp": int(time.time()) + 3600,
})
result = await provider.load_access_token("jwt-minimal")
assert result is not None
assert result.client_id == "keycloak-direct"
@pytest.mark.asyncio
async def test_expired_cached_jwt_is_rejected(self):
"""Expired cached JWT is rejected and cleaned up."""
import time as time_mod
from mcp.server.auth.provider import AccessToken
provider = self._make_provider()
mock_validator = MagicMock()
mock_validator.validate = MagicMock(return_value=None)
provider._kc_jwt_validator = mock_validator
# Pre-cache an expired JWT token
provider._access_tokens["expired-jwt"] = AccessToken(
token="expired-jwt",
client_id="fastn-sdk",
scopes=["openid"],
expires_at=int(time_mod.time()) - 100,
)
provider._keycloak_tokens["expired-jwt"] = "expired-jwt"
result = await provider.load_access_token("expired-jwt")
assert result is None
assert "expired-jwt" not in provider._access_tokens
assert "expired-jwt" not in provider._keycloak_tokens
@pytest.mark.asyncio
async def test_jwt_without_scope(self):
"""JWT without scope claim returns empty scopes list."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(return_value={
"azp": "fastn-sdk",
"exp": int(time.time()) + 3600,
})
result = await provider.load_access_token("jwt-no-scope")
assert result is not None
assert result.scopes == []
class TestKeycloakJWTValidator:
"""Tests for the _KeycloakJWTValidator class."""
def test_expired_jwt_returns_none(self):
"""Expired JWT returns None."""
from fastn_mcp.auth import _KeycloakJWTValidator
validator = _KeycloakJWTValidator.__new__(_KeycloakJWTValidator)
validator._issuer = "https://live.fastn.ai/auth/realms/fastn"
mock_jwk_client = MagicMock()
mock_key = MagicMock()
mock_key.key = "fake-key"
mock_jwk_client.get_signing_key_from_jwt.return_value = mock_key
validator._jwk_client = mock_jwk_client
with patch("fastn_mcp.auth.pyjwt.decode", side_effect=pyjwt.ExpiredSignatureError("expired")):
result = validator.validate("expired-token")
assert result is None
def test_bad_signature_returns_none(self):
"""Bad signature returns None."""
from fastn_mcp.auth import _KeycloakJWTValidator
validator = _KeycloakJWTValidator.__new__(_KeycloakJWTValidator)
validator._issuer = "https://live.fastn.ai/auth/realms/fastn"
mock_jwk_client = MagicMock()
mock_key = MagicMock()
mock_key.key = "fake-key"
mock_jwk_client.get_signing_key_from_jwt.return_value = mock_key
validator._jwk_client = mock_jwk_client
with patch("fastn_mcp.auth.pyjwt.decode", side_effect=pyjwt.InvalidSignatureError("bad sig")):
result = validator.validate("bad-sig-token")
assert result is None
def test_jwks_fetch_failure_returns_none(self):
"""JWKS fetch failure returns None (graceful degradation)."""
from fastn_mcp.auth import _KeycloakJWTValidator
validator = _KeycloakJWTValidator.__new__(_KeycloakJWTValidator)
validator._issuer = "https://live.fastn.ai/auth/realms/fastn"
mock_jwk_client = MagicMock()
mock_jwk_client.get_signing_key_from_jwt.side_effect = PyJWKClientError("JWKS fetch failed")
validator._jwk_client = mock_jwk_client
result = validator.validate("some-token")
assert result is None
def test_valid_jwt_returns_claims(self):
"""Valid JWT returns decoded claims dict."""
from fastn_mcp.auth import _KeycloakJWTValidator
validator = _KeycloakJWTValidator.__new__(_KeycloakJWTValidator)
validator._issuer = "https://live.fastn.ai/auth/realms/fastn"
mock_jwk_client = MagicMock()
mock_key = MagicMock()
mock_key.key = "fake-key"
mock_jwk_client.get_signing_key_from_jwt.return_value = mock_key
validator._jwk_client = mock_jwk_client
expected_claims = {
"azp": "fastn-sdk",
"sub": "user-123",
"scope": "openid profile",
"exp": int(time.time()) + 3600,
"iss": "https://live.fastn.ai/auth/realms/fastn",
}
with patch("fastn_mcp.auth.pyjwt.decode", return_value=expected_claims):
result = validator.validate("valid-token")
assert result == expected_claims
class TestRefreshTokenExchange:
"""Tests for Keycloak refresh token exchange in load_access_token() Path 3."""
def _make_provider(self):
from fastn_mcp.auth import FastnOAuthProvider
return FastnOAuthProvider(server_url="http://localhost:8000")
def _make_token_response(self, access_token="fresh-access-jwt", refresh_token="new-refresh-token", expires_in=300):
from fastn_mcp.auth import TokenResponse
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
expires_in=expires_in,
)
def _selective_validator(self, valid_jwt="fresh-access-jwt"):
"""Returns a validator side_effect that accepts the JWT but rejects opaque tokens."""
def validate(token):
if token == valid_jwt:
return {
"azp": "fastn-oauth",
"sub": "user-123",
"scope": "openid profile email",
"exp": int(time.time()) + 300,
}
return None
return validate
@pytest.mark.asyncio
async def test_refresh_token_exchange_success(self):
"""Opaque refresh token is exchanged for access token via Keycloak."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(
side_effect=self._selective_validator()
)
with patch("fastn_mcp.auth.refresh_tokens", new_callable=AsyncMock, return_value=self._make_token_response()):
result = await provider.load_access_token("opaque-refresh-token")
assert result is not None
assert result.token == "opaque-refresh-token"
assert result.client_id == "fastn-oauth"
assert "openid" in result.scopes
@pytest.mark.asyncio
async def test_refresh_token_cached_on_second_call(self):
"""Second call with same refresh token uses cache (no Keycloak call)."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(
side_effect=self._selective_validator()
)
mock_refresh = AsyncMock(return_value=self._make_token_response())
with patch("fastn_mcp.auth.refresh_tokens", mock_refresh):
result1 = await provider.load_access_token("my-refresh-token")
result2 = await provider.load_access_token("my-refresh-token")
assert result1 is not None
assert result2 is not None
assert result2.token == "my-refresh-token"
# refresh_tokens should only be called once (second call uses cache)
assert mock_refresh.call_count == 1
@pytest.mark.asyncio
async def test_refresh_token_keycloak_mapping(self):
"""get_keycloak_token() returns the fresh JWT, not the opaque refresh token."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(
side_effect=self._selective_validator()
)
with patch("fastn_mcp.auth.refresh_tokens", new_callable=AsyncMock, return_value=self._make_token_response()):
await provider.load_access_token("my-rt")
# The keycloak token should be the FRESH JWT, not the opaque refresh token
assert provider.get_keycloak_token("my-rt") == "fresh-access-jwt"
@pytest.mark.asyncio
async def test_refresh_token_re_refresh_on_expiry(self):
"""Expired cached token triggers re-refresh with stored refresh token."""
from mcp.server.auth.provider import AccessToken
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(
side_effect=self._selective_validator("newer-access-jwt")
)
# Simulate a previously successful exchange that has now expired
provider._access_tokens["my-rt"] = AccessToken(
token="my-rt",
client_id="fastn-oauth",
scopes=["openid"],
expires_at=int(time.time()) - 100, # expired
)
provider._keycloak_tokens["my-rt"] = "old-access-jwt"
provider._refresh_token_state["my-rt"] = "stored-refresh-token"
new_response = self._make_token_response(
access_token="newer-access-jwt",
refresh_token="even-newer-rt",
)
mock_refresh = AsyncMock(return_value=new_response)
with patch("fastn_mcp.auth.refresh_tokens", mock_refresh):
result = await provider.load_access_token("my-rt")
assert result is not None
assert result.token == "my-rt"
# Should have refreshed with the STORED refresh token and default client_id
mock_refresh.assert_called_once_with("stored-refresh-token", client_id="fastn-oauth")
# Mappings updated
assert provider.get_keycloak_token("my-rt") == "newer-access-jwt"
assert provider._refresh_token_state["my-rt"] == "even-newer-rt"
@pytest.mark.asyncio
async def test_refresh_token_expired_cleanup(self):
"""Failed re-refresh cleans up all state and returns None."""
from mcp.server.auth.provider import AccessToken
provider = self._make_provider()
# Simulate expired cached token with refresh state
provider._access_tokens["my-rt"] = AccessToken(
token="my-rt",
client_id="fastn-oauth",
scopes=["openid"],
expires_at=int(time.time()) - 100,
)
provider._keycloak_tokens["my-rt"] = "old-jwt"
provider._refresh_token_state["my-rt"] = "expired-refresh-token"
mock_refresh = AsyncMock(
side_effect=httpx.HTTPStatusError("Bad Request", request=MagicMock(), response=MagicMock(status_code=400))
)
with patch("fastn_mcp.auth.refresh_tokens", mock_refresh):
result = await provider.load_access_token("my-rt")
assert result is None
assert "my-rt" not in provider._access_tokens
assert "my-rt" not in provider._keycloak_tokens
assert "my-rt" not in provider._refresh_token_state
@pytest.mark.asyncio
async def test_refresh_token_network_error(self):
"""Network error returns None and adds to negative cache."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(return_value=None)
mock_refresh = AsyncMock(
side_effect=httpx.RequestError("Connection refused", request=MagicMock())
)
with patch("fastn_mcp.auth.refresh_tokens", mock_refresh):
result = await provider.load_access_token("some-token")
assert result is None
assert "some-token" in provider._failed_refresh_tokens
@pytest.mark.asyncio
async def test_refresh_token_negative_cache(self):
"""Second call within 60s skips Keycloak call (negative cache)."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(return_value=None)
mock_refresh = AsyncMock(
side_effect=httpx.HTTPStatusError("Bad Request", request=MagicMock(), response=MagicMock(status_code=400))
)
with patch("fastn_mcp.auth.refresh_tokens", mock_refresh):
result1 = await provider.load_access_token("bad-token")
result2 = await provider.load_access_token("bad-token")
assert result1 is None
assert result2 is None
# First call tries fastn-oauth + fastn-sdk (2 calls), second call hits negative cache
assert mock_refresh.call_count == 2
@pytest.mark.asyncio
async def test_invalid_token_not_jwt_not_refresh(self):
"""Garbage token: JWT validation fails, refresh exchange fails → None."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(return_value=None)
with patch("fastn_mcp.auth.refresh_tokens", new_callable=AsyncMock, side_effect=httpx.HTTPStatusError("Bad Request", request=MagicMock(), response=MagicMock(status_code=400))):
result = await provider.load_access_token("total-garbage")
assert result is None
@pytest.mark.asyncio
async def test_mcp_token_priority_over_refresh(self):
"""Cached MCP token returned without calling JWT validator or refresh."""
from mcp.server.auth.provider import AccessToken
provider = self._make_provider()
mock_validator = MagicMock()
mock_validator.validate = MagicMock()
provider._kc_jwt_validator = mock_validator
provider._access_tokens["mcp-tok"] = AccessToken(
token="mcp-tok",
client_id="test-client",
scopes=["read"],
expires_at=int(time.time()) + 3600,
)
mock_refresh = AsyncMock()
with patch("fastn_mcp.auth.refresh_tokens", mock_refresh):
result = await provider.load_access_token("mcp-tok")
assert result is not None
assert result.token == "mcp-tok"
mock_validator.validate.assert_not_called()
mock_refresh.assert_not_called()
@pytest.mark.asyncio
async def test_jwt_priority_over_refresh(self):
"""Valid JWT handled by Path 2 — refresh_tokens never called."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(return_value={
"azp": "fastn-sdk",
"sub": "user-123",
"scope": "openid",
"exp": int(time.time()) + 3600,
})
mock_refresh = AsyncMock()
with patch("fastn_mcp.auth.refresh_tokens", mock_refresh):
result = await provider.load_access_token("valid-jwt")
assert result is not None
assert result.client_id == "fastn-sdk"
mock_refresh.assert_not_called()
@pytest.mark.asyncio
async def test_sdk_token_fallback(self):
"""fastn-oauth fails → fastn-sdk succeeds → correct azp stored."""
from fastn_mcp.auth import KEYCLOAK_CLIENT_ID, KEYCLOAK_SDK_CLIENT_ID
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(
side_effect=self._selective_validator()
)
call_count = 0
async def mock_refresh(rt, client_id=KEYCLOAK_CLIENT_ID):
nonlocal call_count
call_count += 1
if client_id == KEYCLOAK_CLIENT_ID:
# fastn-oauth rejects — token was issued by fastn-sdk
raise httpx.HTTPStatusError(
"Bad Request", request=MagicMock(), response=MagicMock(status_code=400)
)
# fastn-sdk accepts
return self._make_token_response()
with patch("fastn_mcp.auth.refresh_tokens", side_effect=mock_refresh):
result = await provider.load_access_token("sdk-refresh-token")
assert result is not None
assert result.token == "sdk-refresh-token"
assert call_count == 2 # tried fastn-oauth, then fastn-sdk
# azp from the JWT is stored for future re-refreshes
assert provider._refresh_client_ids.get("sdk-refresh-token") == "fastn-oauth"
@pytest.mark.asyncio
async def test_remembered_azp_on_re_refresh(self):
"""Re-refresh uses stored azp, not the fallback loop."""
from mcp.server.auth.provider import AccessToken
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(
side_effect=self._selective_validator("newer-jwt")
)
# Simulate a previous successful exchange from fastn-sdk
provider._access_tokens["my-rt"] = AccessToken(
token="my-rt",
client_id="fastn-sdk",
scopes=["openid"],
expires_at=int(time.time()) - 100, # expired
)
provider._keycloak_tokens["my-rt"] = "old-jwt"
provider._refresh_token_state["my-rt"] = "stored-rt"
provider._refresh_client_ids["my-rt"] = "fastn-sdk"
new_response = self._make_token_response(
access_token="newer-jwt",
refresh_token="newer-rt",
)
mock_refresh = AsyncMock(return_value=new_response)
with patch("fastn_mcp.auth.refresh_tokens", mock_refresh):
result = await provider.load_access_token("my-rt")
assert result is not None
# Should have used fastn-sdk directly (not the fallback loop)
mock_refresh.assert_called_once_with("stored-rt", client_id="fastn-sdk")
@pytest.mark.asyncio
async def test_both_clients_fail(self):
"""Both client_ids fail → None + negative cache."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(return_value=None)
mock_refresh = AsyncMock(
side_effect=httpx.HTTPStatusError(
"Bad Request", request=MagicMock(), response=MagicMock(status_code=400)
)
)
with patch("fastn_mcp.auth.refresh_tokens", mock_refresh):
result = await provider.load_access_token("bad-rt")
assert result is None
assert "bad-rt" in provider._failed_refresh_tokens
# Called twice: once with fastn-oauth, once with fastn-sdk
assert mock_refresh.call_count == 2
@pytest.mark.asyncio
async def test_oauth_client_no_fallback(self):
"""fastn-oauth succeeds first try → fastn-sdk never called."""
provider = self._make_provider()
provider._kc_jwt_validator = MagicMock()
provider._kc_jwt_validator.validate = MagicMock(
side_effect=self._selective_validator()
)
mock_refresh = AsyncMock(return_value=self._make_token_response())
with patch("fastn_mcp.auth.refresh_tokens", mock_refresh):
result = await provider.load_access_token("oauth-rt")
assert result is not None
# Only called once — fastn-oauth worked, no fallback needed
assert mock_refresh.call_count == 1
mock_refresh.assert_called_once_with("oauth-rt", client_id="fastn-oauth")
# ---------------------------------------------------------------------------
# list_projects tests
# ---------------------------------------------------------------------------
class TestListProjects:
@pytest.mark.asyncio
async def test_success(self):
"""list_projects returns projects with total count."""
client = _mock_client(
projects_list=AsyncMock(return_value=[
{"id": "proj_001", "name": "My Project", "packageType": "free"},
{"id": "proj_002", "name": "Team Project", "packageType": "pro"},
]),
)
with patch("fastn_mcp.server._get_client", return_value=client):
result = await list_projects({})
data = _parse_result(result)
assert data["total"] == 2
assert len(data["projects"]) == 2
assert data["projects"][0]["id"] == "proj_001"
@pytest.mark.asyncio
async def test_auth_error_propagates(self):
"""list_projects propagates AuthError to centralized handler."""
client = _mock_client(
projects_list=AsyncMock(side_effect=AuthError("No auth token available.")),
)
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(AuthError):
await list_projects({})
@pytest.mark.asyncio
async def test_fastn_error_propagates(self):
"""list_projects propagates FastnError to centralized handler."""
client = _mock_client(
projects_list=AsyncMock(side_effect=FastnError("Something broke")),
)
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(FastnError):
await list_projects({})
@pytest.mark.asyncio
async def test_closes_client(self):
"""list_projects always closes the client even on error."""
client = _mock_client(
projects_list=AsyncMock(side_effect=AuthError("fail")),
)
with patch("fastn_mcp.server._get_client", return_value=client), \
pytest.raises(AuthError):
await list_projects({})
client.close.assert_awaited_once()
# ---------------------------------------------------------------------------
# Server mode tests
# ---------------------------------------------------------------------------
class TestParseModePath:
"""Tests for _parse_mode_from_path — URL sub-path → (mode, project_id, skill_id)."""
def test_root_returns_agent(self):
from fastn_mcp.server import _parse_mode_from_path
assert _parse_mode_from_path("/") == ("agent", None, None)
def test_empty_returns_agent(self):
from fastn_mcp.server import _parse_mode_from_path
assert _parse_mode_from_path("") == ("agent", None, None)
def test_ucl_returns_ucl_mode(self):
from fastn_mcp.server import _parse_mode_from_path
assert _parse_mode_from_path("/ucl") == ("ucl", None, None)
def test_ucl_with_project_id(self):
from fastn_mcp.server import _parse_mode_from_path
mode, pid, sid = _parse_mode_from_path("/ucl/c1653d47-abcd-1234-ef01-567890abcdef")
assert mode == "ucl"
assert pid == "c1653d47-abcd-1234-ef01-567890abcdef"
assert sid is None
def test_ucl_with_project_and_skill(self):
from fastn_mcp.server import _parse_mode_from_path
mode, pid, sid = _parse_mode_from_path("/ucl/c1653d47-abcd-1234-ef01-567890abcdef/a1b2c3d4-e5f6-7890-abcd-ef1234567890")
assert mode == "ucl"
assert pid == "c1653d47-abcd-1234-ef01-567890abcdef"
assert sid == "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
def test_unknown_path_returns_agent(self):
from fastn_mcp.server import _parse_mode_from_path
assert _parse_mode_from_path("/something") == ("agent", None, None)
class TestServerMode:
"""Tests for server mode — module globals (stdio) and per-mode servers (HTTP).
The stdio transport uses module-level _server_mode / _server_project_id.
HTTP transports use separate MCP Server instances per URL path
(created by _create_mcp_server), so mode filtering is baked in at
server creation time rather than resolved at runtime.
"""
def setup_method(self):
"""Reset server mode/project to defaults before each test."""
_server_mod._server_mode = "agent"
_server_mod._server_project_id = None
_server_mod._request_project_id.set(None)
@pytest.mark.asyncio
async def test_all_mode_returns_all_tools(self):
"""Default stdio mode returns every tool."""
tools = await handle_list_tools()
assert len(tools) == len(TOOLS)
@pytest.mark.asyncio
async def test_ucl_mode_via_module_global(self):
"""Module-level _server_mode='ucl' returns UCL tools (stdio)."""
_server_mod._server_mode = "ucl"
tools = await handle_list_tools()
names = {t.name for t in tools}
assert names == UCL_TOOL_NAMES
@pytest.mark.asyncio
async def test_ucl_mode_with_project_excludes_list_projects(self):
"""UCL mode with project_id excludes list_projects."""
_server_mod._server_mode = "ucl"
_server_mod._server_project_id = "proj_123"
tools = await handle_list_tools()
names = {t.name for t in tools}
assert names == UCL_TOOL_NAMES - {"list_projects"}
assert "list_projects" not in names
@pytest.mark.asyncio
async def test_ucl_mode_excludes_flow_tools(self):
"""In 'ucl' mode, flow tools are not returned."""
_server_mod._server_mode = "ucl"
tools = await handle_list_tools()
names = {t.name for t in tools}
agent_tools = {"list_flows", "run_flow", "delete_flow", "create_flow",
"update_flow", "configure_custom_auth"}
assert names.isdisjoint(agent_tools)
def test_get_client_uses_contextvar_project_id(self):
"""_get_client reads project_id from contextvar when not in arguments."""
token_p = _server_mod._request_project_id.set("ctx_project_123")
try:
from fastn_mcp.server import _get_client
with patch("fastn_mcp.server.AsyncFastnClient") as MockClient:
_get_client({"access_token": "tok"})
call_kwargs = MockClient.call_args[1]
assert call_kwargs["project_id"] == "ctx_project_123"
finally:
_server_mod._request_project_id.reset(token_p)
def test_get_client_argument_overrides_contextvar(self):
"""project_id from arguments takes precedence over contextvar."""
token_p = _server_mod._request_project_id.set("ctx_project")
try:
from fastn_mcp.server import _get_client
with patch("fastn_mcp.server.AsyncFastnClient") as MockClient:
_get_client({"access_token": "tok", "project_id": "arg_project"})
call_kwargs = MockClient.call_args[1]
assert call_kwargs["project_id"] == "arg_project"
finally:
_server_mod._request_project_id.reset(token_p)
def test_get_client_module_global_fallback(self):
"""_get_client falls back to module global when no contextvar or arg."""
_server_mod._server_project_id = "module_project"
from fastn_mcp.server import _get_client
with patch("fastn_mcp.server.AsyncFastnClient") as MockClient:
_get_client({"access_token": "tok"})
call_kwargs = MockClient.call_args[1]
assert call_kwargs["project_id"] == "module_project"
class TestPerModeServers:
"""Tests for per-mode MCP Server instances used by HTTP transports."""
def test_create_mcp_server_returns_server(self):
"""_create_mcp_server returns a valid Server instance."""
from fastn_mcp.server import _create_mcp_server
srv = _create_mcp_server(list(TOOLS))
assert srv is not None
@pytest.mark.asyncio
async def test_ucl_tools_correct(self):
"""UCL tool list contains exactly the expected tools."""
ucl_tools = [t for t in TOOLS if t.name in UCL_TOOL_NAMES]
names = {t.name for t in ucl_tools}
assert names == {"find_tools", "execute_tool", "list_connectors", "list_skills", "list_projects"}
@pytest.mark.asyncio
async def test_ucl_tools_no_proj_correct(self):
"""UCL-with-project tool list excludes list_projects."""
ucl_no_proj = [t for t in TOOLS if t.name in (UCL_TOOL_NAMES - {"list_projects"})]
names = {t.name for t in ucl_no_proj}
assert names == {"find_tools", "execute_tool", "list_connectors", "list_skills"}
assert "list_projects" not in names
@pytest.mark.asyncio
async def test_per_mode_server_call_tool_dispatches(self):
"""Per-mode servers share the same call_tool dispatch logic."""
from fastn_mcp.server import _create_mcp_server
ucl_tools = [t for t in TOOLS if t.name in UCL_TOOL_NAMES]
srv = _create_mcp_server(ucl_tools)
assert srv is not None
def test_ucl_tool_names_constant(self):
"""UCL_TOOL_NAMES contains exactly the expected tool names."""
assert UCL_TOOL_NAMES == {"find_tools", "execute_tool", "list_connectors", "list_skills", "list_projects"}