privateGPT MCP Server

# messages/send_message.py import logging import anyio from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from .message_types.json_rpc_message import JSONRPCMessage async def send_message( read_stream: MemoryObjectReceiveStream, write_stream: MemoryObjectSendStream, message: JSONRPCMessage, timeout: float = 5, retries: int = 3, ) -> dict: """ Send a JSON-RPC message to the server and return the response. Args: read_stream (MemoryObjectReceiveStream): The stream to read responses. write_stream (MemoryObjectSendStream): The stream to send requests. message (JSONRPCMessage): The JSON-RPC message to send. timeout (float): Timeout in seconds to wait for a response. retries (int): Number of retry attempts. Returns: dict: The server's response as a dictionary. Raises: TimeoutError: If no response is received within the timeout. Exception: If an unexpected error occurs. """ for attempt in range(1, retries + 1): try: logging.debug(f"Attempt {attempt}/{retries}: Sending message: {message}") await write_stream.send(message) with anyio.fail_after(timeout): async for response in read_stream: if not isinstance(response, Exception): logging.debug(f"Received response: {response.model_dump()}") return response.model_dump() else: logging.error(f"Server error: {response}") raise response except TimeoutError: logging.error( f"Timeout waiting for response to message '{message.method}' (Attempt {attempt}/{retries})" ) if attempt == retries: raise except Exception as e: logging.error( f"Unexpected error during '{message.method}' request: {e} (Attempt {attempt}/{retries})" ) if attempt == retries: raise await anyio.sleep(2)