Optimized Memory MCP Server V2

from datetime import timedelta

from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from pydantic import AnyUrl

import mcp.types as types
from mcp.shared.session import BaseSession
from mcp.shared.version import SUPPORTED_PROTOCOL_VERSIONS


class ClientSession(
    BaseSession[
        types.ClientRequest,
        types.ClientNotification,
        types.ClientResult,
        types.ServerRequest,
        types.ServerNotification,
    ]
):
    """Client-side session exposing typed helpers for MCP requests.

    Every helper wraps its payload in ``types.ClientRequest`` or
    ``types.ClientNotification`` and hands it to ``send_request`` /
    ``send_notification``, which are inherited from ``BaseSession``.
    """

    def __init__(
        self,
        read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception],
        write_stream: MemoryObjectSendStream[types.JSONRPCMessage],
        read_timeout_seconds: timedelta | None = None,
    ) -> None:
        """Create the session on top of a pair of in-memory object streams.

        Args:
            read_stream: Stream yielding incoming JSON-RPC messages (or
                exceptions).
            write_stream: Stream accepting outgoing JSON-RPC messages.
            read_timeout_seconds: Optional timeout forwarded unchanged to
                ``BaseSession``; ``None`` by default.
        """
        super().__init__(
            read_stream,
            write_stream,
            types.ServerRequest,
            types.ServerNotification,
            read_timeout_seconds=read_timeout_seconds,
        )

    async def initialize(self) -> types.InitializeResult:
        """Send the ``initialize`` request and confirm with ``initialized``.

        The request advertises ``types.LATEST_PROTOCOL_VERSION``, a roots
        capability with ``listChanged=True`` and no sampling or experimental
        capabilities. Once the returned protocol version has been validated,
        a ``notifications/initialized`` notification is sent.

        Returns:
            The ``InitializeResult`` received in response to the request.

        Raises:
            RuntimeError: If the returned protocol version is not listed in
                ``SUPPORTED_PROTOCOL_VERSIONS``.
        """
        result = await self.send_request(
            types.ClientRequest(
                types.InitializeRequest(
                    method="initialize",
                    params=types.InitializeRequestParams(
                        protocolVersion=types.LATEST_PROTOCOL_VERSION,
                        capabilities=types.ClientCapabilities(
                            sampling=None,
                            experimental=None,
                            roots=types.RootsCapability(
                                # TODO: Should this be based on whether we
                                # _will_ send notifications, or only whether
                                # they're supported?
                                listChanged=True
                            ),
                        ),
                        clientInfo=types.Implementation(name="mcp", version="0.1.0"),
                    ),
                )
            ),
            types.InitializeResult,
        )

        # Validate before sending the initialized notification, so the
        # handshake is not confirmed for an unsupported protocol version.
        if result.protocolVersion not in SUPPORTED_PROTOCOL_VERSIONS:
            raise RuntimeError(
                "Unsupported protocol version from the server: "
                f"{result.protocolVersion}"
            )

        await self.send_notification(
            types.ClientNotification(
                types.InitializedNotification(method="notifications/initialized")
            )
        )

        return result

    async def send_ping(self) -> types.EmptyResult:
        """Send a ping request."""
        return await self.send_request(
            types.ClientRequest(
                types.PingRequest(
                    method="ping",
                )
            ),
            types.EmptyResult,
        )

    async def send_progress_notification(
        self, progress_token: str | int, progress: float, total: float | None = None
    ) -> None:
        """Send a progress notification.

        Args:
            progress_token: Value sent as ``progressToken``.
            progress: Value sent as ``progress``.
            total: Optional value sent as ``total``; ``None`` by default.
        """
        await self.send_notification(
            types.ClientNotification(
                types.ProgressNotification(
                    method="notifications/progress",
                    params=types.ProgressNotificationParams(
                        progressToken=progress_token,
                        progress=progress,
                        total=total,
                    ),
                ),
            )
        )

    async def set_logging_level(self, level: types.LoggingLevel) -> types.EmptyResult:
        """Send a logging/setLevel request.

        Args:
            level: Logging level placed in the request parameters.
        """
        return await self.send_request(
            types.ClientRequest(
                types.SetLevelRequest(
                    method="logging/setLevel",
                    params=types.SetLevelRequestParams(level=level),
                )
            ),
            types.EmptyResult,
        )

    async def list_resources(self) -> types.ListResourcesResult:
        """Send a resources/list request."""
        return await self.send_request(
            types.ClientRequest(
                types.ListResourcesRequest(
                    method="resources/list",
                )
            ),
            types.ListResourcesResult,
        )

    async def read_resource(self, uri: AnyUrl) -> types.ReadResourceResult:
        """Send a resources/read request.

        Args:
            uri: URI placed in the request parameters.
        """
        return await self.send_request(
            types.ClientRequest(
                types.ReadResourceRequest(
                    method="resources/read",
                    params=types.ReadResourceRequestParams(uri=uri),
                )
            ),
            types.ReadResourceResult,
        )

    async def subscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:
        """Send a resources/subscribe request.

        Args:
            uri: URI placed in the request parameters.
        """
        return await self.send_request(
            types.ClientRequest(
                types.SubscribeRequest(
                    method="resources/subscribe",
                    params=types.SubscribeRequestParams(uri=uri),
                )
            ),
            types.EmptyResult,
        )

    async def unsubscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:
        """Send a resources/unsubscribe request.

        Args:
            uri: URI placed in the request parameters.
        """
        return await self.send_request(
            types.ClientRequest(
                types.UnsubscribeRequest(
                    method="resources/unsubscribe",
                    params=types.UnsubscribeRequestParams(uri=uri),
                )
            ),
            types.EmptyResult,
        )

    async def call_tool(
        self, name: str, arguments: dict | None = None
    ) -> types.CallToolResult:
        """Send a tools/call request.

        Args:
            name: Tool name placed in the request parameters.
            arguments: Optional arguments mapping passed through unchanged;
                ``None`` by default.
        """
        return await self.send_request(
            types.ClientRequest(
                types.CallToolRequest(
                    method="tools/call",
                    params=types.CallToolRequestParams(name=name, arguments=arguments),
                )
            ),
            types.CallToolResult,
        )

    async def list_prompts(self) -> types.ListPromptsResult:
        """Send a prompts/list request."""
        return await self.send_request(
            types.ClientRequest(
                types.ListPromptsRequest(
                    method="prompts/list",
                )
            ),
            types.ListPromptsResult,
        )

    async def get_prompt(
        self, name: str, arguments: dict[str, str] | None = None
    ) -> types.GetPromptResult:
        """Send a prompts/get request.

        Args:
            name: Prompt name placed in the request parameters.
            arguments: Optional string-to-string mapping passed through
                unchanged; ``None`` by default.
        """
        return await self.send_request(
            types.ClientRequest(
                types.GetPromptRequest(
                    method="prompts/get",
                    params=types.GetPromptRequestParams(name=name, arguments=arguments),
                )
            ),
            types.GetPromptResult,
        )

    async def complete(
        self, ref: types.ResourceReference | types.PromptReference, argument: dict
    ) -> types.CompleteResult:
        """Send a completion/complete request.

        Args:
            ref: Resource or prompt reference placed in the request
                parameters.
            argument: Mapping expanded as keyword arguments into
                ``types.CompletionArgument``.
        """
        return await self.send_request(
            types.ClientRequest(
                types.CompleteRequest(
                    method="completion/complete",
                    params=types.CompleteRequestParams(
                        ref=ref,
                        argument=types.CompletionArgument(**argument),
                    ),
                )
            ),
            types.CompleteResult,
        )

    async def list_tools(self) -> types.ListToolsResult:
        """Send a tools/list request."""
        return await self.send_request(
            types.ClientRequest(
                types.ListToolsRequest(
                    method="tools/list",
                )
            ),
            types.ListToolsResult,
        )

    async def send_roots_list_changed(self) -> None:
        """Send a roots/list_changed notification."""
        await self.send_notification(
            types.ClientNotification(
                types.RootsListChangedNotification(
                    method="notifications/roots/list_changed",
                )
            )
        )