from __future__ import print_function
import sys
from types import ModuleType
from inspect import getsourcelines
from contextlib import contextmanager
from tqdm import tqdm
from .._util import Capture, DummyBar, NotebookWrapper
from ..error import Error, Missing, MultipleFragments
from .context import Context
from .descriptor import Descriptor, MissingValueException
class Calculator(object):
    r"""descriptor calculator.

    :param descs: see :py:meth:`register` method

    :type ignore_3D: bool
    :param ignore_3D: skip descriptors whose ``require_3D`` is true
        (forwarded to :py:meth:`register`)
    """

    __slots__ = (
        '_descriptors', '_explicit_hydrogens', '_kekulizes', '_require_3D',
        '_cache', '_debug', '_progress_bar',
    )

    def __setstate__(self, dict):
        r"""restore registration state produced by :py:meth:`__reduce_ex__`.

        Missing keys fall back to permissive defaults.
        """
        # NOTE: the parameter shadows the builtin ``dict``; the name is kept
        # so that the method signature stays unchanged.
        self._descriptors = dict.get('_descriptors', [])
        self._explicit_hydrogens = dict.get('_explicit_hydrogens', {True, False})
        self._kekulizes = dict.get('_kekulizes', {True, False})
        self._require_3D = dict.get('_require_3D', False)

    def __reduce_ex__(self, version):
        r"""pickle support.

        The instance is rebuilt by calling the class without arguments and
        then passing the returned state to :py:meth:`__setstate__`.
        Only the registration state is saved; ``_cache``, ``_debug`` and
        ``_progress_bar`` are not.
        """
        return self.__class__, (), {
            '_descriptors': self._descriptors,
            '_explicit_hydrogens': self._explicit_hydrogens,
            '_kekulizes': self._kekulizes,
            '_require_3D': self._require_3D,
        }

    def __init__(self, descs=(), ignore_3D=False):
        self._descriptors = []
        self._explicit_hydrogens = set()
        self._kekulizes = set()
        self._require_3D = False
        self._debug = False

        self.register(descs, ignore_3D=ignore_3D)

    @property
    def descriptors(self):
        r'''all descriptors.

        you can get/set/delete descriptor.
        Getting returns a tuple snapshot of the registered descriptors.
        '''
        return tuple(self._descriptors)

    @descriptors.setter
    def descriptors(self, descs):
        # replace: drop everything currently registered, then register anew
        del self.descriptors
        self.register(descs)

    @descriptors.deleter
    def descriptors(self):
        self._descriptors = []
        self._explicit_hydrogens.clear()
        self._kekulizes.clear()
        self._require_3D = False

    def __len__(self):
        r"""number of registered descriptors."""
        return len(self._descriptors)

    def _register_one(self, desc, check_only=False, ignore_3D=False):
        r"""register a single descriptor instance.

        :param desc: descriptor instance
        :param check_only: only record the requirements of ``desc``
            (explicit hydrogens, kekulize, 3D) without appending it to the
            descriptor list; used for dependencies
        :param ignore_3D: silently skip ``desc`` if it requires 3D

        :raises ValueError: if ``desc`` is not a :py:class:`Descriptor` instance
        """
        if not isinstance(desc, Descriptor):
            raise ValueError('{!r} is not descriptor'.format(desc))

        if ignore_3D and desc.require_3D:
            return

        self._explicit_hydrogens.add(bool(desc.explicit_hydrogens))
        self._kekulizes.add(bool(desc.kekulize))
        self._require_3D |= desc.require_3D

        # dependencies contribute their requirements but are not output columns
        for dep in (desc.dependencies() or {}).values():
            if isinstance(dep, Descriptor):
                self._register_one(dep, check_only=True)

        if not check_only:
            self._descriptors.append(desc)

    def register(self, desc, ignore_3D=False):
        r"""register descriptors.

        :type desc: :py:class:`module`,
            :py:class:`Descriptor` class/instance or
            :py:class:`Iterable`

        :param desc: descriptors to register

            * :py:class:`module`: Descriptors in module
            * :py:class:`Descriptor` class: use :py:meth:`Descriptor.preset`
            * :py:class:`Iterable`: each element is registered recursively

        :type ignore_3D: bool
        :param ignore_3D: skip descriptors whose ``require_3D`` is true

        :raises ValueError: if a non-iterable item (or a string) is not a descriptor
        """
        # A ``str`` is iterable on Python 3 and a one-character string iterates
        # to itself, so it must not take the "iterable" branch (endless
        # recursion); treat it as a single, invalid, item instead.
        if isinstance(desc, str) or not hasattr(desc, '__iter__'):
            if Descriptor.is_descriptor_class(desc):
                for d in desc.preset():
                    self._register_one(d, ignore_3D=ignore_3D)

            elif isinstance(desc, ModuleType):
                self.register(get_descriptors_from_module(desc), ignore_3D=ignore_3D)

            else:
                self._register_one(desc, ignore_3D=ignore_3D)

        else:
            for d in desc:
                self.register(d, ignore_3D=ignore_3D)

    def _calculate_one(self, cxt, desc, reset):
        r"""calculate one descriptor, resolving its dependencies first.

        :param cxt: calculation context shared by the descriptor and its dependencies
        :param desc: descriptor to calculate
        :param reset: reset ``cxt`` before calculating (true for top-level
            descriptors, false for dependencies)

        :returns: calculated value, memoized in ``self._cache``
        """
        if desc in self._cache:
            return self._cache[desc]

        if reset:
            cxt.reset()

        desc._context = cxt
        cxt.add_stack(desc)

        if desc.require_connected and desc._context.n_frags != 1:
            desc.fail(MultipleFragments())

        # dependencies are passed to ``calculate`` as keyword arguments;
        # a ``None`` dependency is passed through as ``None``
        args = {
            name: self._calculate_one(cxt, dep, False)
            if dep is not None else None
            for name, dep in (desc.dependencies() or {}).items()
        }

        r = desc.calculate(**args)

        if self._debug:
            self._check_rtype(desc, r)

        self._cache[desc] = r

        return r

    def _check_rtype(self, desc, result):
        r"""debug check that ``result`` is an instance of ``desc.rtype``.

        Skipped when ``desc.rtype`` is ``None`` or ``result`` is an ``Error``.

        :raises TypeError: on mismatch
        """
        if desc.rtype is None:
            return

        if isinstance(result, Error):
            return

        if not isinstance(result, desc.rtype):
            raise TypeError('{} not match {}'.format(result, desc.rtype))

    def _calculate(self, cxt):
        r"""yield one result per registered descriptor, in registration order.

        Exceptions are not propagated: a ``MissingValueException`` is yielded
        as ``Missing`` and any other exception as ``Error``, each with the
        context's current stack.
        """
        self._cache = {}
        for desc in self.descriptors:
            try:
                yield self._calculate_one(cxt, desc, True)
            except MissingValueException as e:
                # ``desc._context`` is ``cxt`` whenever it is set; using ``cxt``
                # directly also works if the failure happened before assignment
                yield Missing(e.error, cxt.get_stack())
            except Exception as e:
                yield Error(e, cxt.get_stack())
            finally:
                if hasattr(desc, '_context'):
                    del desc._context

    def __call__(self, mol, id=-1):
        r"""calculate descriptors.

        :type mol: rdkit.Chem.Mol
        :param mol: molecular

        :type id: int
        :param id: conformer id

        :rtype: [scalar or Error]
        :returns: list of values, one per descriptor in :py:attr:`descriptors` order
        """
        return list(self._calculate(Context.from_calculator(self, mol, id)))

    def _serial(self, mols, nmols, quiet, ipynb, id):
        r"""calculate descriptors of each mol sequentially, yielding a list per mol."""
        with self._progress(quiet, nmols, ipynb) as bar:
            for m in mols:
                with Capture() as capture:
                    r = list(self._calculate(Context.from_calculator(self, m, id)))

                # replay non-empty captured output through the progress bar
                for e in capture.result:
                    e = e.rstrip()
                    if not e:
                        continue

                    bar.write(e, file=capture.orig)

                yield r
                bar.update()

    @contextmanager
    def _progress(self, quiet, total, ipynb):
        r"""context manager yielding a progress bar.

        The bar is stored in ``self._progress_bar`` while active so that
        :py:meth:`echo` can write through it; the attribute is removed on exit.
        """
        args = {
            'dynamic_ncols': True,
            'leave': True,
            'total': total
        }

        if quiet:
            Bar = DummyBar
        elif ipynb:
            Bar = NotebookWrapper
        else:
            Bar = tqdm

        try:
            with Bar(**args) as self._progress_bar:
                yield self._progress_bar
        finally:
            if hasattr(self, '_progress_bar'):
                del self._progress_bar

    def echo(self, s, file=sys.stdout, end='\n'):
        '''output message.

        Written through the active progress bar if there is one, otherwise
        with ``print``.

        :param s: message
        :param file: output stream
        :param end: string appended after the message
        '''
        p = getattr(self, '_progress_bar', None)
        if p is not None:
            p.write(s, file=file, end=end)
            return

        print(s, file=file, end=end)

    def map(self, mols, nproc=None, nmols=None, quiet=False, ipynb=False, id=-1):
        r"""calculate descriptors over mols.

        :type mols: :py:class:`Iterable` (:py:class:`Mol`)
        :param mols: moleculars

        :type nproc: :py:class:`int` or :py:class:`None`
        :param nproc: number of process. None is :py:func:`multiprocessing.cpu_count`

        :type nmols: :py:class:`None` or :py:class:`int`
        :param nmols: number of all mols for display progress bar
            (overridden by ``len(mols)`` when ``mols`` has a length)

        :type quiet: :py:class:`bool`
        :param quiet: suppress progress bar

        :type ipynb: :py:class:`bool`
        :param ipynb: use ipython notebook progress bar

        :type id: :py:class:`int`
        :param id: conformer id

        :rtype: :py:class:`Iterator` [scalar]
        """
        if hasattr(mols, '__len__'):
            nmols = len(mols)

        if nproc == 1:
            return self._serial(mols, nmols=nmols, quiet=quiet, ipynb=ipynb, id=id)
        else:
            return self._parallel(mols, nproc, nmols=nmols, quiet=quiet, ipynb=ipynb, id=id)

    def pandas(self, mols, nproc=None, nmols=None, quiet=False, ipynb=False, id=-1):
        r"""calculate descriptors over mols.

        All parameters are forwarded to :py:meth:`map`.
        Columns are named by ``str()`` of each registered descriptor.

        :rtype: :py:class:`pandas.DataFrame`
        """
        import pandas

        return pandas.DataFrame(
            self.map(mols, nproc, nmols, quiet, ipynb, id),
            columns=[str(d) for d in self.descriptors]
        )
def get_descriptors_from_module(mdl):
    r"""get descriptors from module.

    Names are taken from ``mdl.__all__`` when defined, otherwise from
    ``dir(mdl)``; names starting with ``_`` are ignored.

    :type mdl: module
    :param mdl: module to search

    :rtype: [:py:class:`Descriptor`]
    :returns: descriptor classes sorted by the line number of their
        definition; classes whose source cannot be located sort last
    """
    names = getattr(mdl, '__all__', None)
    if names is None:
        names = dir(mdl)

    descs = [
        fn
        for fn in (getattr(mdl, name) for name in names if not name.startswith('_'))
        if Descriptor.is_descriptor_class(fn)
    ]

    def key_by_def(d):
        try:
            return getsourcelines(d)[1]
        except (IOError, TypeError):
            # IOError: source not retrievable; TypeError: built-in/extension
            # class without Python source
            return sys.maxsize

    descs.sort(key=key_by_def)
    return descs