verifier.pyā¢11.2 kB
#
# DEPRECATED: implementation for ffi.verify()
#
import sys, os, binascii, shutil, io
from . import __version_verifier_modules__
from . import ffiplatform
from .error import VerificationError
if sys.version_info >= (3, 3):
    import importlib.machinery
    def _extension_suffixes():
        return importlib.machinery.EXTENSION_SUFFIXES[:]
else:
    import imp
    def _extension_suffixes():
        return [suffix for suffix, _, type in imp.get_suffixes()
                if type == imp.C_EXTENSION]
if sys.version_info >= (3,):
    NativeIO = io.StringIO
else:
    class NativeIO(io.BytesIO):
        def write(self, s):
            if isinstance(s, unicode):
                s = s.encode('ascii')
            super(NativeIO, self).write(s)
class Verifier(object):
    def __init__(self, ffi, preamble, tmpdir=None, modulename=None,
                 ext_package=None, tag='', force_generic_engine=False,
                 source_extension='.c', flags=None, relative_to=None, **kwds):
        if ffi._parser._uses_new_feature:
            raise VerificationError(
                "feature not supported with ffi.verify(), but only "
                "with ffi.set_source(): %s" % (ffi._parser._uses_new_feature,))
        self.ffi = ffi
        self.preamble = preamble
        if not modulename:
            flattened_kwds = ffiplatform.flatten(kwds)
        vengine_class = _locate_engine_class(ffi, force_generic_engine)
        self._vengine = vengine_class(self)
        self._vengine.patch_extension_kwds(kwds)
        self.flags = flags
        self.kwds = self.make_relative_to(kwds, relative_to)
        #
        if modulename:
            if tag:
                raise TypeError("can't specify both 'modulename' and 'tag'")
        else:
            key = '\x00'.join(['%d.%d' % sys.version_info[:2],
                               __version_verifier_modules__,
                               preamble, flattened_kwds] +
                              ffi._cdefsources)
            if sys.version_info >= (3,):
                key = key.encode('utf-8')
            k1 = hex(binascii.crc32(key[0::2]) & 0xffffffff)
            k1 = k1.lstrip('0x').rstrip('L')
            k2 = hex(binascii.crc32(key[1::2]) & 0xffffffff)
            k2 = k2.lstrip('0').rstrip('L')
            modulename = '_cffi_%s_%s%s%s' % (tag, self._vengine._class_key,
                                              k1, k2)
        suffix = _get_so_suffixes()[0]
        self.tmpdir = tmpdir or _caller_dir_pycache()
        self.sourcefilename = os.path.join(self.tmpdir, modulename + source_extension)
        self.modulefilename = os.path.join(self.tmpdir, modulename + suffix)
        self.ext_package = ext_package
        self._has_source = False
        self._has_module = False
    def write_source(self, file=None):
        """Write the C source code.  It is produced in 'self.sourcefilename',
        which can be tweaked beforehand."""
        with self.ffi._lock:
            if self._has_source and file is None:
                raise VerificationError(
                    "source code already written")
            self._write_source(file)
    def compile_module(self):
        """Write the C source code (if not done already) and compile it.
        This produces a dynamic link library in 'self.modulefilename'."""
        with self.ffi._lock:
            if self._has_module:
                raise VerificationError("module already compiled")
            if not self._has_source:
                self._write_source()
            self._compile_module()
    def load_library(self):
        """Get a C module from this Verifier instance.
        Returns an instance of a FFILibrary class that behaves like the
        objects returned by ffi.dlopen(), but that delegates all
        operations to the C module.  If necessary, the C code is written
        and compiled first.
        """
        with self.ffi._lock:
            if not self._has_module:
                self._locate_module()
                if not self._has_module:
                    if not self._has_source:
                        self._write_source()
                    self._compile_module()
            return self._load_library()
    def get_module_name(self):
        basename = os.path.basename(self.modulefilename)
        # kill both the .so extension and the other .'s, as introduced
        # by Python 3: 'basename.cpython-33m.so'
        basename = basename.split('.', 1)[0]
        # and the _d added in Python 2 debug builds --- but try to be
        # conservative and not kill a legitimate _d
        if basename.endswith('_d') and hasattr(sys, 'gettotalrefcount'):
            basename = basename[:-2]
        return basename
    def get_extension(self):
        if not self._has_source:
            with self.ffi._lock:
                if not self._has_source:
                    self._write_source()
        sourcename = ffiplatform.maybe_relative_path(self.sourcefilename)
        modname = self.get_module_name()
        return ffiplatform.get_extension(sourcename, modname, **self.kwds)
    def generates_python_module(self):
        return self._vengine._gen_python_module
    def make_relative_to(self, kwds, relative_to):
        if relative_to and os.path.dirname(relative_to):
            dirname = os.path.dirname(relative_to)
            kwds = kwds.copy()
            for key in ffiplatform.LIST_OF_FILE_NAMES:
                if key in kwds:
                    lst = kwds[key]
                    if not isinstance(lst, (list, tuple)):
                        raise TypeError("keyword '%s' should be a list or tuple"
                                        % (key,))
                    lst = [os.path.join(dirname, fn) for fn in lst]
                    kwds[key] = lst
        return kwds
    # ----------
    def _locate_module(self):
        if not os.path.isfile(self.modulefilename):
            if self.ext_package:
                try:
                    pkg = __import__(self.ext_package, None, None, ['__doc__'])
                except ImportError:
                    return      # cannot import the package itself, give up
                    # (e.g. it might be called differently before installation)
                path = pkg.__path__
            else:
                path = None
            filename = self._vengine.find_module(self.get_module_name(), path,
                                                 _get_so_suffixes())
            if filename is None:
                return
            self.modulefilename = filename
        self._vengine.collect_types()
        self._has_module = True
    def _write_source_to(self, file):
        self._vengine._f = file
        try:
            self._vengine.write_source_to_f()
        finally:
            del self._vengine._f
    def _write_source(self, file=None):
        if file is not None:
            self._write_source_to(file)
        else:
            # Write our source file to an in memory file.
            f = NativeIO()
            self._write_source_to(f)
            source_data = f.getvalue()
            # Determine if this matches the current file
            if os.path.exists(self.sourcefilename):
                with open(self.sourcefilename, "r") as fp:
                    needs_written = not (fp.read() == source_data)
            else:
                needs_written = True
            # Actually write the file out if it doesn't match
            if needs_written:
                _ensure_dir(self.sourcefilename)
                with open(self.sourcefilename, "w") as fp:
                    fp.write(source_data)
            # Set this flag
            self._has_source = True
    def _compile_module(self):
        # compile this C source
        tmpdir = os.path.dirname(self.sourcefilename)
        outputfilename = ffiplatform.compile(tmpdir, self.get_extension())
        try:
            same = ffiplatform.samefile(outputfilename, self.modulefilename)
        except OSError:
            same = False
        if not same:
            _ensure_dir(self.modulefilename)
            shutil.move(outputfilename, self.modulefilename)
        self._has_module = True
    def _load_library(self):
        assert self._has_module
        if self.flags is not None:
            return self._vengine.load_library(self.flags)
        else:
            return self._vengine.load_library()
# ____________________________________________________________
_FORCE_GENERIC_ENGINE = False      # for tests
def _locate_engine_class(ffi, force_generic_engine):
    if _FORCE_GENERIC_ENGINE:
        force_generic_engine = True
    if not force_generic_engine:
        if '__pypy__' in sys.builtin_module_names:
            force_generic_engine = True
        else:
            try:
                import _cffi_backend
            except ImportError:
                _cffi_backend = '?'
            if ffi._backend is not _cffi_backend:
                force_generic_engine = True
    if force_generic_engine:
        from . import vengine_gen
        return vengine_gen.VGenericEngine
    else:
        from . import vengine_cpy
        return vengine_cpy.VCPythonEngine
# ____________________________________________________________
_TMPDIR = None
def _caller_dir_pycache():
    if _TMPDIR:
        return _TMPDIR
    result = os.environ.get('CFFI_TMPDIR')
    if result:
        return result
    filename = sys._getframe(2).f_code.co_filename
    return os.path.abspath(os.path.join(os.path.dirname(filename),
                           '__pycache__'))
def set_tmpdir(dirname):
    """Set the temporary directory to use instead of __pycache__."""
    global _TMPDIR
    _TMPDIR = dirname
def cleanup_tmpdir(tmpdir=None, keep_so=False):
    """Clean up the temporary directory by removing all files in it
    called `_cffi_*.{c,so}` as well as the `build` subdirectory."""
    tmpdir = tmpdir or _caller_dir_pycache()
    try:
        filelist = os.listdir(tmpdir)
    except OSError:
        return
    if keep_so:
        suffix = '.c'   # only remove .c files
    else:
        suffix = _get_so_suffixes()[0].lower()
    for fn in filelist:
        if fn.lower().startswith('_cffi_') and (
                fn.lower().endswith(suffix) or fn.lower().endswith('.c')):
            try:
                os.unlink(os.path.join(tmpdir, fn))
            except OSError:
                pass
    clean_dir = [os.path.join(tmpdir, 'build')]
    for dir in clean_dir:
        try:
            for fn in os.listdir(dir):
                fn = os.path.join(dir, fn)
                if os.path.isdir(fn):
                    clean_dir.append(fn)
                else:
                    os.unlink(fn)
        except OSError:
            pass
def _get_so_suffixes():
    suffixes = _extension_suffixes()
    if not suffixes:
        # bah, no C_EXTENSION available.  Occurs on pypy without cpyext
        if sys.platform == 'win32':
            suffixes = [".pyd"]
        else:
            suffixes = [".so"]
    return suffixes
def _ensure_dir(filename):
    dirname = os.path.dirname(filename)
    if dirname and not os.path.isdir(dirname):
        os.makedirs(dirname)