basetransport.pyx•9.55 kB
cdef class UVBaseTransport(UVSocketHandle):
    def __cinit__(self):
        # Flow control
        self._high_water = FLOW_CONTROL_HIGH_WATER * 1024
        self._low_water = FLOW_CONTROL_HIGH_WATER // 4
        self._protocol = None
        self._protocol_connected = 0
        self._protocol_paused = 0
        self._protocol_data_received = None
        self._server = None
        self._waiter = None
        self._extra_info = None
        self._conn_lost = 0
        self._closing = 0
    cdef size_t _get_write_buffer_size(self):
        return 0
    cdef inline _schedule_call_connection_made(self):
        self._loop._call_soon_handle(
            new_MethodHandle(self._loop,
                             "UVTransport._call_connection_made",
                             <method_t>self._call_connection_made,
                             self.context,
                             self))
    cdef inline _schedule_call_connection_lost(self, exc):
        self._loop._call_soon_handle(
            new_MethodHandle1(self._loop,
                              "UVTransport._call_connection_lost",
                              <method1_t>self._call_connection_lost,
                              self.context,
                              self, exc))
    cdef _fatal_error(self, exc, throw, reason=None):
        # Overload UVHandle._fatal_error
        self._force_close(exc)
        if not isinstance(exc, OSError):
            if throw or self._loop is None:
                raise exc
            msg = f'Fatal error on transport {self.__class__.__name__}'
            if reason is not None:
                msg = f'{msg} ({reason})'
            self._loop.call_exception_handler({
                'message': msg,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
    cdef inline _maybe_pause_protocol(self):
        cdef:
            size_t size = self._get_write_buffer_size()
        if size <= self._high_water:
            return
        if not self._protocol_paused:
            self._protocol_paused = 1
            try:
                # _maybe_pause_protocol() is always triggered from user-calls,
                # so we must copy the context to avoid entering context twice
                run_in_context(
                    self.context.copy(), self._protocol.pause_writing,
                )
            except (KeyboardInterrupt, SystemExit):
                raise
            except BaseException as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.pause_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })
    cdef inline _maybe_resume_protocol(self):
        cdef:
            size_t size = self._get_write_buffer_size()
        if self._protocol_paused and size <= self._low_water:
            self._protocol_paused = 0
            try:
                # We're copying the context to avoid entering context twice,
                # even though it's not always necessary to copy - it's easier
                # to copy here than passing down a copied context.
                run_in_context(
                    self.context.copy(), self._protocol.resume_writing,
                )
            except (KeyboardInterrupt, SystemExit):
                raise
            except BaseException as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })
    cdef _wakeup_waiter(self):
        if self._waiter is not None:
            if not self._waiter.cancelled():
                if not self._is_alive():
                    self._waiter.set_exception(
                        RuntimeError(
                            'closed Transport handle and unset waiter'))
                else:
                    self._waiter.set_result(True)
            self._waiter = None
    cdef _call_connection_made(self):
        if self._protocol is None:
            raise RuntimeError(
                'protocol is not set, cannot call connection_made()')
        # We use `_is_alive()` and not `_closing`, because we call
        # `transport._close()` in `loop.create_connection()` if an
        # exception happens during `await waiter`.
        if not self._is_alive():
            # A connection waiter can be cancelled between
            # 'await loop.create_connection()' and
            # `_schedule_call_connection_made` and
            # the actual `_call_connection_made`.
            self._wakeup_waiter()
            return
        # Set _protocol_connected to 1 before calling "connection_made":
        # if transport is aborted or closed, "connection_lost" will
        # still be scheduled.
        self._protocol_connected = 1
        try:
            self._protocol.connection_made(self)
        except BaseException:
            self._wakeup_waiter()
            raise
        if not self._is_alive():
            # This might happen when "transport.abort()" is called
            # from "Protocol.connection_made".
            self._wakeup_waiter()
            return
        self._start_reading()
        self._wakeup_waiter()
    cdef _call_connection_lost(self, exc):
        if self._waiter is not None:
            if not self._waiter.done():
                self._waiter.set_exception(exc)
            self._waiter = None
        if self._closed:
            # The handle is closed -- likely, _call_connection_lost
            # was already called before.
            return
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            self._clear_protocol()
            self._close()
            server = self._server
            if server is not None:
                (<Server>server)._detach()
                self._server = None
    cdef inline _set_server(self, Server server):
        self._server = server
        (<Server>server)._attach()
    cdef inline _set_waiter(self, object waiter):
        if waiter is not None and not isfuture(waiter):
            raise TypeError(
                f'invalid waiter object {waiter!r}, expected asyncio.Future')
        self._waiter = waiter
    cdef _set_protocol(self, object protocol):
        self._protocol = protocol
        # Store a reference to the bound method directly
        try:
            self._protocol_data_received = protocol.data_received
        except AttributeError:
            pass
    cdef _clear_protocol(self):
        self._protocol = None
        self._protocol_data_received = None
    cdef inline _init_protocol(self):
        self._loop._track_transport(self)
        if self._protocol is None:
            raise RuntimeError('invalid _init_protocol call')
        self._schedule_call_connection_made()
    cdef inline _add_extra_info(self, str name, object obj):
        if self._extra_info is None:
            self._extra_info = {}
        self._extra_info[name] = obj
    cdef bint _is_reading(self):
        raise NotImplementedError
    cdef _start_reading(self):
        raise NotImplementedError
    cdef _stop_reading(self):
        raise NotImplementedError
    # === Public API ===
    property _paused:
        # Used by SSLProto.  Might be removed in the future.
        def __get__(self):
            return bool(not self._is_reading())
    def get_protocol(self):
        return self._protocol
    def set_protocol(self, protocol):
        self._set_protocol(protocol)
        if self._is_reading():
            self._stop_reading()
            self._start_reading()
    def _force_close(self, exc):
        # Used by SSLProto.  Might be removed in the future.
        if self._conn_lost or self._closed:
            return
        if not self._closing:
            self._closing = 1
            self._stop_reading()
        self._conn_lost += 1
        self._schedule_call_connection_lost(exc)
    def abort(self):
        self._force_close(None)
    def close(self):
        if self._closing or self._closed:
            return
        self._closing = 1
        self._stop_reading()
        if not self._get_write_buffer_size():
            # The write buffer is empty
            self._conn_lost += 1
            self._schedule_call_connection_lost(None)
    def is_closing(self):
        return self._closing
    def get_write_buffer_size(self):
        return self._get_write_buffer_size()
    def set_write_buffer_limits(self, high=None, low=None):
        self._ensure_alive()
        self._high_water, self._low_water = add_flowcontrol_defaults(
            high, low, FLOW_CONTROL_HIGH_WATER)
        self._maybe_pause_protocol()
    def get_write_buffer_limits(self):
        return (self._low_water, self._high_water)
    def get_extra_info(self, name, default=None):
        if self._extra_info is not None and name in self._extra_info:
            return self._extra_info[name]
        if name == 'socket':
            return self._get_socket()
        if name == 'sockname':
            return self._get_socket().getsockname()
        if name == 'peername':
            try:
                return self._get_socket().getpeername()
            except socket_error:
                return default
        return default