Source code for mordred._base.calculator

from __future__ import print_function

import sys
from types import ModuleType
from inspect import getsourcelines
from contextlib import contextmanager

from tqdm import tqdm

from .._util import Capture, DummyBar, NotebookWrapper
from ..error import Error, Missing, MultipleFragments
from .context import Context
from .descriptor import Descriptor, MissingValueException


class Calculator(object):
    r"""descriptor calculator.

    Keeps an ordered list of registered descriptors and evaluates them
    against a single molecule (:py:meth:`__call__`) or an iterable of
    molecules (:py:meth:`map`, :py:meth:`pandas`).

    :param descs: see :py:meth:`register` method

    :type ignore_3D: bool
    :param ignore_3D: forwarded to :py:meth:`register`
    """

    __slots__ = (
        '_descriptors', '_explicit_hydrogens', '_kekulizes', '_require_3D',
        '_cache', '_debug', '_progress_bar',
    )

    def __setstate__(self, dict):
        # NOTE: the parameter name shadows the builtin ``dict``; it is kept
        # unchanged so the method signature stays as it was.
        # Only the four attributes emitted by ``__reduce_ex__`` are restored;
        # ``_debug`` is set by ``__init__``, which ``__reduce_ex__`` arranges
        # to be called (with no arguments) before this method.
        self._descriptors = dict.get('_descriptors', [])
        self._explicit_hydrogens = dict.get('_explicit_hydrogens', {True, False})
        self._kekulizes = dict.get('_kekulizes', {True, False})
        self._require_3D = dict.get('_require_3D', False)

    def __reduce_ex__(self, version):
        # (callable, args, state): rebuild through ``Calculator()`` and then
        # hand the state mapping to ``__setstate__``.
        return self.__class__, (), {
            '_descriptors': self._descriptors,
            '_explicit_hydrogens': self._explicit_hydrogens,
            '_kekulizes': self._kekulizes,
            '_require_3D': self._require_3D,
        }

    def __init__(self, descs=(), ignore_3D=False):
        # ``descs`` defaults to an immutable empty tuple rather than a shared
        # mutable list; ``register`` only iterates it.
        self._descriptors = []

        self._explicit_hydrogens = set()
        self._kekulizes = set()
        self._require_3D = False
        self._debug = False

        self.register(descs, ignore_3D=ignore_3D)

    @property
    def descriptors(self):
        r'''all descriptors.

        you can get/set/delete descriptor.

        Reading returns a tuple snapshot of the registered descriptors.
        '''
        return tuple(self._descriptors)

    @descriptors.setter
    def descriptors(self, descs):
        # Replace, not extend: clear everything first, then register anew.
        del self.descriptors
        self.register(descs)

    @descriptors.deleter
    def descriptors(self):
        self._descriptors = []
        self._explicit_hydrogens.clear()
        self._kekulizes.clear()
        self._require_3D = False

    def __len__(self):
        return len(self._descriptors)

    def _register_one(self, desc, check_only=False, ignore_3D=False):
        r"""register a single descriptor instance.

        :param desc: descriptor instance
        :param check_only: when true, only the bookkeeping sets/flags are
            updated and ``desc`` itself is not appended to the list
        :param ignore_3D: when true, a descriptor whose ``require_3D`` is
            truthy is skipped entirely

        :raises ValueError: if ``desc`` is not a :py:class:`Descriptor`
            instance
        """
        if not isinstance(desc, Descriptor):
            raise ValueError('{!r} is not descriptor'.format(desc))

        if ignore_3D and desc.require_3D:
            return

        self._explicit_hydrogens.add(bool(desc.explicit_hydrogens))
        self._kekulizes.add(bool(desc.kekulize))
        self._require_3D |= desc.require_3D

        # Dependencies contribute to the bookkeeping but are never added to
        # the list of descriptors themselves.
        for dep in (desc.dependencies() or {}).values():
            if isinstance(dep, Descriptor):
                self._register_one(dep, check_only=True)

        if not check_only:
            self._descriptors.append(desc)

    def register(self, desc, ignore_3D=False):
        r"""register descriptors.

        :type desc: :py:class:`module`,
            :py:class:`Descriptor` class/instance or
            :py:class:`Iterable`

        :param desc: descriptors to register

            * :py:class:`module`: Descriptors in module
            * :py:class:`Descriptor` class: use :py:meth:`Descriptor.preset`

        :type ignore_3D: bool
        :param ignore_3D: skip descriptors whose ``require_3D`` is truthy

        :raises ValueError: if a leaf value is not a descriptor
        """
        # A ``str`` is iterable, and iterating a one-character string yields
        # that same string again, so it must be handled as a leaf value to
        # avoid unbounded recursion.
        if isinstance(desc, str) or not hasattr(desc, '__iter__'):
            if Descriptor.is_descriptor_class(desc):
                for d in desc.preset():
                    self._register_one(d, ignore_3D=ignore_3D)

            elif isinstance(desc, ModuleType):
                self.register(get_descriptors_from_module(desc), ignore_3D=ignore_3D)

            else:
                self._register_one(desc, ignore_3D=ignore_3D)

        else:
            for d in desc:
                self.register(d, ignore_3D=ignore_3D)

    def _calculate_one(self, cxt, desc, reset):
        r"""calculate one descriptor, resolving its dependencies first.

        Results are memoised in ``self._cache`` keyed by descriptor.

        :param cxt: calculation context
        :param desc: descriptor to evaluate
        :param reset: when true, ``cxt.reset()`` is called before evaluation;
            dependencies are evaluated with ``reset=False``
        """
        if desc in self._cache:
            return self._cache[desc]

        if reset:
            cxt.reset()
        desc._context = cxt
        cxt.add_stack(desc)

        if desc.require_connected and desc._context.n_frags != 1:
            desc.fail(MultipleFragments())

        args = {
            name: self._calculate_one(cxt, dep, False) if dep is not None else None
            for name, dep in (desc.dependencies() or {}).items()
        }

        r = desc.calculate(**args)

        if self._debug:
            self._check_rtype(desc, r)

        self._cache[desc] = r
        return r

    def _check_rtype(self, desc, result):
        r"""raise :py:class:`TypeError` if result does not match ``desc.rtype``.

        Nothing is checked when ``desc.rtype`` is None or when the result is
        an :py:class:`Error` instance.
        """
        if desc.rtype is None:
            return

        if isinstance(result, Error):
            return

        if not isinstance(result, desc.rtype):
            raise TypeError('{} not match {}'.format(result, desc.rtype))

    def _calculate(self, cxt):
        r"""yield one value per registered descriptor.

        Exceptions raised while calculating are converted into
        :py:class:`Missing` / :py:class:`Error` values instead of
        propagating.
        """
        # Fresh cache for every molecule/context.
        self._cache = {}
        for desc in self.descriptors:
            try:
                yield self._calculate_one(cxt, desc, True)
            except MissingValueException as e:
                yield Missing(e.error, desc._context.get_stack())
            except Exception as e:
                yield Error(e, desc._context.get_stack())
            finally:
                # Drop the context reference attached by ``_calculate_one``.
                if hasattr(desc, '_context'):
                    del desc._context

    def __call__(self, mol, id=-1):
        r"""calculate descriptors.

        :type mol: rdkit.Chem.Mol
        :param mol: molecular

        :type id: int
        :param id: conformer id

        :rtype: [scalar or Error]
        :returns: list of values, one per registered descriptor
        """
        return list(self._calculate(Context.from_calculator(self, mol, id)))

    def _serial(self, mols, nmols, quiet, ipynb, id):
        r"""calculate descriptors over mols in the current process.

        Yields one list of values per molecule and advances the progress
        bar after each one.
        """
        with self._progress(quiet, nmols, ipynb) as bar:
            for m in mols:
                with Capture() as capture:
                    r = list(self._calculate(Context.from_calculator(self, m, id)))

                # Re-emit non-blank captured lines through the bar.
                for e in capture.result:
                    e = e.rstrip()
                    if not e:
                        continue

                    bar.write(e, file=capture.orig)

                yield r
                bar.update()

    @contextmanager
    def _progress(self, quiet, total, ipynb):
        r"""context manager yielding the progress bar for a run.

        The bar is stored on ``self._progress_bar`` for the duration of the
        ``with`` block (so :py:meth:`echo` can find it) and removed afterwards.
        """
        args = {
            'dynamic_ncols': True,
            'leave': True,
            'total': total,
        }

        if quiet:
            Bar = DummyBar
        elif ipynb:
            Bar = NotebookWrapper
        else:
            Bar = tqdm

        try:
            with Bar(**args) as self._progress_bar:
                yield self._progress_bar
        finally:
            if hasattr(self, '_progress_bar'):
                del self._progress_bar

    def echo(self, s, file=sys.stdout, end='\n'):
        '''output message.

        The message goes through the active progress bar's ``write`` when a
        bar is present, otherwise it is printed.

        :param s: message to output
        :param file: stream to write to
        :param end: string written after the message
        '''
        p = getattr(self, '_progress_bar', None)
        if p is not None:
            # Honour the caller-supplied terminator.
            p.write(s, file=file, end=end)
            return

        print(s, file=file, end=end)

    def map(self, mols, nproc=None, nmols=None, quiet=False, ipynb=False, id=-1):
        r"""calculate descriptors over mols.

        :type mols: :py:class:`Iterable` (:py:class:`Mol`)
        :param mols: moleculars

        :type nproc: :py:class:`int` or :py:class:`None`
        :param nproc: number of process. None is :py:func:`multiprocessing.cpu_count`

        :type nmols: :py:class:`None` or :py:class:`int`
        :param nmols: number of all mols for display progress bar

        :type quiet: :py:class:`bool`
        :param quiet: suppress progress bar

        :type ipynb: :py:class:`bool`
        :param ipynb: use ipython notebook progress bar

        :type id: :py:class:`int`
        :param id: conformer id

        :rtype: :py:class:`Iterator` [scalar]
        """
        # A sized input takes precedence over an explicitly passed ``nmols``.
        if hasattr(mols, '__len__'):
            nmols = len(mols)

        if nproc == 1:
            return self._serial(mols, nmols=nmols, quiet=quiet, ipynb=ipynb, id=id)
        else:
            # NOTE(review): ``_parallel`` is not defined in this class body;
            # presumably it is attached elsewhere in the package - confirm.
            return self._parallel(mols, nproc, nmols=nmols, quiet=quiet, ipynb=ipynb, id=id)

    def pandas(self, mols, nproc=None, nmols=None, quiet=False, ipynb=False, id=-1):
        r"""calculate descriptors over mols.

        Accepts the same parameters as :py:meth:`map`. Columns are named
        ``str(descriptor)`` for each registered descriptor.

        :rtype: :py:class:`pandas.DataFrame`
        """
        import pandas

        return pandas.DataFrame(
            self.map(mols, nproc, nmols, quiet, ipynb, id),
            columns=[str(d) for d in self.descriptors]
        )
def get_descriptors_from_module(mdl):
    r"""get descriptors from module.

    Public names are taken from ``mdl.__all__`` when it exists, otherwise
    from ``dir(mdl)``; names starting with an underscore are skipped. The
    result is ordered by the source line at which each class is defined,
    with classes whose source cannot be located placed last.

    :type mdl: module
    :param mdl: module to search

    :rtype: [:py:class:`Descriptor`]
    """
    names = getattr(mdl, '__all__', None)
    if names is None:
        names = dir(mdl)

    descs = [
        fn
        for fn in (getattr(mdl, name) for name in names if name[:1] != '_')
        if Descriptor.is_descriptor_class(fn)
    ]

    def key_by_def(d):
        try:
            return getsourcelines(d)[1]
        except (IOError, TypeError):
            # getsourcelines raises OSError/IOError when the source cannot be
            # retrieved and TypeError for built-in objects; sort those last.
            return sys.maxsize

    descs.sort(key=key_by_def)
    return descs