import math
import os
import sys
from abc import ABCMeta, abstractmethod
from importlib import import_module
from inspect import getsourcelines, isabstract
from sys import maxsize
from types import ModuleType
from rdkit import Chem
from rdkit.Chem.rdPartialCharges import ComputeGasteigerCharges
import six
class MordredException(Exception):
pass
class MordredAttributeError(AttributeError, MordredException):
def __init__(self, desc, args):
super(AttributeError, self).__init__()
self.desc = desc
self.args = args
def __reduce_ex__(self, version):
return self.__class__, (self.desc, self.args)
def __str__(self):
return '{}({})'.format(self.args, self.desc)
class DescriptorException(MordredException):
def __init__(self, desc, e, mol, parent=None):
self.desc = desc
self.e = e
self.mol = mol
self.parent = parent
def __reduce_ex__(self, version):
return self.__class__, (self.desc, self.e, self.mol, self.parent)
def __str__(self):
if self.parent is None:
return '{}({!r}): {}'.format(
self.desc,
Chem.MolToSmiles(self.mol),
self.e,
)
return '{}/{}({!r}): {}'.format(
self.parent,
self.desc,
Chem.MolToSmiles(self.mol),
self.e,
)
def pretty(a):
p = getattr(a, 'name', None)
return repr(a if p is None else p)
[docs]class Descriptor(six.with_metaclass(ABCMeta, object)):
r"""abstract base class of descriptors."""
explicit_hydrogens = True
gasteiger_charges = False
kekulize = False
require_connected = False
_reduce_ex_version = 3
@abstractmethod
def __reduce_ex__(self, version):
pass
def __repr__(self):
cls, args = self.__reduce_ex__(self._reduce_ex_version)
return '{}({})'.format(cls.__name__, ', '.join(map(pretty, args)))
def __hash__(self):
return hash(self.__reduce_ex__(self._reduce_ex_version))
def __eq__(self, other):
l = self.__reduce_ex__(self._reduce_ex_version)
r = other.__reduce_ex__(self._reduce_ex_version)
return l.__eq__(r)
def __ne__(self, other):
return not self == other
def __lt__(self, other):
l = self.__reduce_ex__(self._reduce_ex_version)
r = other.__reduce_ex__(self._reduce_ex_version)
return l.__lt__(r)
rtype = type(None)
@classmethod
[docs] def preset(cls):
r"""generate preset descriptor instances.
(abstract classmethod)
:rtype: iterable
"""
pass
[docs] def dependencies(self):
r"""descriptor dependencies.
:rtype: {:py:class:`str`: (:py:class:`Descriptor` or :py:class:`None`)} or :py:class:`None`
"""
pass
@abstractmethod
[docs] def calculate(self, mol):
r"""calculate descriptor value.
(abstract method)
"""
pass
def __call__(self, mol):
r"""calculate single descriptor value.
:returns: descriptor result
:rtype: scalar
"""
return Calculator(self)(mol)[0][1]
@classmethod
[docs] def is_descriptor_class(cls, desc):
r"""check calculatable descriptor class or not.
:rtype: :py:class:`bool`
"""
return (
isinstance(desc, type) and
issubclass(desc, cls) and
not isabstract(desc)
)
class Molecule(object):
def __init__(self, orig):
Chem.SanitizeMol(orig)
self.orig = orig
self.hydration_cache = dict()
self.kekulize_cache = dict()
self.gasteiger_cache = dict()
self.is_connected = len(Chem.GetMolFrags(orig)) == 1
def hydration(self, explicitH):
if explicitH in self.hydration_cache:
return self.hydration_cache[explicitH]
mol = Chem.AddHs(self.orig) if explicitH else Chem.RemoveHs(self.orig)
self.hydration_cache[explicitH] = mol
return mol
def kekulize(self, mol, explicitH):
if explicitH in self.kekulize_cache:
return self.kekulize_cache[explicitH]
mol = Chem.Mol(mol)
Chem.Kekulize(mol)
self.kekulize_cache[explicitH] = mol
return mol
def gasteiger(self, mol, explicitH, kekulize):
key = explicitH, kekulize
if key in self.gasteiger_cache:
return self.gasteiger_cache[key]
ComputeGasteigerCharges(mol)
self.gasteiger_cache[key] = mol
return mol
def get(self, explicitH, kekulize, gasteiger):
mol = self.hydration(explicitH)
if kekulize:
mol = self.kekulize(mol, explicitH)
if gasteiger:
mol = self.gasteiger(mol, explicitH, kekulize)
return mol
[docs]class Calculator(object):
r"""descriptor calculator.
:param descs: see :py:meth:`register` method
"""
def __init__(self, *descs):
self.descriptors = []
self.explicitH = False
self.gasteiger = False
self.kekulize = False
self.register(*descs)
def __reduce_ex__(self, version):
return self.__class__, tuple(self.descriptors)
def _register_one(self, desc):
if not isinstance(desc, Descriptor):
raise ValueError('{!r} is not descriptor'.format(desc))
self.descriptors.append(desc)
if desc.explicit_hydrogens:
self.explicitH = True
if desc.gasteiger_charges:
self.gasteiger = True
if desc.kekulize:
self.kekulize = True
[docs] def register(self, *descs):
r"""register descriptors.
:type descs: :py:class:`module`,
:py:class:`Descriptor` class/instance or
:py:class:`Iterable`
:param descs: descriptors to register
* :py:class:`module`: Descriptors in module
* :py:class:`Descriptor` class: use :py:meth:`Descriptor.preset`
"""
for desc in descs:
if not hasattr(desc, '__iter__'):
if Descriptor.is_descriptor_class(desc):
for d in desc.preset():
self._register_one(d)
elif isinstance(desc, ModuleType):
self.register(get_descriptors_from_module(desc))
else:
self._register_one(desc)
else:
for d in desc:
self.register(d)
def _calculate(self, desc, cache, parent=None):
if desc in cache:
return cache[desc]
if desc.require_connected and not self.molecule.is_connected:
cache[desc] = float('nan')
return float('nan')
args = {
name: self._calculate(dep, cache, parent or desc)
if dep is not None else None
for name, dep in (desc.dependencies() or {}).items()
}
mol = self.molecule.get(
explicitH=desc.explicit_hydrogens,
gasteiger=desc.gasteiger_charges,
kekulize=desc.kekulize,
)
try:
r = desc.calculate(mol, **args)
except Exception as e:
raise DescriptorException(desc, e, mol, parent)
cache[desc] = r
return r
def __call__(self, mol, error_callback=None):
r"""calculate descriptors.
:type mol: rdkit.Chem.Mol
:param mol: molecular
:type error_callback: callable
:param error_callback: call when ransed Exception
:rtype: [(Descriptor, scalar or nan)]
:returns: iterator of descriptor and value
"""
cache = {}
self.molecule = Molecule(mol)
if error_callback is None:
def raise_error(e):
raise e
error_callback = raise_error
rs = []
for desc in self.descriptors:
try:
r = self._calculate(desc, cache)
except Exception as e:
r = error_callback(e)
if not math.isnan(r) and not isinstance(r, desc.rtype):
r = error_callback(DescriptorException(
desc,
TypeError('{!r}({}) is not {!r}'.format(r, type(r), desc.rtype)),
mol
))
if math.isnan(r):
rs.append((desc, float('nan')))
continue
rs.append((desc, desc.rtype(r)))
return rs
def _parallel(self, mols, processes, error_mode, callback, error_callback):
from multiprocessing import Pool
try:
pool = Pool(
processes,
initializer=initializer,
initargs=(self, error_mode),
)
kws = dict()
if callback is not None:
kws['callback'] = callback
if error_callback is not None:
kws['error_callback'] = error_callback
def do_task(m):
return pool.apply_async(
worker,
(m.ToBinary(),),
**kws
)
for m, result in [(m, do_task(m)) for m in mols]:
if six.PY3:
yield m, result.get()
else:
# timeout: avoid python2 KeyboardInterrupt bug.
# http://stackoverflow.com/a/1408476
yield m, result.get(1e9)
finally:
pool.terminate()
pool.join()
def _serial(self, mols, error_mode, callback, error_callback):
calculate = make_calculator(self, error_mode)
for m in mols:
if error_callback is not None:
try:
r = calculate(m)
except Exception as e:
r = error_callback(e)
else:
r = calculate(m)
if callback is not None:
callback(r)
yield m, r
[docs] def map(self, mols, processes=None, error_mode='raise', callback=None, error_callback=None):
r"""calculate descriptors over mols.
:type mols: :py:class:`Iterable` (:py:class:`Mol`)
:param mols: moleculars
:type processes: :py:class:`int` or :py:class:`None`
:param processes: number of process. None is :py:func:`multiprocessing.cpu_count`
:type error_mode: :py:class:`str`
:param error_mode:
* 'raise': raise Exception
* 'ignore': ignore Exception
* 'log': print Exception to stderr and ingore Exception
:type callback: :py:class:`Callable` ([(:py:class:`Descriptor`, scalar)])
-> :py:class:`None`
:param callback: call when calculate finished par molecule
:type error_callback: :py:class:`Callable` (:py:class:`Exception`) -> scalar
:param error_callback: call when Exception raised
:rtype: :py:class:`Iterator` ((:py:class:`Mol`, [(:py:class:`Descriptor`, scalar)]]))
"""
assert error_mode in set(['raise', 'ignore', 'log'])
if processes == 1:
return self._serial(mols, error_mode, callback, error_callback)
else:
return self._parallel(mols, processes, error_mode, callback, error_callback)
calculate = None
def initializer(calc, e_mode):
global calculate
calculate = make_calculator(calc, e_mode)
def make_calculator(calc, e_mode):
if e_mode == 'raise':
return calc
elif e_mode == 'ignore':
def ignore(e):
return float('nan')
return lambda m: calc(m, error_callback=ignore)
else:
def ignore_and_log(e):
sys.stderr.write('{}\n'.format(e))
return float('nan')
return lambda m: calc(m, error_callback=ignore_and_log)
def worker(binary):
return calculate(Chem.Mol(binary))
[docs]def all_descriptors():
r"""yield all descriptors.
:returns: all modules
:rtype: :py:class:`Iterator` (:py:class:`module`)
"""
base_dir = os.path.dirname(__file__)
for name in os.listdir(base_dir):
name, ext = os.path.splitext(name)
if name[:1] == '_' or ext != '.py':
continue
yield import_module('..' + name, __name__)
[docs]def get_descriptors_from_module(mdl):
r"""get descriptors from module.
:type mdl: module
:param mdl: module to search
:rtype: [:py:class:`Descriptor`]
"""
descs = []
for name in dir(mdl):
if name[:1] == '_':
continue
desc = getattr(mdl, name)
if Descriptor.is_descriptor_class(desc):
descs.append(desc)
def key_by_def(d):
try:
return getsourcelines(d)[1]
except IOError:
return maxsize
descs.sort(key=key_by_def)
return descs
def parse_enum(enum, v):
if isinstance(v, enum):
return v
else:
return enum[v]