process.pyx•26.9 kB
@cython.no_gc_clear
cdef class UVProcess(UVHandle):
    """Abstract class; wrapper over uv_process_t handle."""
    def __cinit__(self):
        """Reset every per-process field to its pre-spawn default."""
        # C arrays handed to libuv; populated by _init_args()/_init_env().
        self.uv_opt_args = NULL
        self.uv_opt_env = NULL
        # Known only once the child has been spawned / has exited.
        self._pid = None
        self._returncode = None
        # Descriptors registered through _close_after_spawn().
        self._fds_to_close = []
        # Options applied by _after_fork().
        self._restore_signals = True
        self._preexec_fn = None
        self.context = Context_CopyCurrent()
    cdef _close_process_handle(self):
        """Detach and close the raw libuv handle, if one is still attached.

        The memory itself is released later by
        ``__uv_close_process_handle_cb``, which is passed to ``uv_close()``.
        """
        # XXX: This is a workaround for a libuv bug:
        # - https://github.com/libuv/libuv/issues/1933
        # - https://github.com/libuv/libuv/pull/551
        if self._handle is not NULL:
            # Clear the back-pointer before closing the handle.
            self._handle.data = NULL
            uv.uv_close(self._handle, __uv_close_process_handle_cb)
            # close callback will free() the memory
            self._handle = NULL
    cdef _init(self, Loop loop, list args, dict env,
               cwd, start_new_session,
               _stdin, _stdout, _stderr,  # std* can be defined as macros in C
               pass_fds, debug_flags, preexec_fn, restore_signals):
        # Allocate the uv_process_t handle, build the spawn options and
        # call uv_spawn() on `loop`.
        #
        # Raises MemoryError if the handle cannot be allocated,
        # RuntimeError if another spawn is already in progress, the
        # exception produced by convert_error() when uv_spawn() fails, and
        # subprocess.SubprocessError when the child reported a failing
        # preexec_fn through the error pipe.
        global __forking
        global __forking_loop
        global __forkHandler
        cdef int err
        self._start_init(loop)
        self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(
            sizeof(uv.uv_process_t))
        if self._handle is NULL:
            self._abort_init()
            raise MemoryError()
        # Too early to call _finish_init, but still a lot of work to do.
        # Let's set handle.data to NULL, so in case something goes wrong,
        # callbacks have a chance to avoid casting *something* into UVHandle.
        self._handle.data = NULL
        # On Apple platforms, a preexec_fn or a non-empty pass_fds makes
        # _init_options() request the fork()-based spawn path.
        force_fork = False
        if system.PLATFORM_IS_APPLE and not (
            preexec_fn is None
            and not pass_fds
        ):
            # see _execute_child() in CPython/subprocess.py
            force_fork = True
        # NOTE(review): if this block (or the race check below) raises,
        # descriptors already registered in self._fds_to_close are not
        # closed here and inheritable flags already flipped are not
        # restored -- unless _abort_init() (not visible here) handles it.
        try:
            self._init_options(args, env, cwd, start_new_session,
                               _stdin, _stdout, _stderr, force_fork)
            # fds from pass_fds that were not inheritable are made
            # inheritable for the spawn and remembered, so the `finally`
            # block further down can flip them back.
            restore_inheritable = set()
            if pass_fds:
                for fd in pass_fds:
                    if not os_get_inheritable(fd):
                        restore_inheritable.add(fd)
                        os_set_inheritable(fd, True)
        except Exception:
            self._abort_init()
            raise
        if __forking or loop.active_process_handler is not None:
            # Our pthread_atfork handlers won't work correctly when
            # another loop is forking in another thread (even though
            # GIL should help us to avoid that.)
            self._abort_init()
            raise RuntimeError(
                'Racing with another loop to spawn a process.')
        # Error pipe: _after_fork() writes an exception report to the write
        # end when preexec_fn fails; the read end is drained below.
        self._errpipe_read, self._errpipe_write = os_pipe()
        # From here on _close_after_spawn() raises RuntimeError, because
        # self._fds_to_close is None.
        fds_to_close = self._fds_to_close
        self._fds_to_close = None
        fds_to_close.append(self._errpipe_read)
        # add the write pipe last so we can close it early
        fds_to_close.append(self._errpipe_write)
        try:
            os_set_inheritable(self._errpipe_write, True)
            self._preexec_fn = preexec_fn
            self._restore_signals = restore_signals
            # Publish the "spawn in progress" state (also checked by the
            # race guard above) for the duration of uv_spawn().
            loop.active_process_handler = self
            __forking = 1
            __forking_loop = loop
            system.setForkHandler(<system.OnForkHandler>&__get_fork_handler)
            PyOS_BeforeFork()
            err = uv.uv_spawn(loop.uvloop,
                              <uv.uv_process_t*>self._handle,
                              &self.options)
            # Undo the "spawn in progress" state before looking at `err`.
            __forking = 0
            __forking_loop = None
            system.resetForkHandler()
            loop.active_process_handler = None
            PyOS_AfterFork_Parent()
            if err < 0:
                self._close_process_handle()
                self._abort_init()
                raise convert_error(err)
            self._finish_init()
            # close the write pipe early
            os_close(fds_to_close.pop())
            if preexec_fn is not None:
                # Read until EOF or until more than 50000 bytes have been
                # collected.
                errpipe_data = bytearray()
                while True:
                    # XXX: This is a blocking code that has to be
                    # rewritten (using loop.connect_read_pipe() or
                    # otherwise.)
                    part = os_read(self._errpipe_read, 50000)
                    errpipe_data += part
                    if not part or len(errpipe_data) > 50000:
                        break
        finally:
            # Close everything still registered (error pipe ends and fds
            # added through _close_after_spawn()) and restore the
            # inheritable flags changed above.
            while fds_to_close:
                os_close(fds_to_close.pop())
            for fd in restore_inheritable:
                os_set_inheritable(fd, False)
        # asyncio caches the PID in BaseSubprocessTransport,
        # so that the transport knows what the PID was even
        # after the process is finished.
        self._pid = (<uv.uv_process_t*>self._handle).pid
        # Track the process handle (create a strong ref to it)
        # to guarantee that __dealloc__ doesn't happen in an
        # uncontrolled fashion.  We want to wait until the process
        # exits and libuv calls __uvprocess_on_exit_callback,
        # which will call `UVProcess._close()`, which will, in turn,
        # untrack this handle.
        self._loop._track_process(self)
        if debug_flags & __PROCESS_DEBUG_SLEEP_AFTER_FORK:
            time_sleep(1)
        if preexec_fn is not None and errpipe_data:
            # preexec_fn has raised an exception.  The child
            # process must be dead now.
            # The report has the form b'<ExceptionName>:<message>', as
            # written by _after_fork().
            try:
                exc_name, exc_msg = errpipe_data.split(b':', 1)
                exc_name = exc_name.decode()
                exc_msg = exc_msg.decode()
            except Exception:
                self._close()
                raise subprocess_SubprocessError(
                    'Bad exception data from child: {!r}'.format(
                        errpipe_data))
            # Names that do not resolve on __builtins__ fall back to
            # SubprocessError.
            exc_cls = getattr(__builtins__, exc_name,
                              subprocess_SubprocessError)
            exc = subprocess_SubprocessError(
                'Exception occurred in preexec_fn.')
            exc.__cause__ = exc_cls(exc_msg)
            self._close()
            raise exc
    cdef _after_fork(self):
        # See CPython/_posixsubprocess.c for details
        # Post-fork setup: optionally restore signals, run the Python
        # after-fork hook, re-initialise the libuv loop via uv_loop_fork(),
        # then run the user's preexec_fn (if any).
        # NOTE(review): presumably runs in the forked child through the fork
        # handler installed in _init() -- the call site is not visible here.
        cdef int err
        if self._restore_signals:
            _Py_RestoreSignals()
        PyOS_AfterFork_Child()
        err = uv.uv_loop_fork(self._loop.uvloop)
        if err < 0:
            raise convert_error(err)
        if self._preexec_fn is not None:
            try:
                gc_disable()
                self._preexec_fn()
            except BaseException as ex:
                # Report the failure on the error pipe as
                # b'<ExceptionName>:<str(args[0])>'; _init() splits the
                # data it reads on the first b':'.
                try:
                    with open(self._errpipe_write, 'wb') as f:
                        f.write(str(ex.__class__.__name__).encode())
                        f.write(b':')
                        f.write(str(ex.args[0]).encode())
                finally:
                    # Exit with status 255 whether or not the report
                    # could be written.
                    system._exit(255)
                    # NOTE(review): presumably unreachable if _exit()
                    # terminates the process.
                    return
            else:
                os_close(self._errpipe_write)
        else:
            os_close(self._errpipe_write)
    cdef _close_after_spawn(self, int fd):
        """Register ``fd`` to be closed once the spawn attempt is over.

        Raises RuntimeError when the registry has already been detached
        (``_init`` sets ``_fds_to_close`` to ``None`` before spawning).
        """
        pending = self._fds_to_close
        if pending is None:
            raise RuntimeError(
                'UVProcess._close_after_spawn called after uv_spawn')
        pending.append(fd)
    def __dealloc__(self):
        # Release the char* arrays built by __to_cstring_array().  Only the
        # pointer arrays are freed: the strings they point to are not
        # copies (PyBytes_AsString does not copy the data).
        if self.uv_opt_env is not NULL:
            PyMem_RawFree(self.uv_opt_env)
            self.uv_opt_env = NULL
        if self.uv_opt_args is not NULL:
            PyMem_RawFree(self.uv_opt_args)
            self.uv_opt_args = NULL
    cdef char** __to_cstring_array(self, list arr):
        """Build a NULL-terminated ``char*`` array over the bytes in ``arr``.

        The returned array is allocated with ``PyMem_RawMalloc`` and must be
        released with ``PyMem_RawFree`` by the caller.  The individual
        strings are NOT copied: each slot borrows the buffer of the
        corresponding ``bytes`` object, so ``arr`` has to outlive the array
        and must not be mutated.

        Raises MemoryError if the allocation fails.  If an element cannot
        be used as ``bytes`` the array is freed before the error propagates.
        """
        cdef:
            ssize_t i
            ssize_t arr_len = len(arr)
            bytes el
            char **ret
        ret = <char **>PyMem_RawMalloc((arr_len + 1) * sizeof(char *))
        if ret is NULL:
            raise MemoryError()
        try:
            for i in range(arr_len):
                # The typed assignment rejects non-bytes elements.
                el = arr[i]
                # NB: PyBytes_AsString doesn't copy the data;
                # we have to be careful when the "arr" is GCed,
                # and it shouldn't be ever mutated.
                ret[i] = PyBytes_AsString(el)
        except BaseException:
            # Don't leak the partially filled array on a bad element.
            PyMem_RawFree(ret)
            raise
        ret[arr_len] = NULL
        return ret
    cdef _init_options(self, list args, dict env, cwd, start_new_session,
                       _stdin, _stdout, _stderr, bint force_fork):
        # Fill self.options (uv_process_options_t) from the Python-level
        # arguments: environment, argv/file, flags, cwd, exit callback and
        # stdio containers.  Raises ValueError for an unusable cwd; errors
        # from _init_env()/_init_args()/_init_files() propagate.
        memset(&self.options, 0, sizeof(uv.uv_process_options_t))
        # self.uv_opt_env stays NULL when env is None (see _init_env()).
        self._init_env(env)
        self.options.env = self.uv_opt_env
        self._init_args(args)
        self.options.file = self.uv_opt_file
        self.options.args = self.uv_opt_args
        if start_new_session:
            self.options.flags |= uv.UV_PROCESS_DETACHED
        if force_fork:
            # This is a hack to work around the change in libuv 1.44:
            #    > macos: use posix_spawn instead of fork
            # where Python subprocess options like preexec_fn are
            # crippled. CPython only uses posix_spawn under a pretty
            # strict list of conditions (see subprocess.py), and falls
            # back to using fork() otherwise. We'd like to simulate such
            # behavior with libuv, but unfortunately libuv doesn't
            # provide explicit API to choose such implementation detail.
            # Based on current (libuv 1.46) behavior, setting
            # UV_PROCESS_SETUID or UV_PROCESS_SETGID would reliably make
            # libuv fallback to use fork, so let's just use it for now.
            self.options.flags |= uv.UV_PROCESS_SETUID
            self.options.uid = uv.getuid()
        if cwd is not None:
            cwd = os_fspath(cwd)
            if isinstance(cwd, str):
                cwd = PyUnicode_EncodeFSDefault(cwd)
            if not isinstance(cwd, bytes):
                raise ValueError('cwd must be a str or bytes object')
            # Keep the bytes object on self: options.cwd points into its
            # buffer.
            self.__cwd = cwd
            self.options.cwd = PyBytes_AsString(self.__cwd)
        self.options.exit_cb = &__uvprocess_on_exit_callback
        # Subclasses override _init_files() to set up the stdio containers.
        self._init_files(_stdin, _stdout, _stderr)
    cdef _init_args(self, list args):
        """Convert ``args`` to bytes and build the C argv for libuv.

        Every element is passed through ``os.fspath()``; ``str`` results are
        encoded with the filesystem encoding.  The converted list is kept
        in ``self.__args`` because ``uv_opt_file`` and ``uv_opt_args``
        borrow the buffers of those bytes objects.

        Raises ValueError when ``args`` is empty and TypeError when an
        element is neither str, bytes nor path-like.
        """
        cdef:
            bytes path
            int an = len(args)
        if an < 1:
            raise ValueError('cannot spawn a process: args are empty')
        self.__args = args.copy()
        for i in range(an):
            arg = os_fspath(args[i])
            if isinstance(arg, str):
                arg = PyUnicode_EncodeFSDefault(arg)
            elif not isinstance(arg, bytes):
                raise TypeError('all args must be str or bytes')
            # Always store the converted value: a path-like object whose
            # __fspath__() returns bytes must not stay in the list, since
            # everything below requires real bytes objects.
            self.__args[i] = arg
        path = self.__args[0]
        self.uv_opt_file = PyBytes_AsString(path)
        self.uv_opt_args = self.__to_cstring_array(self.__args)
    cdef _init_env(self, dict env):
        """Build the ``KEY=VALUE`` environment block for the child.

        With ``env is None`` nothing is built and ``uv_opt_env`` is left
        untouched.  Otherwise str keys/values are encoded with the
        filesystem encoding and the resulting bytes entries are kept in
        ``self.__env``, which backs the C array stored in ``uv_opt_env``.

        Raises TypeError for keys or values that are neither str nor bytes.
        """
        if env is None:
            self.__env = None
            return
        self.__env = list()
        for name in env:
            value = env[name]
            if isinstance(name, str):
                name = PyUnicode_EncodeFSDefault(name)
            elif not isinstance(name, bytes):
                raise TypeError(
                    'all environment vars must be bytes or str')
            if isinstance(value, str):
                value = PyUnicode_EncodeFSDefault(value)
            elif not isinstance(value, bytes):
                raise TypeError(
                    'all environment values must be bytes or str')
            self.__env.append(name + b'=' + value)
        self.uv_opt_env = self.__to_cstring_array(self.__env)
    cdef _init_files(self, _stdin, _stdout, _stderr):
        # Base implementation: configure no stdio containers at all.
        # UVProcessTransport overrides this to wire up stdin/stdout/stderr.
        self.options.stdio_count = 0
    cdef _kill(self, int signum):
        # Send signal `signum` to the process through uv_process_kill().
        # _ensure_alive() (defined elsewhere) runs first; a negative libuv
        # status is raised as the exception built by convert_error().
        cdef int err
        self._ensure_alive()
        err = uv.uv_process_kill(<uv.uv_process_t*>self._handle, signum)
        if err < 0:
            raise convert_error(err)
    cdef _on_exit(self, int64_t exit_status, int term_signal):
        """Record the return code of the finished process and close.

        A non-zero ``term_signal`` takes precedence over ``exit_status``.
        """
        if not term_signal:
            self._returncode = exit_status
        else:
            # From Python docs:
            #    A negative value -N indicates that the child was
            #    terminated by signal N (POSIX only).
            self._returncode = -term_signal
        self._close()
    cdef _close(self):
        # Untrack this process from its loop (when a loop is attached) and
        # then always run UVHandle._close(), even if untracking raises.
        try:
            if self._loop is not None:
                self._loop._untrack_process(self)
        finally:
            UVHandle._close(self)
# Tags stored as the first element of the tuples queued in
# UVProcessTransport._pending_calls; _call_connection_made() dispatches on
# them when replaying events that were recorded before stdio was ready.
DEF _CALL_PIPE_DATA_RECEIVED = 0
DEF _CALL_PIPE_CONNECTION_LOST = 1
DEF _CALL_PROCESS_EXITED = 2
DEF _CALL_CONNECTION_LOST = 3
@cython.no_gc_clear
cdef class UVProcessTransport(UVProcess):
    def __cinit__(self):
        """Initialise transport state: no protocol, no pipes, nothing queued."""
        self._protocol = None
        # Pipe transports and their protocols, one per standard stream.
        self._stdin = None
        self._stdout = None
        self._stderr = None
        self.stdin_proto = None
        self.stdout_proto = None
        self.stderr_proto = None
        # Futures / queued events used while stdio is still being set up.
        self._init_futs = []
        self._pending_calls = []
        self._exit_waiters = []
        self._stdio_ready = 0
        self._finished = 0
    cdef _on_exit(self, int64_t exit_status, int term_signal):
        """Handle process exit: notify the protocol and wake all waiters.

        ``process_exited`` is scheduled right away when stdio is ready and
        queued in ``_pending_calls`` otherwise.
        """
        UVProcess._on_exit(self, exit_status, term_signal)
        if not self._stdio_ready:
            self._pending_calls.append((_CALL_PROCESS_EXITED, None, None))
        else:
            self._loop.call_soon(self._protocol.process_exited,
                                 context=self.context)
        self._try_finish()
        # Resolve every future handed out by _wait() that is still pending.
        for fut in self._exit_waiters:
            if fut.cancelled():
                continue
            fut.set_result(self._returncode)
        self._exit_waiters.clear()
        self._close()
    cdef _check_proc(self):
        """Raise ProcessLookupError unless the process can still be signalled.

        That requires a live handle and no recorded return code.
        """
        if self._is_alive() and self._returncode is None:
            return
        raise ProcessLookupError()
    cdef _pipe_connection_lost(self, int fd, exc):
        """Forward a pipe disconnect to the protocol, or queue it.

        The event is queued in ``_pending_calls`` while stdio is not ready.
        """
        if not self._stdio_ready:
            self._pending_calls.append((_CALL_PIPE_CONNECTION_LOST, fd, exc))
            return
        self._loop.call_soon(self._protocol.pipe_connection_lost, fd, exc,
                             context=self.context)
        # Losing a pipe may have been the last thing holding the transport.
        self._try_finish()
    cdef _pipe_data_received(self, int fd, data):
        """Forward data read from pipe ``fd`` to the protocol, or queue it.

        The event is queued in ``_pending_calls`` while stdio is not ready.
        """
        if not self._stdio_ready:
            self._pending_calls.append((_CALL_PIPE_DATA_RECEIVED, fd, data))
            return
        self._loop.call_soon(self._protocol.pipe_data_received, fd, data,
                             context=self.context)
    cdef _file_redirect_stdio(self, int fd):
        """Return an inheritable duplicate of ``fd`` for the child.

        The duplicate is registered to be closed after the spawn.
        """
        dup_fd = os_dup(fd)
        os_set_inheritable(dup_fd, True)
        self._close_after_spawn(dup_fd)
        return dup_fd
    cdef _file_devnull(self):
        """Open the null device read/write and return its inheritable fd.

        The fd is registered to be closed after the spawn.
        """
        null_fd = os_open(os_devnull, os_O_RDWR)
        os_set_inheritable(null_fd, True)
        self._close_after_spawn(null_fd)
        return null_fd
    cdef _file_outpipe(self):
        """Create a pair for a child output stream; return ``(r, w)``.

        The second fd is made inheritable (the child's end) and is
        registered to be closed after the spawn.
        """
        parent_end, child_end = __socketpair()
        os_set_inheritable(child_end, True)
        self._close_after_spawn(child_end)
        return parent_end, child_end
    cdef _file_inpipe(self):
        """Create a pair for the child's input stream; return ``(r, w)``.

        The first fd is made inheritable (the child's end) and is
        registered to be closed after the spawn.
        """
        child_end, parent_end = __socketpair()
        os_set_inheritable(child_end, True)
        self._close_after_spawn(child_end)
        return child_end, parent_end
    cdef _init_files(self, _stdin, _stdout, _stderr):
        """Prepare the three stdio containers passed to ``uv_spawn()``.

        Each of ``_stdin``/``_stdout``/``_stderr`` may be ``None`` (use a
        duplicate of fd 0/1/2), ``subprocess.PIPE`` (create a pipe plus a
        unix transport for the parent's end), ``subprocess.DEVNULL``, or a
        file descriptor to duplicate.  ``_stderr`` may additionally be
        ``subprocess.STDOUT`` to share the child's stdout descriptor.

        Raises ValueError when ``subprocess.STDOUT`` is passed for stdin or
        stdout.
        """
        cdef uv.uv_stdio_container_t *iocnt
        UVProcess._init_files(self, _stdin, _stdout, _stderr)
        # File descriptor the child gets for stdin/stdout/stderr.
        io = [None, None, None]
        self.options.stdio_count = 3
        self.options.stdio = self.iocnt
        if _stdin is not None:
            if _stdin == subprocess_PIPE:
                r, w = self._file_inpipe()
                io[0] = r
                self.stdin_proto = WriteSubprocessPipeProto(self, 0)
                waiter = self._loop._new_future()
                self._stdin = WriteUnixTransport.new(
                    self._loop, self.stdin_proto, None, waiter)
                self._init_futs.append(waiter)
                self._stdin._open(w)
                self._stdin._init_protocol()
            elif _stdin == subprocess_DEVNULL:
                io[0] = self._file_devnull()
            elif _stdin == subprocess_STDOUT:
                # This used to test `_stdout`, so stdin=STDOUT slipped
                # through and was handed to os.dup() as a descriptor.
                raise ValueError(
                    'subprocess.STDOUT is supported only by stderr parameter')
            else:
                io[0] = self._file_redirect_stdio(_stdin)
        else:
            io[0] = self._file_redirect_stdio(0)
        if _stdout is not None:
            if _stdout == subprocess_PIPE:
                # We can't use UV_CREATE_PIPE here, since 'stderr' might be
                # set to 'subprocess.STDOUT', and there is no way to
                # emulate that functionality with libuv high-level
                # streams API. Therefore, we create pipes for stdout and
                # stderr manually.
                r, w = self._file_outpipe()
                io[1] = w
                self.stdout_proto = ReadSubprocessPipeProto(self, 1)
                waiter = self._loop._new_future()
                self._stdout = ReadUnixTransport.new(
                    self._loop, self.stdout_proto, None, waiter)
                self._init_futs.append(waiter)
                self._stdout._open(r)
                self._stdout._init_protocol()
            elif _stdout == subprocess_DEVNULL:
                io[1] = self._file_devnull()
            elif _stdout == subprocess_STDOUT:
                raise ValueError(
                    'subprocess.STDOUT is supported only by stderr parameter')
            else:
                io[1] = self._file_redirect_stdio(_stdout)
        else:
            io[1] = self._file_redirect_stdio(1)
        if _stderr is not None:
            if _stderr == subprocess_PIPE:
                r, w = self._file_outpipe()
                io[2] = w
                self.stderr_proto = ReadSubprocessPipeProto(self, 2)
                waiter = self._loop._new_future()
                self._stderr = ReadUnixTransport.new(
                    self._loop, self.stderr_proto, None, waiter)
                self._init_futs.append(waiter)
                self._stderr._open(r)
                self._stderr._init_protocol()
            elif _stderr == subprocess_STDOUT:
                if io[1] is None:
                    # shouldn't ever happen
                    raise RuntimeError('cannot apply subprocess.STDOUT')
                # Share whatever the child uses as stdout.
                io[2] = self._file_redirect_stdio(io[1])
            elif _stderr == subprocess_DEVNULL:
                io[2] = self._file_devnull()
            else:
                io[2] = self._file_redirect_stdio(_stderr)
        else:
            io[2] = self._file_redirect_stdio(2)
        # Translate the chosen descriptors into libuv stdio containers.
        for idx in range(3):
            iocnt = &self.iocnt[idx]
            if io[idx] is not None:
                iocnt.flags = uv.UV_INHERIT_FD
                iocnt.data.fd = io[idx]
            else:
                iocnt.flags = uv.UV_IGNORE
    cdef _call_connection_made(self, waiter):
        """Call ``protocol.connection_made`` and replay queued events.

        The outcome is reported through ``waiter`` when one is given and
        not cancelled; otherwise a failure propagates.  KeyboardInterrupt
        and SystemExit always propagate.
        """
        try:
            # we're always called in the right context, so just call the user's
            self._protocol.connection_made(self)
        except (KeyboardInterrupt, SystemExit):
            raise
        except BaseException as ex:
            if waiter is None or waiter.cancelled():
                raise
            waiter.set_exception(ex)
        else:
            if waiter is not None and not waiter.cancelled():
                waiter.set_result(True)
        self._stdio_ready = 1
        if not self._pending_calls:
            return
        # Take a snapshot and empty the queue before replaying.
        queued = self._pending_calls.copy()
        self._pending_calls.clear()
        for (kind, fd, arg) in queued:
            if kind == _CALL_PIPE_CONNECTION_LOST:
                self._pipe_connection_lost(fd, arg)
            elif kind == _CALL_PIPE_DATA_RECEIVED:
                self._pipe_data_received(fd, arg)
            elif kind == _CALL_PROCESS_EXITED:
                self._loop.call_soon(self._protocol.process_exited)
            elif kind == _CALL_CONNECTION_LOST:
                self._loop.call_soon(self._protocol.connection_lost, None)
    cdef _try_finish(self):
        """Emit ``connection_lost`` once the process and all pipes are done.

        Does nothing while the return code is unknown, after it has already
        fired, or while any existing pipe protocol is still connected.
        """
        if self._returncode is None or self._finished:
            return
        for proto in (self.stdin_proto, self.stdout_proto,
                      self.stderr_proto):
            if proto is not None and not proto.disconnected:
                return
        self._finished = 1
        if not self._stdio_ready:
            self._pending_calls.append((_CALL_CONNECTION_LOST, None, None))
            return
        # copy self.context for simplicity
        self._loop.call_soon(self._protocol.connection_lost, None,
                             context=self.context)
    def __stdio_inited(self, waiter, stdio_fut):
        """Done-callback for the gathered stdio initialisation futures.

        On success ``_call_connection_made`` is scheduled; on failure the
        exception is set on ``waiter``, or raised when there is no waiter.
        """
        exc = stdio_fut.exception()
        if exc is None:
            self._loop._call_soon_handle(
                new_MethodHandle1(self._loop,
                                  "UVProcessTransport._call_connection_made",
                                  <method1_t>self._call_connection_made,
                                  None,  # means to copy the current context
                                  self, waiter))
            return
        if waiter is None:
            raise exc
        waiter.set_exception(exc)
    @staticmethod
    cdef UVProcessTransport new(Loop loop, protocol, args, env,
                                cwd, start_new_session,
                                _stdin, _stdout, _stderr, pass_fds,
                                waiter,
                                debug_flags,
                                preexec_fn,
                                restore_signals):
        # Factory: create the transport, spawn the process through _init()
        # and arrange for protocol.connection_made() to be called.
        # `waiter`, when given, receives the outcome of that call (see
        # _call_connection_made() / __stdio_inited()).
        cdef UVProcessTransport handle
        handle = UVProcessTransport.__new__(UVProcessTransport)
        handle._protocol = protocol
        # File-like stdio arguments are reduced to plain descriptors first.
        handle._init(loop, args, env, cwd, start_new_session,
                     __process_convert_fileno(_stdin),
                     __process_convert_fileno(_stdout),
                     __process_convert_fileno(_stderr),
                     pass_fds,
                     debug_flags,
                     preexec_fn,
                     restore_signals)
        if handle._init_futs:
            # At least one pipe transport was created: connection_made()
            # is deferred until all of their init futures are done.
            handle._stdio_ready = 0
            init_fut = aio_gather(*handle._init_futs)
            # add_done_callback will copy the current context and run the
            # callback within the context
            init_fut.add_done_callback(
                ft_partial(handle.__stdio_inited, waiter))
        else:
            # No pipes to wait for: schedule connection_made() right away.
            handle._stdio_ready = 1
            loop._call_soon_handle(
                new_MethodHandle1(loop,
                                  "UVProcessTransport._call_connection_made",
                                  <method1_t>handle._call_connection_made,
                                  None,  # means to copy the current context
                                  handle, waiter))
        return handle
    def get_protocol(self):
        # Return the protocol object currently attached to this transport.
        return self._protocol
    def set_protocol(self, protocol):
        # Replace the protocol object that receives this transport's events.
        self._protocol = protocol
    def get_pid(self):
        # Return the PID cached by _init() after the spawn (None before).
        return self._pid
    def get_returncode(self):
        # Return the code recorded by _on_exit(), or None if none is
        # recorded yet; negative values mean "terminated by that signal".
        return self._returncode
    def get_pipe_transport(self, fd):
        """Return the pipe transport for child fd 0, 1 or 2.

        Returns ``None`` for any other ``fd`` or when that stream has no
        pipe transport.
        """
        if fd == 0:
            return self._stdin
        if fd == 1:
            return self._stdout
        if fd == 2:
            return self._stderr
        return None
    def terminate(self):
        # Send SIGTERM to the child.  _check_proc() raises
        # ProcessLookupError when the handle is not alive or a return code
        # has already been recorded.
        self._check_proc()
        self._kill(uv.SIGTERM)
    def kill(self):
        # Send SIGKILL to the child.  _check_proc() raises
        # ProcessLookupError when the handle is not alive or a return code
        # has already been recorded.
        self._check_proc()
        self._kill(uv.SIGKILL)
    def send_signal(self, int signal):
        # Send an arbitrary signal number to the child.  _check_proc()
        # raises ProcessLookupError when the handle is not alive or a
        # return code has already been recorded.
        self._check_proc()
        self._kill(signal)
    def is_closing(self):
        # Report the handle's `_closed` flag.
        return self._closed
    def close(self):
        """Kill the process if it is still running and close all pipes.

        The libuv handle itself is closed here only when a return code is
        already known.
        """
        if self._returncode is None:
            self._kill(uv.SIGKILL)
        for pipe in (self._stdin, self._stdout, self._stderr):
            if pipe is not None:
                pipe.close()
        if self._returncode is None:
            # The process should have been killed already and we're just
            # waiting for a SIGCHLD; after which the transport will be
            # GC'ed and the uvhandle will be closed in
            # UVHandle.__dealloc__.
            return
        # The process is dead, just close the UV handle.
        self._close()
    def get_extra_info(self, name, default=None):
        # No extra information is exposed: `default` is returned for any
        # `name`.
        return default
    def _wait(self):
        """Return a future that resolves to the process return code.

        The future is already resolved when the return code is known;
        otherwise it is parked in ``_exit_waiters`` until ``_on_exit``.
        """
        fut = self._loop._new_future()
        if self._returncode is None:
            self._exit_waiters.append(fut)
        else:
            fut.set_result(self._returncode)
        return fut
class WriteSubprocessPipeProto(aio_BaseProtocol):
    """Protocol attached to one of the subprocess pipes.

    Relays pipe events to the owning UVProcessTransport and tracks whether
    the pipe has been disconnected.
    """
    def __init__(self, proc, fd):
        if UVLOOP_DEBUG:
            if (type(proc) is not UVProcessTransport
                    or not isinstance(fd, int)):
                raise TypeError
        self.fd = fd
        self.proc = proc
        self.pipe = None
        self.disconnected = False
    def __repr__(self):
        cls_name = self.__class__.__name__
        return '<%s fd=%s pipe=%r>' % (cls_name, self.fd, self.pipe)
    def connection_made(self, transport):
        self.pipe = transport
    def connection_lost(self, exc):
        # Mark the pipe as gone before telling the process transport,
        # which checks the `disconnected` flag in _try_finish().
        self.disconnected = True
        (<UVProcessTransport>self.proc)._pipe_connection_lost(self.fd, exc)
        # Drop the reference to the process transport.
        self.proc = None
    def pause_writing(self):
        protocol = (<UVProcessTransport>self.proc)._protocol
        protocol.pause_writing()
    def resume_writing(self):
        protocol = (<UVProcessTransport>self.proc)._protocol
        protocol.resume_writing()
# Read-side variant of WriteSubprocessPipeProto: additionally forwards
# received data to the owning UVProcessTransport.
class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
                              aio_Protocol):
    def data_received(self, data):
        # Relay the chunk together with the child fd this pipe serves.
        (<UVProcessTransport>self.proc)._pipe_data_received(self.fd, data)
cdef __process_convert_fileno(object obj):
    """Reduce a stdio argument to ``None`` or an integer.

    ``None`` and ints are returned unchanged; anything else must provide
    ``fileno()``.  Raises TypeError when ``fileno()`` returns a non-int.
    """
    if obj is not None and not isinstance(obj, int):
        fd = obj.fileno()
        if isinstance(fd, int):
            return fd
        raise TypeError(
            '{!r}.fileno() returned non-integer'.format(obj))
    return obj
cdef void __uvprocess_on_exit_callback(
    uv.uv_process_t *handle,
    int64_t exit_status,
    int term_signal,
) noexcept with gil:
    # Exit callback registered in uv_process_options_t.exit_cb: dispatches
    # to UVProcess._on_exit() and routes any exception to proc._error().
    cdef UVProcess proc
    if __ensure_handle_data(<uv.uv_handle_t*>handle,
                            "UVProcess exit callback") == 0:
        # handle.data cannot be used; there is nothing to dispatch to.
        return
    proc = <UVProcess> handle.data
    try:
        proc._on_exit(exit_status, term_signal)
    except BaseException as ex:
        proc._error(ex, False)
cdef __socketpair():
    """Create an AF_UNIX/SOCK_STREAM pair and return both fds as a tuple.

    Both descriptors are returned non-inheritable.  Raises the exception
    built by ``convert_error()`` when ``socketpair()`` reports a failure.
    """
    cdef:
        int err
        int fds[2]
    err = system.socketpair(uv.AF_UNIX, uv.SOCK_STREAM, 0, fds)
    if err:
        # NOTE(review): the negated return value is passed as the error
        # code here -- confirm this is the code convert_error() expects.
        raise convert_error(-err)
    os_set_inheritable(fds[0], False)
    os_set_inheritable(fds[1], False)
    return fds[0], fds[1]
# Close callback passed to uv_close() by UVProcess._close_process_handle():
# releases the handle memory allocated in UVProcess._init().
cdef void __uv_close_process_handle_cb(
    uv.uv_handle_t* handle
) noexcept with gil:
    PyMem_RawFree(handle)