send_message.py•2.13 kB
# messages/send_message.py
import logging
import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from .message_types.json_rpc_message import JSONRPCMessage
async def send_message(
    read_stream: MemoryObjectReceiveStream,
    write_stream: MemoryObjectSendStream,
    message: JSONRPCMessage,
    timeout: float = 5,
    retries: int = 3,
) -> dict:
    """
    Send a JSON-RPC message to the server and return the response.
    Args:
        read_stream (MemoryObjectReceiveStream): The stream to read responses.
        write_stream (MemoryObjectSendStream): The stream to send requests.
        message (JSONRPCMessage): The JSON-RPC message to send.
        timeout (float): Timeout in seconds to wait for a response.
        retries (int): Number of retry attempts.
    Returns:
        dict: The server's response as a dictionary.
    Raises:
        TimeoutError: If no response is received within the timeout.
        Exception: If an unexpected error occurs.
    """
    for attempt in range(1, retries + 1):
        try:
            logging.debug(f"Attempt {attempt}/{retries}: Sending message: {message}")
            await write_stream.send(message)
            with anyio.fail_after(timeout):
                async for response in read_stream:
                    if not isinstance(response, Exception):
                        logging.debug(f"Received response: {response.model_dump()}")
                        return response.model_dump()
                    else:
                        logging.error(f"Server error: {response}")
                        raise response
        except TimeoutError:
            logging.error(
                f"Timeout waiting for response to message '{message.method}' (Attempt {attempt}/{retries})"
            )
            if attempt == retries:
                raise
        except Exception as e:
            logging.error(
                f"Unexpected error during '{message.method}' request: {e} (Attempt {attempt}/{retries})"
            )
            if attempt == retries:
                raise
        await anyio.sleep(2)