Datetime MCP Server

# TODO: Implement these tests later

# """
# Integration tests for the datetime_mcp_server.
#
# These tests verify that the server correctly implements the MCP protocol
# by testing the full server lifecycle with mocked streams.
# """

# import asyncio
# import json
# import anyio
# from typing import Any, Dict, List, Optional, Tuple, cast
# from typing import TYPE_CHECKING

# import pytest
# from pydantic import AnyUrl
# from mcp.server.models import InitializationOptions
# import mcp.types as types
# from datetime_mcp_server.server import server, notes
# from mcp.server import NotificationOptions, Server
# from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

# if TYPE_CHECKING:
#     from _pytest.capture import CaptureFixture
#     from _pytest.fixtures import FixtureRequest
#     from _pytest.logging import LogCaptureFixture
#     from _pytest.monkeypatch import MonkeyPatch
#     from pytest_mock.plugin import MockerFixture


# class MockStream:
#     """
#     A mock stream for testing the server.
#     """

#     def __init__(self) -> None:
#         """Initialize the mock stream."""
#         self.read_buffer: List[bytes] = []
#         self.write_buffer: List[bytes] = []
#         print("MockStream initialized")

#     async def __aenter__(self) -> "MockStream":
#         """Enter the async context."""
#         print("MockStream.__aenter__ called")
#         return self

#     async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
#         """Exit the async context."""
#         print("MockStream.__aexit__ called")
#         self.close()

#     def __aiter__(self) -> "MockStream":
#         """Return an async iterator."""
#         print("MockStream.__aiter__ called")
#         return self

#     async def __anext__(self) -> types.JSONRPCMessage:
#         """
#         Get the next item from the async iterator.

#         Returns:
#             A JSONRPCMessage object.
#         """
#         print("MockStream.__anext__ called")
#         if not self.read_buffer:
#             raise StopAsyncIteration

#         data = self.read_buffer.pop(0)
#         print(f"MockStream.__anext__ returning: {data}")

#         # Parse the JSON data and convert to a JSONRPCMessage
#         json_data = json.loads(data.decode())

#         # Check if this is a JSONRPCMessage with a root attribute
#         if isinstance(json_data, dict) and "root" in json_data:
#             # Extract the inner message
#             inner_data = json_data["root"]

#             # Convert to appropriate JSONRPCMessage type
#             if "result" in inner_data:
#                 return types.JSONRPCResponse(**inner_data)
#             elif "method" in inner_data:
#                 return types.JSONRPCRequest(**inner_data)
#             elif "error" in inner_data:
#                 return types.JSONRPCErrorResponse(**inner_data)
#         else:
#             # Convert to appropriate JSONRPCMessage type directly
#             if "result" in json_data:
#                 return types.JSONRPCResponse(**json_data)
#             elif "method" in json_data:
#                 return types.JSONRPCRequest(**json_data)
#             elif "error" in json_data:
#                 return types.JSONRPCErrorResponse(**json_data)
#             else:
#                 raise ValueError(f"Unknown message type: {json_data}")

#     def feed_data(self, data: types.JSONRPCMessage) -> None:
#         """
#         Feed data into the stream.

#         Args:
#             data: The data to feed into the stream.
#         """
#         print(f"MockStream.feed_data called with: {data}")

#         # Convert the JSONRPCMessage to a JSON string and then to bytes
#         if isinstance(data, types.JSONRPCMessage):
#             # If it's already a JSONRPCMessage, convert it to a dict
#             data_dict = data.dict()
#             json_data = json.dumps(data_dict).encode()
#         else:
#             # If it's already serialized, just encode it
#             json_data = json.dumps(data).encode()

#         print(f"MockStream.feed_data: Adding to read_buffer: {json_data}")
#         self.read_buffer.append(json_data)

#     def write(self, data: bytes) -> None:
#         """
#         Write data to the stream.

#         Args:
#             data: The data to write to the stream.
#         """
#         print(f"MockStream.write called with data: {data}")
#         self.write_buffer.append(data)

#     def close(self) -> None:
#         """Close the stream."""
#         print("MockStream.close called")
#         self.read_buffer = []
#         self.write_buffer = []

#     async def receive(self) -> types.JSONRPCMessage:
#         """
#         Receive a message from the stream.

#         Returns:
#             A JSONRPCMessage object.

#         Raises:
#             ValueError: If there is no data in the write_buffer.
#         """
#         print("MockStream.receive called")

#         # Check if there's data in the write_buffer
#         if not self.write_buffer:
#             print("No data in write_buffer, waiting...")
#             # Wait for data to arrive (up to 1 second)
#             for _ in range(10):
#                 await asyncio.sleep(0.1)
#                 if self.write_buffer:
#                     break
#             else:
#                 raise ValueError("No data in write_buffer after waiting")

#         # Get the first message from the write_buffer
#         data = self.write_buffer.pop(0)
#         print(f"MockStream.receive returning: {data}")

#         # Parse the JSON data and convert to a JSONRPCMessage
#         json_data = json.loads(data.decode())

#         # Check if this is a JSONRPCMessage with a root attribute
#         if isinstance(json_data, dict) and "root" in json_data:
#             # Extract the inner message
#             inner_data = json_data["root"]

#             # Convert to appropriate JSONRPCMessage type
#             if "result" in inner_data:
#                 return types.JSONRPCResponse(**inner_data)
#             elif "method" in inner_data:
#                 return types.JSONRPCRequest(**inner_data)
#             elif "error" in inner_data:
#                 return types.JSONRPCErrorResponse(**inner_data)
#         else:
#             # Convert to appropriate JSONRPCMessage type directly
#             if "result" in json_data:
#                 return types.JSONRPCResponse(**json_data)
#             elif "method" in json_data:
#                 return types.JSONRPCRequest(**json_data)
#             elif "error" in json_data:
#                 return types.JSONRPCErrorResponse(**json_data)
#             else:
#                 raise ValueError(f"Unknown message type: {json_data}")


# @pytest.fixture
# def reset_server_state() -> None:
#     """
#     Reset the server state before each test.
#     This ensures tests don't affect each other by clearing the notes dictionary.
#     """
#     # Clear all notes
#     notes.clear()

#     # Add some test notes for the tests
#     notes["test1"] = "This is a test note"
#     notes["test2"] = "This is another test note"


# def create_request(method: str, params: Dict[str, Any]) -> types.JSONRPCMessage:
#     """
#     Create a JSON-RPC request message.

#     Args:
#         method: The method to call.
#         params: The parameters to pass to the method.

#     Returns:
#         A JSON-RPC request message.
#     """
#     if method == "initialize":
#         # Create a properly structured initialize request
#         init_params = types.InitializeRequestParams(
#             protocolVersion="0.1.0",
#             capabilities={},
#             clientInfo=types.Implementation(
#                 name="test-client",
#                 version="1.0.0"
#             )
#         )

#         # Convert to dictionary for JSONRPCRequest
#         params_dict = init_params.model_dump()

#         return types.JSONRPCMessage(
#             types.JSONRPCRequest(
#                 jsonrpc="2.0",
#                 id="test-request-1",  # Add a request ID
#                 method="initialize",
#                 params=params_dict
#             )
#         )
#     else:
#         # For other methods, create a generic request
#         return types.JSONRPCMessage(
#             types.JSONRPCRequest(
#                 jsonrpc="2.0",
#                 id="test-request-1",  # Add a request ID
#                 method=method,
#                 params=params
#             )
#         )


# async def get_response(stream: MemoryObjectReceiveStream[types.JSONRPCMessage]) -> Dict[str, Any]:
#     """
#     Get a response from the stream and parse it.

#     Args:
#         stream: The memory object receive stream.

#     Returns:
#         The parsed JSON-RPC response.
#     """
#     print(f"get_response called with stream: {stream}")

#     # Receive a message from the stream
#     message = await stream.receive()
#     print(f"get_response received message: {message}")

#     # Convert the message to a dictionary
#     if isinstance(message, types.ServerResult):
#         return message.dict()

#     return {"error": "Unexpected message type"}


# @pytest.mark.asyncio
# async def test_server_initialization(reset_server_state: None) -> None:
#     """
#     Test that the server initializes correctly.
#     Args:
#         reset_server_state: Fixture to reset the server state before the test.
#     """
#     # Create memory object streams for the server
#     send_channel, receive_channel = anyio.create_memory_object_stream[types.JSONRPCMessage | Exception](max_buffer_size=10)
#     response_send_channel, response_receive_channel = anyio.create_memory_object_stream[types.JSONRPCMessage](max_buffer_size=10)

#     # Prepare the initialization request
#     init_request = create_request("initialize", {})

#     # Send the request to the server
#     await send_channel.send(init_request)

#     # Run the server
#     server_task = asyncio.create_task(
#         server.run(
#             receive_channel,
#             response_send_channel,
#             InitializationOptions(
#                 server_name="datetime-mcp-server",
#                 server_version="0.1.0",
#                 capabilities=server.get_capabilities(
#                     notification_options=NotificationOptions(),
#                     experimental_capabilities={},
#                 ),
#             ),
#         )
#     )

#     # Add a small delay to give the server time to process the request
#     await asyncio.sleep(0.5)

#     # Get the initialization response
#     response = await get_response(response_receive_channel)

#     # Verify the response
#     assert response is not None

#     # Clean up
#     server_task.cancel()
#     try:
#         await server_task
#     except asyncio.CancelledError:
#         pass


# @pytest.mark.asyncio
# async def test_list_resources_protocol(reset_server_state: None) -> None:
#     """
#     Test that the server correctly handles listResources requests.

#     Args:
#         reset_server_state: Fixture to reset the server state before the test.
#     """
#     input_stream = MockStream()
#     output_stream = MockStream()

#     # Initialize the server
#     init_request = create_request("initialize", {
#         "capabilities": {},
#         "serverInfo": {"name": "test-client", "version": "1.0.0"}
#     })
#     input_stream.feed_data(init_request)

#     # Run the server
#     server_task = asyncio.create_task(
#         server.run(
#             input_stream,
#             output_stream,
#             InitializationOptions(
#                 server_name="datetime-mcp-server",
#                 server_version="0.1.0",
#                 capabilities=server.get_capabilities(
#                     notification_options=NotificationOptions(),
#                     experimental_capabilities={},
#                 ),
#             ),
#         )
#     )

#     # Skip the initialization response
#     await get_response(output_stream)

#     # Send a listResources request
#     input_stream.feed_data(create_request("resources/list", {}))

#     # Get the response
#     response = await get_response(output_stream)

#     # Check the response
#     assert response["jsonrpc"] == "2.0"
#     assert response["id"] == 1
#     assert "result" in response
#     assert "resources" in response["result"]
#     assert len(response["result"]["resources"]) >= 5  # 2 notes + 3 datetime resources

#     # Verify we have both types of resources
#     resource_types = set()
#     for resource in response["result"]["resources"]:
#         uri = resource["uri"]
#         if uri.startswith("datetime://"):
#             resource_types.add("datetime")
#         elif uri.startswith("note://"):
#             resource_types.add("note")

#     assert "datetime" in resource_types
#     assert "note" in resource_types

#     # Shutdown the server
#     input_stream.feed_data(create_request("shutdown", {}))
#     await asyncio.sleep(0.1)

#     # Exit the server
#     input_stream.feed_data(create_request("exit", {}))
#     await asyncio.sleep(0.1)

#     await server_task


# @pytest.mark.asyncio
# async def test_read_resource_protocol(reset_server_state: None) -> None:
#     """
#     Test that the server correctly handles resources/read requests.

#     Args:
#         reset_server_state: Fixture to reset the server state before the test.
#     """
#     input_stream = MockStream()
#     output_stream = MockStream()

#     # Initialize the server
#     init_request = create_request("initialize", {
#         "capabilities": {},
#         "serverInfo": {"name": "test-client", "version": "1.0.0"}
#     })
#     input_stream.feed_data(init_request)

#     # Run the server
#     server_task = asyncio.create_task(
#         server.run(
#             input_stream,
#             output_stream,
#             InitializationOptions(
#                 server_name="datetime-mcp-server",
#                 server_version="0.1.0",
#                 capabilities=server.get_capabilities(
#                     notification_options=NotificationOptions(),
#                     experimental_capabilities={},
#                 ),
#             ),
#         )
#     )

#     # Skip the initialization response
#     await get_response(output_stream)

#     # Send a resources/read request for a note
#     input_stream.feed_data(create_request("resources/read", {
#         "uri": "note://internal/test1"
#     }))

#     # Get the response
#     response = await get_response(output_stream)

#     # Check the response
#     assert response["jsonrpc"] == "2.0"
#     assert response["id"] == 1
#     assert "result" in response
#     assert response["result"]["content"] == "This is a test note"

#     # Send a resources/read request for a datetime resource
#     input_stream.feed_data(create_request("resources/read", {
#         "uri": "datetime://today"
#     }))

#     # Get the response
#     response = await get_response(output_stream)

#     # Check the response
#     assert response["jsonrpc"] == "2.0"
#     assert response["id"] == 1
#     assert "result" in response
#     assert "content" in response["result"]

#     # Verify date format (YYYY-MM-DD)
#     date_str = response["result"]["content"]
#     date_parts = date_str.split("-")
#     assert len(date_parts) == 3
#     assert len(date_parts[0]) == 4  # Year (YYYY)
#     assert len(date_parts[1]) == 2  # Month (MM)
#     assert len(date_parts[2]) == 2  # Day (DD)

#     # Shutdown the server
#     input_stream.feed_data(create_request("shutdown", {}))
#     await asyncio.sleep(0.1)

#     # Exit the server
#     input_stream.feed_data(create_request("exit", {}))
#     await asyncio.sleep(0.1)

#     await server_task


# @pytest.mark.asyncio
# async def test_tools_protocol(reset_server_state: None) -> None:
#     """
#     Test that the server correctly handles tools-related requests.

#     Args:
#         reset_server_state: Fixture to reset the server state before the test.
#     """
#     input_stream = MockStream()
#     output_stream = MockStream()

#     # Initialize the server
#     init_request = create_request("initialize", {
#         "capabilities": {},
#         "serverInfo": {"name": "test-client", "version": "1.0.0"}
#     })
#     input_stream.feed_data(init_request)

#     # Run the server
#     server_task = asyncio.create_task(
#         server.run(
#             input_stream,
#             output_stream,
#             InitializationOptions(
#                 server_name="datetime-mcp-server",
#                 server_version="0.1.0",
#                 capabilities=server.get_capabilities(
#                     notification_options=NotificationOptions(),
#                     experimental_capabilities={},
#                 ),
#             ),
#         )
#     )

#     # Skip the initialization response
#     await get_response(output_stream)

#     # Send a tools/list request
#     input_stream.feed_data(create_request("tools/list", {}))

#     # Get the response
#     response = await get_response(output_stream)

#     # Check the response
#     assert response["jsonrpc"] == "2.0"
#     assert response["id"] == 1
#     assert "result" in response
#     assert "tools" in response["result"]

#     # Verify we have the expected tools
#     tool_names = [tool["name"] for tool in response["result"]["tools"]]
#     assert "add-note" in tool_names
#     assert "get-current-time" in tool_names
#     assert "format-date" in tool_names

#     # Send a tools/call request for add-note
#     input_stream.feed_data(create_request("tools/call", {
#         "name": "add-note",
#         "arguments": {
#             "name": "integration-test",
#             "content": "This is a test from integration test"
#         }
#     }))

#     # Get the response
#     response = await get_response(output_stream)

#     # Check the response
#     assert response["jsonrpc"] == "2.0"
#     assert response["id"] == 1
#     assert "result" in response
#     assert "results" in response["result"]
#     assert len(response["result"]["results"]) == 1
#     assert response["result"]["results"][0]["type"] == "text"
#     assert "Added note" in response["result"]["results"][0]["text"]

#     # Check that we also got a notification about resources changing
#     notification = await get_response(output_stream)
#     assert notification["jsonrpc"] == "2.0"
#     assert "id" not in notification
#     assert notification["method"] == "resources/listChanged"

#     # Verify the note was added by sending a resources/read request
#     input_stream.feed_data(create_request("resources/read", {
#         "uri": "note://internal/integration-test"
#     }))

#     # Get the response
#     response = await get_response(output_stream)

#     # Check the response
#     assert response["jsonrpc"] == "2.0"
#     assert response["id"] == 1
#     assert "result" in response
#     assert response["result"]["content"] == "This is a test from integration test"

#     # Test the get-current-time tool
#     input_stream.feed_data(create_request("tools/call", {
#         "name": "get-current-time",
#         "arguments": {
#             "format": "iso"
#         }
#     }))

#     # Get the response
#     response = await get_response(output_stream)

#     # Check the response
#     assert response["jsonrpc"] == "2.0"
#     assert response["id"] == 1
#     assert "result" in response
#     assert "results" in response["result"]
#     assert len(response["result"]["results"]) == 1
#     assert response["result"]["results"][0]["type"] == "text"

#     # Shutdown the server
#     input_stream.feed_data(create_request("shutdown", {}))
#     await asyncio.sleep(0.1)

#     # Exit the server
#     input_stream.feed_data(create_request("exit", {}))
#     await asyncio.sleep(0.1)

#     await server_task


# @pytest.mark.asyncio
# async def test_prompts_protocol(reset_server_state: None) -> None:
#     """
#     Test that the server correctly handles prompts-related requests.

#     Args:
#         reset_server_state: Fixture to reset the server state before the test.
#     """
#     input_stream = MockStream()
#     output_stream = MockStream()

#     # Initialize the server
#     init_request = create_request("initialize", {
#         "capabilities": {},
#         "serverInfo": {"name": "test-client", "version": "1.0.0"}
#     })
#     input_stream.feed_data(init_request)

#     # Run the server
#     server_task = asyncio.create_task(
#         server.run(
#             input_stream,
#             output_stream,
#             InitializationOptions(
#                 server_name="datetime-mcp-server",
#                 server_version="0.1.0",
#                 capabilities=server.get_capabilities(
#                     notification_options=NotificationOptions(),
#                     experimental_capabilities={},
#                 ),
#             ),
#         )
#     )

#     # Skip the initialization response
#     await get_response(output_stream)

#     # Send a prompts/list request
#     input_stream.feed_data(create_request("prompts/list", {}))

#     # Get the response
#     response = await get_response(output_stream)

#     # Check the response
#     assert response["jsonrpc"] == "2.0"
#     assert response["id"] == 1
#     assert "result" in response
#     assert "prompts" in response["result"]

#     # Verify we have the expected prompts
#     prompt_names = [prompt["name"] for prompt in response["result"]["prompts"]]
#     assert "summarize-notes" in prompt_names
#     assert "schedule-event" in prompt_names

#     # Send a prompts/get request for schedule-event
#     input_stream.feed_data(create_request("prompts/get", {
#         "name": "schedule-event",
#         "arguments": {
#             "event": "Meeting",
#             "time": "14:30"
#         }
#     }))

#     # Get the response
#     response = await get_response(output_stream)

#     # Check the response
#     assert response["jsonrpc"] == "2.0"
#     assert response["id"] == 1
#     assert "result" in response
#     assert "description" in response["result"]
#     assert "messages" in response["result"]
#     assert len(response["result"]["messages"]) == 1
#     assert response["result"]["messages"][0]["role"] == "user"
#     assert response["result"]["messages"][0]["content"]["type"] == "text"
#     assert "Meeting" in response["result"]["messages"][0]["content"]["text"]
#     assert "14:30" in response["result"]["messages"][0]["content"]["text"]
#     # Shutdown the server
#     input_stream.feed_data(create_request("shutdown", {}))
#     await asyncio.sleep(0.1)

#     # Exit the server
#     input_stream.feed_data(create_request("exit", {}))
#     await asyncio.sleep(0.1)

#     await server_task


# @pytest.mark.asyncio
# async def test_error_handling_protocol(reset_server_state: None) -> None:
#     """
#     Test that the server correctly handles error conditions in the protocol.

#     Args:
#         reset_server_state: Fixture to reset the server state before the test.
#     """
#     input_stream = MockStream()
#     output_stream = MockStream()

#     # Initialize the server
#     init_request = create_request("initialize", {
#         "capabilities": {},
#         "serverInfo": {"name": "test-client", "version": "1.0.0"}
#     })
#     input_stream.feed_data(init_request)

#     # Run the server
#     server_task = asyncio.create_task(
#         server.run(
#             input_stream,
#             output_stream,
#             InitializationOptions(
#                 server_name="datetime-mcp-server",
#                 server_version="0.1.0",
#                 capabilities=server.get_capabilities(
#                     notification_options=NotificationOptions(),
#                     experimental_capabilities={},
#                 ),
#             ),
#         )
#     )

#     # Skip the initialization response
#     await get_response(output_stream)

#     # Send a resources/read request for a non-existent resource
#     input_stream.feed_data(create_request("resources/read", {
#         "uri": "note://internal/nonexistent"
#     }))

#     # Get the response
#     response = await get_response(output_stream)

#     # Check the response
#     assert response["jsonrpc"] == "2.0"
#     assert response["id"] == 1
#     assert "error" in response
#     assert "message" in response["error"]
#     assert "Note not found: nonexistent" in response["error"]["message"]

#     # Send a tools/call request for a non-existent tool
#     input_stream.feed_data(create_request("tools/call", {
#         "name": "nonexistent-tool",
#         "arguments": {}
#     }))

#     # Get the response
#     response = await get_response(output_stream)

#     # Check the response
#     assert response["jsonrpc"] == "2.0"
#     assert response["id"] == 1
#     assert "error" in response
#     assert "message" in response["error"]
#     assert "Unknown tool: nonexistent-tool" in response["error"]["message"]

#     # Send a prompts/get request for a non-existent prompt
#     input_stream.feed_data(create_request("prompts/get", {
#         "name": "nonexistent-prompt",
#         "arguments": {}
#     }))

#     # Get the response
#     response = await get_response(output_stream)

#     # Check the response
#     assert response["jsonrpc"] == "2.0"
#     assert response["id"] == 1
#     assert "error" in response
#     assert "message" in response["error"]
#     assert "Unknown prompt: nonexistent-prompt" in response["error"]["message"]

#     # Shutdown the server
#     input_stream.feed_data(create_request("shutdown", {}))
#     await asyncio.sleep(0.1)

#     # Exit the server
#     input_stream.feed_data(create_request("exit", {}))
#     await asyncio.sleep(0.1)

#     await server_task