# test_catalogue.py (1.1 kB)
import json
from unittest import mock
import grpc
import pytest
from src.adapters import load_adapters
from src.server import MCPServicer
from src.generated import mcp_pb2
@pytest.fixture(scope="module")
def servicer():
    """Build a single ``MCPServicer`` that is reused by the tests in this module."""
    instance = MCPServicer()
    return instance
def test_catalogue_contains_adapters(servicer):
    """Check that ``ListMethods`` lists methods from every loaded adapter.

    Up to five method names are sampled from each adapter returned by
    ``load_adapters()``; each sampled name must appear among the ``fq_name``
    values of the ``ListMethods`` response.
    """
    # grpc.ServicerContext is an abstract base class, so calling it directly
    # raises TypeError; a spec'd mock provides the same attribute surface.
    context = mock.MagicMock(spec=grpc.ServicerContext)
    resp = servicer.ListMethods(mcp_pb2.ListMethodsRequest(), context)  # type: ignore[arg-type]
    names = {m.fq_name for m in resp.methods}

    # Gather every missing name first so a failure reports all of them at once.
    missing = [
        fq
        for adapter in load_adapters().values()
        for fq in list(adapter.methods)[:5]  # sample a few
        if fq not in names
    ]
    assert not missing, f"methods missing from catalogue: {missing}"
def test_invoke_with_mock(servicer):
    """Check that ``Invoke`` returns the JSON-encoded result of the target method.

    The first method registered on the "kubernetes" adapter is temporarily
    replaced with a stub returning ``"ok"``; the ``Invoke`` response's
    ``json_result`` must decode to that value.
    """
    adapters = load_adapters()
    k8s_adapter = adapters["kubernetes"]
    # choose a method and patch it
    target = next(iter(k8s_adapter.methods))

    # grpc.ServicerContext is an abstract base class, so calling it directly
    # raises TypeError; a spec'd mock provides the same attribute surface.
    context = mock.MagicMock(spec=grpc.ServicerContext)

    # NOTE(review): this assumes the servicer dispatches through the same
    # ``methods`` mapping object that load_adapters() returns here -- confirm.
    with mock.patch.dict(k8s_adapter.methods, {target: lambda: "ok"}):
        req = mcp_pb2.InvokeRequest(fq_name=target, json_args=json.dumps({}))
        resp = servicer.Invoke(req, context)  # type: ignore[arg-type]

    assert json.loads(resp.json_result) == "ok"