"""
Generating an IFP in parallel --- :mod:`prolif.parallel`
========================================================
This module provides two classes, :class:`TrajectoryPool` and :class:`MolIterablePool`
to execute the analysis in parallel. These are used in the parallel implementation used
in :meth:`~prolif.fingerprint.Fingerprint.run` and
:meth:`~prolif.fingerprint.Fingerprint.run_from_iterable` respectively.
"""
from collections.abc import Iterable
from ctypes import c_uint32
from threading import Event, Thread
from time import sleep
from typing import TYPE_CHECKING, Any, ClassVar, cast
from multiprocess import Value
from multiprocess.pool import Pool
from tqdm.auto import tqdm
from prolif.molecule import Molecule
from prolif.pickling import PICKLE_HANDLER
if TYPE_CHECKING:
from multiprocessing.pool import Pool as BuiltinPool
from multiprocessing.sharedctypes import Synchronized
from numpy.typing import NDArray
from prolif.fingerprint import Fingerprint
from prolif.ifp import IFP
from prolif.typeshed import IFPResults, MDAObject, ResidueSelection, Trajectory
[docs]class Progress:
"""Helper class to track the number of frames processed by the
:class:`TrajectoryPool`.
Parameters
----------
killswitch : threading.Event
A threading.Event instance created and controlled by the :class:`TrajectoryPool`
to kill the thread updating the :class:`~tqdm.std.tqdm` progress bar.
tracker : multiprocess.Value
Value holding a :class:`ctypes.c_uint32` ctype updated by the
:class:`TrajectoryPool`, storing how many frames were processed since the last
progress bar update.
**kwargs : object
Used to create the :class:`~tqdm.std.tqdm` progress bar.
Attributes
----------
delay : float, default = 0.5
Delay between progress bar updates. This requires locking access to the
``tracker`` object which the :class:`TrajectoryPool` needs access to, so too
small values might cause delays in the analysis.
tqdm_pbar : tqdm.std.tqdm
The progress bar displayed
"""
delay: ClassVar[float] = 0.5
def __init__(
self, killswitch: Event, tracker: "Synchronized", **kwargs: Any
) -> None:
self.tqdm_pbar = tqdm(**kwargs)
self.killswitch = killswitch
self.tracker = tracker
[docs] def close(self) -> None:
"""Cleanup after the :class:`TrajectoryPool` computation is done"""
self.update()
self.tqdm_pbar.close()
[docs] def update(self) -> None:
"""Update the value displayed by the progress bar"""
if self.tracker.value != 0:
with self.tracker.get_lock():
increment = self.tracker.value
self.tracker.value = 0
self.tqdm_pbar.update(increment)
[docs] def event_loop(self) -> None:
"""Event loop targeted by a separate thread"""
while True:
if self.killswitch.is_set():
break
self.update()
sleep(self.delay)
[docs]class TrajectoryPool:
"""Process pool for a parallelized IFP analysis on an MD trajectory. Must be used
in a ``with`` statement.
Parameters
----------
n_processes : int | None
Max number of processes
fingerprint : prolif.fingerprint.Fingerprint
Fingerprint instance used to generate the IFP
residues : list, optional
List of protein residues considered for the IFP
tqdm_kwargs : dict
Parameters for the :class:`~tqdm.std.tqdm` progress bar
rdkitconverter_kwargs : tuple[dict, dict]
Parameters for the :class:`~MDAnalysis.converters.RDKit.RDKitConverter`
from MDAnalysis: the first for the ligand, and the second for the protein
use_segid: bool
Use the segment number rather than the chain identifier as a chain.
Attributes
----------
tracker : multiprocess.Value
Value holding a :class:`ctypes.c_uint32` ctype storing how many frames were
processed since the last progress bar update.
pool : multiprocess.pool.Pool
The underlying pool instance.
.. versionchanged:: 2.1.0
Added `use_segid`.
"""
tracker: ClassVar["Synchronized"] = cast("Synchronized", Value(c_uint32, lock=True))
fp: ClassVar["Fingerprint"]
residues: ClassVar["ResidueSelection"]
converter_kwargs: ClassVar[tuple[dict, dict]]
use_segid: bool
def __init__(
self,
n_processes: int | None,
fingerprint: "Fingerprint",
residues: "ResidueSelection",
tqdm_kwargs: dict,
rdkitconverter_kwargs: tuple[dict, dict],
use_segid: bool,
) -> None:
self.tqdm_kwargs = tqdm_kwargs
self.pool = cast(
"BuiltinPool",
Pool(
n_processes,
initializer=self.initializer,
initargs=(
self.tracker,
fingerprint,
residues,
rdkitconverter_kwargs,
use_segid,
),
),
)
[docs] @classmethod
def initializer(
cls,
tracker: "Synchronized",
fingerprint: "Fingerprint",
residues: "ResidueSelection",
rdkitconverter_kwargs: tuple[dict, dict],
use_segid: bool,
) -> None:
"""Initializer classmethod passed to the pool so that each child process can
access these objects without copying them."""
cls.tracker = tracker
cls.fp = fingerprint
cls.residues = residues
cls.converter_kwargs = rdkitconverter_kwargs
cls.use_segid = use_segid
[docs] @classmethod
def executor(
cls, args: tuple["Trajectory", "MDAObject", "MDAObject", "NDArray"]
) -> "IFPResults":
"""Classmethod executed by each child process on a chunk of the trajectory
Returns
-------
ifp_chunk: dict[int, prolif.ifp.IFP]
A dictionary of :class:`~prolif.ifp.IFP` indexed by frame number
"""
traj, lig, prot, chunk = args
ifp: "IFPResults" = {}
for ts in traj[chunk]:
lig_mol = Molecule.from_mda(
lig, use_segid=cls.use_segid, **cls.converter_kwargs[0]
)
prot_mol = Molecule.from_mda(
prot, use_segid=cls.use_segid, **cls.converter_kwargs[1]
)
data = cls.fp.generate(
lig_mol,
prot_mol,
residues=cls.residues,
metadata=True,
)
ifp[int(ts.frame)] = data
with cls.tracker.get_lock():
cls.tracker.value += 1
return ifp
[docs] def process(
self,
args_iterable: Iterable[
tuple["Trajectory", "MDAObject", "MDAObject", "NDArray"]
],
) -> list["IFPResults"]:
"""Maps the input iterable of arguments to the executor function.
Parameters
----------
args_iterable : typing.Iterable[tuple]
Iterable of tuple of trajectory, ligand atomgroup, protein atomgroup, and
array of frame indices.
Returns
-------
ifp: list[dict[int, prolif.ifp.IFP]]
An iterable of dictionaries of :class:`~prolif.ifp.IFP` indexed by frame
number.
"""
return self.pool.map(self.executor, args_iterable, chunksize=1)
def __enter__(self) -> "TrajectoryPool":
"""Sets up the :class:`Progress` instance and associated killswitch event, and
starts the progress event loop in a separate thread."""
self.killswitch = Event()
self.tracker.value = 0
self.progress = Progress(self.killswitch, self.tracker, **self.tqdm_kwargs)
self.progress_thread = Thread(target=self.progress.event_loop)
self.progress_thread.start()
return self
def __exit__(self, *exc: Any) -> None:
"""Call the killswitch and close the progress."""
self.killswitch.set()
self.progress.close()
[docs]class MolIterablePool:
"""Process pool for a parallelized IFP analysis on an iterable of ligands. Must be
used in a ``with`` statement.
Parameters
----------
n_processes : int | None
Max number of processes
fingerprint : prolif.fingerprint.Fingerprint
Fingerprint instance used to generate the IFP
prot_mol : prolif.molecule.Molecule
Protein molecule
residues : list, optional
List of protein residues considered for the IFP
tqdm_kwargs : dict
Parameters for the :class:`~tqdm.std.tqdm` progress bar
Attributes
----------
pool : multiprocess.pool.Pool
The underlying pool instance.
"""
fp: ClassVar["Fingerprint"]
pmol: ClassVar[Molecule]
residues: ClassVar["ResidueSelection"]
def __init__(
self,
n_processes: int | None,
fingerprint: "Fingerprint",
prot_mol: Molecule,
residues: "ResidueSelection",
tqdm_kwargs: dict,
) -> None:
self.tqdm_kwargs = tqdm_kwargs
self.pool = cast(
"BuiltinPool",
Pool(
n_processes,
initializer=self.initializer,
initargs=(fingerprint, prot_mol, residues),
),
)
[docs] @classmethod
def initializer(
cls,
fingerprint: "Fingerprint",
prot_mol: Molecule,
residues: "ResidueSelection",
) -> None:
"""Initializer classmethod passed to the pool so that each child process can
access these objects without copying them."""
cls.fp = fingerprint
cls.pmol = prot_mol
cls.residues = residues
[docs] @classmethod
def executor(cls, mol: Molecule) -> "IFP":
"""Classmethod executed by each child process on a single ligand molecule from
the input iterable.
Returns
-------
ifp_data : prolif.ifp.IFP
A dictionary indexed by ``(ligand, protein)`` residue pairs, and each value
is a sparse dictionary of metadata indexed by interaction name.
"""
return cls.fp.generate(mol, cls.pmol, residues=cls.residues, metadata=True)
[docs] def process(self, args_iterable: Iterable[Molecule]) -> Iterable["IFP"]:
"""Maps the input iterable of molecules to the executor function.
Parameters
----------
args_iterable : typing.Iterable[prolif.molecule.Molecule]
An iterable yielding ligand molecules
Returns
-------
ifp : typing.Iterable[prolif.ifp.IFP]
An iterable of :class:`~prolif.ifp.IFP` dictionaries.
"""
results = self.pool.imap(self.executor, args_iterable, chunksize=1)
return tqdm(results, **self.tqdm_kwargs)
def __enter__(self) -> "MolIterablePool":
"""Sets up which properties will be pickled by RDKit by default"""
PICKLE_HANDLER.set()
return self
def __exit__(self, *exc: Any) -> None:
"""Resets RDKit's default pickled properties"""
PICKLE_HANDLER.reset()