Parallel processing
Generating an IFP in parallel: prolif.parallel
This module provides classes that handle parallel processing for the
run_from_iterable() and
run() methods.
TL;DR for MD trajectories
For trajectories, ProLIF uses fairly basic heuristics to choose between two
parallelization strategies, chunk and queue, and to set the maximum number
of cores to use. You can override these choices with the parallel_strategy
and n_jobs parameters of run(). We
encourage users to benchmark these settings on their specific system to optimize the
performance of their analysis.
For run_from_iterable(), parallel execution is
handled by MolIterablePool.
For run(), the analysis is performed on a
trajectory and two strategies are available, handled by TrajectoryPool (chunk
strategy) or TrajectoryPoolQueue (queue strategy):
- chunk: splits the trajectory into chunks and distributes one chunk per worker process. This strategy requires pickling the trajectory object for each worker, so performance scales poorly if the trajectory object is large (e.g. contains complex topology information). For simple objects, however, it scales better, as each worker can efficiently iterate through its assigned chunk.
- queue: the main process iterates through the trajectory and converts frames into lightweight RDKit molecules, which are then put into a queue consumed by worker processes. This strategy avoids the overhead of pickling large trajectory objects, but the main process can become a bottleneck if frame conversion is slow.
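The chunk strategy can be pictured with the minimal sketch below. Frame indices are split into one contiguous chunk per worker. Each worker returns results keyed by frame number, and the main process merges them.

This is an illustration under stated assumptions, not ProLIF's code:
- `split_frames`, `process_chunk` and `run_chunked` are hypothetical names.
- The per-frame "IFP" is a placeholder string.
- A thread-backed pool (`multiprocessing.dummy`) is used so the sketch runs anywhere. TrajectoryPool itself pickles the trajectory and uses worker processes.

```python
from multiprocessing.dummy import Pool  # thread-backed Pool with the multiprocessing API


def split_frames(n_frames, n_chunks):
    """Split range(n_frames) into n_chunks contiguous, near-equal chunks."""
    size, extra = divmod(n_frames, n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` chunks get one additional frame
        stop = start + size + (1 if i < extra else 0)
        chunks.append(range(start, stop))
        start = stop
    return chunks


def process_chunk(frames):
    """Hypothetical worker: one placeholder result per frame, keyed by frame number."""
    return {frame: f"ifp-{frame}" for frame in frames}


def run_chunked(n_frames, n_jobs):
    results = {}
    with Pool(n_jobs) as pool:
        # Chunks may finish in any order; merging by frame number restores it
        for chunk_result in pool.imap_unordered(process_chunk, split_frames(n_frames, n_jobs)):
            results.update(chunk_result)
    return dict(sorted(results.items()))


print([list(c) for c in split_frames(10, 3)])  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(len(run_chunked(10, 3)))  # 10
```

Each worker only touches its own slice of frames, which is why this scales well when the object sent to workers is cheap to pickle.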
By default, the strategy is chosen automatically based on the estimated pickle size of
the trajectory object: if it exceeds
prolif.parallel.MDA_PARALLEL_STRATEGY_THRESHOLD (300 kB), the queue
strategy is used, otherwise chunk. This is a rough heuristic that may not
always be optimal, so you can override it by explicitly passing
parallel_strategy='chunk' or parallel_strategy='queue' to
run().
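The selection rule described above can be sketched in a few lines. It assumes the size is measured as `len(pickle.dumps(obj))`; ProLIF's own estimate may be computed differently. `pick_strategy` is a hypothetical stand-in whose argument order mirrors `get_mda_parallel_strategy(strategy, traj)`.

```python
import pickle
from typing import Literal, Optional

# Documented default of prolif.parallel.MDA_PARALLEL_STRATEGY_THRESHOLD, in bytes
THRESHOLD = 300_000


def pick_strategy(strategy: Optional[Literal["chunk", "queue"]], obj) -> str:
    """Hypothetical re-implementation of the documented heuristic."""
    if strategy is not None:  # an explicit user choice always wins
        return strategy
    size = len(pickle.dumps(obj))  # assumption: size = length of the pickled bytes
    return "queue" if size > THRESHOLD else "chunk"


small = list(range(10))
large = bytes(500_000)  # pickles to slightly more than 500 kB
print(pick_strategy(None, small))  # chunk
print(pick_strategy(None, large))  # queue
print(pick_strategy("chunk", large))  # chunk
```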
Processing each frame, and in particular converting the topology to RDKit molecules with 3D
information, can be slow and become the main bottleneck of parallel processing. For this reason,
when the analysis is performed on a trajectory, the number of logical cores is capped by default
at prolif.parallel.MAX_JOBS (configurable through the PROLIF_MAX_JOBS env variable). No
capping is done if an explicit number of cores is passed through the n_jobs
parameter or the PROLIF_N_JOBS env variable.
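The priority order for the number of jobs can be sketched as follows:
1. An explicit n_jobs.
2. PROLIF_N_JOBS.
3. The logical core count, capped at PROLIF_MAX_JOBS (default 10) only when capping is requested.
4. None if the core count is unknown.

This sketch rests on three assumptions:
- `resolve_n_jobs` is a hypothetical name.
- It uses the standard library's `os.cpu_count()` where ProLIF uses `psutil.cpu_count()`.
- It reads PROLIF_MAX_JOBS at call time for simplicity.

```python
import os
from typing import Optional


def resolve_n_jobs(n_jobs: Optional[int] = None, capped: bool = False) -> Optional[int]:
    """Hypothetical sketch of the documented n_jobs priority order."""
    if n_jobs is not None:  # 1. explicit argument, never capped
        return n_jobs
    env_n_jobs = os.environ.get("PROLIF_N_JOBS")
    if env_n_jobs is not None:  # 2. environment variable, never capped
        return int(env_n_jobs)
    n_cores = os.cpu_count()  # 3. logical cores (ProLIF uses psutil.cpu_count())
    if n_cores is None:  # 4. core count could not be determined
        return None
    if capped:  # cap only applies to the automatic choice
        max_jobs = int(os.environ.get("PROLIF_MAX_JOBS", 10))
        return min(n_cores, max_jobs)
    return n_cores


print(resolve_n_jobs(n_jobs=16, capped=True))  # 16: an explicit value bypasses the cap
```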
- prolif.parallel.MAX_JOBS = 10
Limits the maximum number of processes (unless the user specifies the number of jobs directly) to avoid oversubscription, as I/O tends to be the bottleneck.
- prolif.parallel.MDA_PARALLEL_STRATEGY_THRESHOLD: int = 300000
Threshold (in bytes) used to determine the parallel strategy for MDAnalysis trajectories. If the pickled trajectory object is larger than this threshold,
queue is used, otherwise chunk.
- class prolif.parallel.MolIterablePool(n_processes: int | None, fingerprint: Fingerprint, prot_mol: Molecule, residues: ResidueSelection, tqdm_kwargs: dict)
Process pool for a parallelized IFP analysis on an iterable of ligands. Must be used in a
with statement.
- Parameters:
n_processes (int | None) – Max number of processes
fingerprint (prolif.fingerprint.Fingerprint) – Fingerprint instance used to generate the IFP
prot_mol (prolif.molecule.Molecule) – Protein molecule
residues (list, optional) – List of protein residues considered for the IFP
tqdm_kwargs (dict) – Parameters for the
tqdm progress bar
- pool
The underlying pool instance.
- Type:
- classmethod executor(mol: Molecule) → IFP
Classmethod executed by each child process on a single ligand molecule from the input iterable.
- Returns:
ifp_data – A dictionary indexed by
(ligand, protein) residue pairs, where each value is a sparse dictionary of metadata indexed by interaction name.
- Return type:
IFP
- classmethod initializer(fingerprint: Fingerprint, prot_mol: Molecule, residues: ResidueSelection) → None
Initializer classmethod passed to the pool so that each child process can access these objects without copying them.
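The initializer/executor split used by MolIterablePool follows a common pool pattern:
- The initializer runs once per worker and stores the shared objects on the class.
- The executor, called once per input item, reads them from there.
- So the shared objects reach each worker once rather than being sent along with every task.

The sketch below is a hypothetical miniature (`IterablePoolSketch` and its placeholder strings are not ProLIF code). It uses the thread-backed `multiprocessing.dummy.Pool` so it runs anywhere. ProLIF relies on real processes from the `multiprocess` package.

```python
from multiprocessing.dummy import Pool  # threads; multiprocessing.Pool would use processes


class IterablePoolSketch:
    """Hypothetical miniature of the initializer/executor design."""

    def __init__(self, n_processes, reference, residues):
        self.pool = Pool(
            n_processes,
            initializer=self.initializer,
            initargs=(reference, residues),
        )

    @classmethod
    def initializer(cls, reference, residues):
        # Runs once per worker: shared objects are stored on the class
        cls.reference = reference
        cls.residues = residues

    @classmethod
    def executor(cls, item):
        # Runs once per input item, reading the objects set by the initializer
        return {res: f"{item}-{cls.reference}" for res in cls.residues}

    def process(self, items):
        # map preserves the input order
        return self.pool.map(self.executor, items)

    # Context-manager support, mirroring the "must be used in a with statement" rule
    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.pool.close()
        self.pool.join()


with IterablePoolSketch(2, "prot", ["ALA1", "GLY2"]) as pool:
    results = pool.process(["lig1", "lig2"])
print(results[0])  # {'ALA1': 'lig1-prot', 'GLY2': 'lig1-prot'}
```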
- class prolif.parallel.Progress(killswitch: Event, tracker: Synchronized, **kwargs: Any)
Helper class to track the number of frames processed by the
TrajectoryPool.
- Parameters:
killswitch (threading.Event) – A threading.Event instance created and controlled by the
TrajectoryPool to kill the thread updating the tqdm progress bar.
tracker (multiprocess.Value) – Value holding a
ctypes.c_uint32 ctype updated by the TrajectoryPool, storing how many frames were processed since the last progress bar update.
**kwargs (object) – Used to create the
tqdm progress bar.
- delay
Delay between progress bar updates. Each update requires locking access to the
tracker object, which the TrajectoryPool also needs to access, so values that are too small might slow down the analysis.
- Type:
float, default = 0.5
- tqdm_pbar
The progress bar displayed.
- Type:
tqdm.std.tqdm
- close() → None
Cleanup after the
TrajectoryPool computation is done.
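The tracker/killswitch mechanism described for Progress can be sketched as follows:
- A background thread wakes up every `delay` seconds.
- On each wake-up it locks the shared counter, adds its value to a running total and resets it to zero.
- It stops once the killswitch Event is set.

`ProgressSketch` is hypothetical. It accumulates into a plain `total` attribute where the real class would advance a tqdm bar. It uses the standard `multiprocessing.Value` where ProLIF uses `multiprocess.Value`.

```python
import ctypes
import threading
import time
from multiprocessing import Value


class ProgressSketch:
    """Hypothetical stand-in for prolif.parallel.Progress (no tqdm)."""

    def __init__(self, killswitch, tracker, delay=0.05):
        self.killswitch = killswitch
        self.tracker = tracker
        self.delay = delay
        self.total = 0  # the real class would update its tqdm bar instead
        self._thread = threading.Thread(target=self._poll, daemon=True)
        self._thread.start()

    def _drain(self):
        # Lock the shared counter, read it, and reset it to zero
        with self.tracker.get_lock():
            done = self.tracker.value
            self.tracker.value = 0
        self.total += done

    def _poll(self):
        # Event.wait returns True as soon as the killswitch is set
        while not self.killswitch.wait(self.delay):
            self._drain()

    def close(self):
        self.killswitch.set()
        self._thread.join()
        self._drain()  # count frames processed since the last poll


killswitch = threading.Event()
tracker = Value(ctypes.c_uint32, 0)
progress = ProgressSketch(killswitch, tracker)
for _ in range(25):  # a worker would do this after each frame
    with tracker.get_lock():
        tracker.value += 1
    time.sleep(0.001)
progress.close()
print(progress.total)  # 25
```

A larger `delay` means fewer lock acquisitions competing with the workers, which is the trade-off noted for the delay attribute.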
- class prolif.parallel.TrajectoryPool(n_processes: int | None, fingerprint: Fingerprint, residues: ResidueSelection, tqdm_kwargs: dict, rdkitconverter_kwargs: tuple[dict, dict], use_segid: bool)
Process pool for a parallelized IFP analysis on an MD trajectory. Must be used in a
with statement.
- Parameters:
n_processes (int | None) – Max number of processes
fingerprint (prolif.fingerprint.Fingerprint) – Fingerprint instance used to generate the IFP
residues (list, optional) – List of protein residues considered for the IFP
tqdm_kwargs (dict) – Parameters for the
tqdm progress bar
rdkitconverter_kwargs (tuple[dict, dict]) – Parameters for the
RDKitConverter from MDAnalysis: the first for the ligand, and the second for the protein
use_segid (bool) – Use the segment number rather than the chain identifier as a chain.
- tracker
Value holding a
ctypes.c_uint32 ctype storing how many frames were processed since the last progress bar update.
- Type:
multiprocess.Value
- pool
The underlying pool instance.
- Type:
Changed in version 2.1.0: Added use_segid.
- classmethod executor(args: tuple['Trajectory', 'MDAObject', 'MDAObject', 'NDArray']) → IFPResults
Classmethod executed by each child process on a chunk of the trajectory.
- Returns:
ifp_chunk – A dictionary of
IFP indexed by frame number
- Return type:
IFPResults
- classmethod initializer(tracker: Synchronized, fingerprint: Fingerprint, residues: ResidueSelection, rdkitconverter_kwargs: tuple[dict, dict], use_segid: bool) → None
Initializer classmethod passed to the pool so that each child process can access these objects without copying them.
- class prolif.parallel.TrajectoryPoolQueue(n_processes: int | None, fingerprint: Fingerprint, residues: ResidueSelection, tqdm_kwargs: dict, rdkitconverter_kwargs: tuple[dict, dict], use_segid: bool)
Queue-based process pool for IFP analysis on an MD trajectory.
This implementation uses a producer-consumer pattern:
- A producer thread iterates over the trajectory and converts frames to Molecules.
- Worker processes consume Molecule pairs from a queue and compute fingerprints.
This avoids pickling MDAnalysis objects, which is the bottleneck in TrajectoryPool.
Must be used in a
with statement.
- Parameters:
n_processes (int | None) – Max number of worker processes
fingerprint (prolif.fingerprint.Fingerprint) – Fingerprint instance used to generate the IFP
residues (list, optional) – List of protein residues considered for the IFP
tqdm_kwargs (dict) – Parameters for the
tqdm progress bar
rdkitconverter_kwargs (tuple[dict, dict]) – Parameters for the
RDKitConverter from MDAnalysis: the first for the ligand, and the second for the protein
use_segid (bool) – Use the segment number rather than the chain identifier as a chain.
Added in version 2.1.0.
- classmethod executor(args: tuple[int, prolif.molecule.Molecule, prolif.molecule.Molecule]) → tuple[int, 'IFP']
Classmethod executed by each child process on a single frame.
- classmethod initializer(fingerprint: Fingerprint, residues: ResidueSelection) → None
Initializer classmethod passed to the pool so that each child process can access these objects without copying them.
- process(traj: Trajectory, lig: MDAObject, prot: MDAObject) → IFPResults
Process the trajectory using a producer-consumer pattern.
- Parameters:
traj (Trajectory) – MDAnalysis trajectory or sliced trajectory
lig (MDAObject) – Ligand AtomGroup
prot (MDAObject) – Protein AtomGroup
- Returns:
ifp – A dictionary of
IFP indexed by frame number
- Return type:
IFPResults
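The producer-consumer flow of TrajectoryPoolQueue can be sketched with the standard `queue` and `threading` modules. One producer converts frames and puts work items on a bounded queue, followed by one sentinel per worker. Consumers process items until they receive a sentinel, and results are keyed by frame number.

This is a simplified, hypothetical illustration:
- The producer runs in the calling thread and the consumers are threads rather than processes, to keep it self-contained.
- The "molecules" and "IFPs" are placeholder strings.
- `run_queue`, `producer` and `worker` are not ProLIF names.

```python
import queue
import threading

_SENTINEL = None  # tells a worker that no more frames are coming


def producer(frames, tasks, n_workers):
    """Hypothetical producer: 'converts' each frame and enqueues it."""
    for frame in frames:
        tasks.put((frame, f"lig@{frame}", f"prot@{frame}"))  # stand-ins for Molecules
    for _ in range(n_workers):
        tasks.put(_SENTINEL)  # one stop signal per worker


def worker(tasks, results):
    """Hypothetical consumer: computes a placeholder IFP for each frame."""
    while (item := tasks.get()) is not _SENTINEL:
        frame, lig, prot = item
        results.put((frame, f"ifp({lig}, {prot})"))


def run_queue(frames, n_workers=2, maxsize=8):
    tasks = queue.Queue(maxsize=maxsize)  # bounded: the producer waits if workers lag
    results = queue.Queue()
    workers = [threading.Thread(target=worker, args=(tasks, results)) for _ in range(n_workers)]
    for w in workers:
        w.start()
    producer(frames, tasks, n_workers)
    for w in workers:
        w.join()
    ifp = {}
    while not results.empty():  # safe: all workers have finished
        frame, value = results.get()
        ifp[frame] = value
    return dict(sorted(ifp.items()))


ifp = run_queue(range(0, 10, 2))
print(list(ifp))  # [0, 2, 4, 6, 8]
print(ifp[4])  # ifp(lig@4, prot@4)
```

The bounded task queue keeps memory in check when conversion outpaces the workers. The single producer is also where the main-process bottleneck mentioned for the queue strategy comes from.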
- prolif.parallel.get_mda_parallel_strategy(strategy: Optional[Literal['chunk', 'queue']], traj: Trajectory) → Literal['chunk', 'queue']
Get the parallel strategy to use for MDAnalysis based on the size of the trajectory object. For small objects,
chunk is faster, while queue is faster for large objects.
- prolif.parallel.get_n_jobs(n_jobs: int | None = None, capped: bool = False) → int | None
Get the number of parallel jobs to use.
Prioritizes the
n_jobs parameter, then the PROLIF_N_JOBS environment variable, then the number of logical cores (capped to PROLIF_MAX_JOBS (10 by default) if capped=True), and finally None if psutil.cpu_count() couldn’t retrieve the number of logical cores.