Parallel processing#

Generating an IFP in parallel — prolif.parallel#

This module provides classes that handle parallel processing for the run_from_iterable() and run() methods.

TL;DR for MD trajectories

For trajectories, ProLIF uses simple heuristics to choose between two parallelization strategies, chunk and queue, and to cap the number of cores used. You can override either choice through the parallel_strategy and n_jobs parameters of run(). We encourage users to benchmark both strategies on their specific system to optimize the performance of their analysis.

For run_from_iterable(), parallel execution is handled by MolIterablePool. For run(), the analysis is performed on a trajectory and two strategies are available, handled by TrajectoryPool (chunk strategy) or TrajectoryPoolQueue (queue strategy):

  • chunk: splits the trajectory into chunks and distributes one chunk per worker process. This strategy requires pickling the trajectory object on each worker, so performance scales poorly if the trajectory object is large (e.g. contains complex topology information). However, for simple objects it offers better scaling as each worker can efficiently iterate through its assigned chunk.

  • queue: the main process iterates through the trajectory and converts frames into lightweight RDKit molecules, which are then put into a queue consumed by worker processes. This strategy avoids the overhead of pickling large trajectory objects, but the main process can become a bottleneck if frame conversion is slow.
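The partitioning behind the chunk strategy can be sketched as follows. This is a hypothetical helper for illustration only, not prolif's actual splitting code:

```python
def split_into_chunks(frame_indices, n_workers):
    """Split frame indices into roughly equal chunks, one per worker.

    Hypothetical helper illustrating the chunk strategy; prolif's
    actual implementation may differ.
    """
    n = len(frame_indices)
    base, extra = divmod(n, n_workers)
    chunks, start = [], 0
    for i in range(n_workers):
        size = base + (1 if i < extra else 0)
        if size:
            chunks.append(frame_indices[start:start + size])
        start += size
    return chunks

# 10 frames over 3 workers -> chunks of sizes 4, 3, 3
print(split_into_chunks(list(range(10)), 3))
```

Each worker then iterates through its own chunk independently, which is why this strategy scales well once the trajectory object has been pickled to each worker.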

By default, the strategy is automatically chosen based on the estimated pickle size of the trajectory object: if it exceeds prolif.parallel.MDA_PARALLEL_STRATEGY_THRESHOLD (300 kB), the queue strategy is used, otherwise chunk. This heuristic is coarse and may not always pick the faster strategy, so you can override it by passing an explicit parallel_strategy='chunk' or parallel_strategy='queue' to run().

Processing each frame and converting the topology to RDKit molecules with 3D information can be quite slow and is the main bottleneck of parallel processing, so by default the number of logical cores is capped at prolif.parallel.MAX_JOBS (configurable via the PROLIF_MAX_JOBS environment variable) when the analysis is performed on a trajectory. No cap is applied if an explicit number of cores is passed through the n_jobs parameter or the PROLIF_N_JOBS environment variable.

prolif.parallel.MAX_JOBS = 10#

Caps the maximum number of processes (unless the number of jobs is specified directly by the user) to avoid oversubscription, as I/O tends to be the bottleneck.

prolif.parallel.MDA_PARALLEL_STRATEGY_THRESHOLD: int = 300000#

Threshold used to determine the parallel strategy for MDAnalysis trajectories. If the pickled trajectory object is larger than this threshold, queue is used, otherwise chunk.
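The selection logic can be sketched as follows. This is a simplified stand-in (prolif's get_mda_parallel_strategy handles this internally and also honours an explicit user override):

```python
import pickle

# Mirrors prolif's default threshold of 300 kB
MDA_PARALLEL_STRATEGY_THRESHOLD = 300_000

def choose_strategy(traj_like, threshold=MDA_PARALLEL_STRATEGY_THRESHOLD):
    """Pick 'queue' when the pickled object is large, else 'chunk'.

    Simplified sketch of the heuristic, not prolif's actual function.
    """
    size = len(pickle.dumps(traj_like))
    return "queue" if size > threshold else "chunk"

print(choose_strategy([0.0] * 10))      # small object -> 'chunk'
print(choose_strategy(bytes(500_000)))  # large object -> 'queue'
```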

class prolif.parallel.MolIterablePool(n_processes: int | None, fingerprint: Fingerprint, prot_mol: Molecule, residues: ResidueSelection, tqdm_kwargs: dict)[source]#

Process pool for a parallelized IFP analysis on an iterable of ligands. Must be used in a with statement.

Parameters:
  • n_processes (int | None) – Max number of processes

  • fingerprint (prolif.fingerprint.Fingerprint) – Fingerprint instance used to generate the IFP

  • prot_mol (prolif.molecule.Molecule) – Protein molecule

  • residues (list, optional) – List of protein residues considered for the IFP

  • tqdm_kwargs (dict) – Parameters for the tqdm progress bar

pool#

The underlying pool instance.

Type:

multiprocess.pool.Pool

classmethod executor(mol: Molecule) IFP[source]#

Classmethod executed by each child process on a single ligand molecule from the input iterable.

Returns:

ifp_data – A dictionary indexed by (ligand, protein) residue pairs; each value is a sparse dictionary of interaction metadata indexed by interaction name.

Return type:

prolif.ifp.IFP
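For illustration, the returned mapping has roughly this shape (the residue keys, interaction names, and metadata fields below are made up):

```python
# Hypothetical ifp_data layout: keys are (ligand residue, protein residue)
# pairs, values map interaction names to metadata. The mapping is sparse:
# only residue pairs with at least one detected interaction appear.
ifp_data = {
    ("LIG1", "ASP129"): {
        "HBDonor": [{"distance": 2.9}],
    },
    ("LIG1", "TYR31"): {
        "PiStacking": [{"distance": 4.1}],
    },
}

print(sorted(ifp_data[("LIG1", "ASP129")]))  # interaction names for this pair
```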

classmethod initializer(fingerprint: Fingerprint, prot_mol: Molecule, residues: ResidueSelection) None[source]#

Initializer classmethod passed to the pool so that each child process can access these objects without copying them.

process(args_iterable: Iterable[Molecule]) Iterable[IFP][source]#

Maps the input iterable of molecules to the executor function.

Parameters:

args_iterable (Iterable[prolif.molecule.Molecule]) – An iterable yielding ligand molecules

Returns:

ifp – An iterable of IFP dictionaries.

Return type:

Iterable[prolif.ifp.IFP]

class prolif.parallel.Progress(killswitch: Event, tracker: Synchronized, **kwargs: Any)[source]#

Helper class to track the number of frames processed by the TrajectoryPool.

Parameters:
  • killswitch (threading.Event) – A threading.Event instance created and controlled by the TrajectoryPool to kill the thread updating the tqdm progress bar.

  • tracker (multiprocess.Value) – Value holding a ctypes.c_uint32 ctype updated by the TrajectoryPool, storing how many frames were processed since the last progress bar update.

  • **kwargs (object) – Used to create the tqdm progress bar.

delay#

Delay between progress bar updates. Each update requires locking access to the tracker object, which the TrajectoryPool also needs, so values that are too small may slow down the analysis.

Type:

float, default = 0.5

tqdm_pbar#

The progress bar displayed

Type:

tqdm.std.tqdm

close() None[source]#

Cleanup after the TrajectoryPool computation is done

event_loop() None[source]#

Event loop targeted by a separate thread

update() None[source]#

Update the value displayed by the progress bar
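The mechanism behind Progress can be sketched with standard threading primitives. This is a simplified single-process analogue (the real class wraps a tqdm bar and a multiprocess.Value, and the counter is incremented by worker processes):

```python
import threading
import time

class ProgressSketch:
    """Minimal sketch of Progress: a background thread periodically
    drains a shared counter into a running total."""

    def __init__(self, delay=0.05):
        self.killswitch = threading.Event()
        self.lock = threading.Lock()
        self.tracker = 0  # frames processed since the last update
        self.total = 0    # stand-in for the tqdm bar's displayed count
        self.delay = delay
        self.thread = threading.Thread(target=self.event_loop)

    def update(self):
        with self.lock:
            self.total += self.tracker
            self.tracker = 0

    def event_loop(self):
        # Wake up every `delay` seconds until the killswitch is set
        while not self.killswitch.wait(self.delay):
            self.update()

    def close(self):
        self.killswitch.set()
        self.thread.join()
        self.update()  # flush any remaining count

progress = ProgressSketch()
progress.thread.start()
for _ in range(10):  # workers would increment the tracker like this
    with progress.lock:
        progress.tracker += 1
time.sleep(0.1)
progress.close()
print(progress.total)  # 10
```

This illustrates why a very small delay hurts: every wake-up takes the lock that the workers also need to increment the counter.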

class prolif.parallel.TrajectoryPool(n_processes: int | None, fingerprint: Fingerprint, residues: ResidueSelection, tqdm_kwargs: dict, rdkitconverter_kwargs: tuple[dict, dict], use_segid: bool)[source]#

Process pool for a parallelized IFP analysis on an MD trajectory. Must be used in a with statement.

Parameters:
  • n_processes (int | None) – Max number of processes

  • fingerprint (prolif.fingerprint.Fingerprint) – Fingerprint instance used to generate the IFP

  • residues (list, optional) – List of protein residues considered for the IFP

  • tqdm_kwargs (dict) – Parameters for the tqdm progress bar

  • rdkitconverter_kwargs (tuple[dict, dict]) – Parameters for the RDKitConverter from MDAnalysis: the first for the ligand, and the second for the protein

  • use_segid (bool) – Use the segment number rather than the chain identifier as a chain.

tracker#

Value holding a ctypes.c_uint32 ctype storing how many frames were processed since the last progress bar update.

Type:

multiprocess.Value

pool#

The underlying pool instance.

Type:

multiprocess.pool.Pool

.. versionchanged:: 2.1.0

Added use_segid.

classmethod executor(args: tuple['Trajectory', 'MDAObject', 'MDAObject', 'NDArray']) IFPResults[source]#

Classmethod executed by each child process on a chunk of the trajectory

Returns:

ifp_chunk – A dictionary of IFP indexed by frame number

Return type:

dict[int, prolif.ifp.IFP]

classmethod initializer(tracker: Synchronized, fingerprint: Fingerprint, residues: ResidueSelection, rdkitconverter_kwargs: tuple[dict, dict], use_segid: bool) None[source]#

Initializer classmethod passed to the pool so that each child process can access these objects without copying them.

process(args_iterable: Iterable[tuple['Trajectory', 'MDAObject', 'MDAObject', 'NDArray']]) list['IFPResults'][source]#

Maps the input iterable of arguments to the executor function.

Parameters:

args_iterable (Iterable[tuple]) – Iterable of tuple of trajectory, ligand atomgroup, protein atomgroup, and array of frame indices.

Returns:

ifp – An iterable of dictionaries of IFP indexed by frame number.

Return type:

list[dict[int, prolif.ifp.IFP]]

class prolif.parallel.TrajectoryPoolQueue(n_processes: int | None, fingerprint: Fingerprint, residues: ResidueSelection, tqdm_kwargs: dict, rdkitconverter_kwargs: tuple[dict, dict], use_segid: bool)[source]#

Queue-based process pool for IFP analysis on an MD trajectory.

This implementation uses a producer-consumer pattern where:

  • A producer thread iterates over the trajectory and converts frames to Molecules

  • Worker processes consume Molecule pairs from a queue and compute fingerprints

This avoids pickling MDAnalysis objects, which is the bottleneck in TrajectoryPool.

Must be used in a with statement.
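The pattern can be sketched with a thread-based producer feeding a bounded work queue. This is a simplified single-process analogue; the real pool uses multiprocess workers, and the payloads are RDKit molecules rather than the toy values below:

```python
import queue
import threading

SENTINEL = None  # tells consumers the producer is done

def producer(frames, work_queue, n_consumers):
    """Convert frames to lightweight payloads and enqueue them."""
    for i, frame in enumerate(frames):
        work_queue.put((i, frame * 2))  # stand-in for frame -> molecule conversion
    for _ in range(n_consumers):        # one sentinel per consumer
        work_queue.put(SENTINEL)

def consumer(work_queue, results, lock):
    """Pull payloads off the queue and 'compute' a result per frame."""
    while (item := work_queue.get()) is not SENTINEL:
        frame_number, payload = item
        with lock:
            results[frame_number] = payload + 1  # stand-in for the fingerprint

work_queue = queue.Queue(maxsize=4)  # bounded: producer blocks if workers lag
results, lock = {}, threading.Lock()
n_consumers = 2
threads = [threading.Thread(target=consumer, args=(work_queue, results, lock))
           for _ in range(n_consumers)]
for t in threads:
    t.start()
producer(range(5), work_queue, n_consumers)
for t in threads:
    t.join()
print(dict(sorted(results.items())))  # results indexed by frame number
```

Tagging each payload with its frame number lets results arrive out of order and still be reassembled into a dictionary indexed by frame.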

Parameters:
  • n_processes (int | None) – Max number of worker processes

  • fingerprint (prolif.fingerprint.Fingerprint) – Fingerprint instance used to generate the IFP

  • residues (list, optional) – List of protein residues considered for the IFP

  • tqdm_kwargs (dict) – Parameters for the tqdm progress bar

  • rdkitconverter_kwargs (tuple[dict, dict]) – Parameters for the RDKitConverter from MDAnalysis: the first for the ligand, and the second for the protein

  • use_segid (bool) – Use the segment number rather than the chain identifier as a chain.

.. versionadded:: 2.1.0

classmethod executor(args: tuple[int, prolif.molecule.Molecule, prolif.molecule.Molecule]) tuple[int, 'IFP'][source]#

Classmethod executed by each child process on a single frame.

Parameters:

args (tuple[int, Molecule, Molecule]) – Tuple of (frame_number, ligand_mol, protein_mol)

Returns:

result – Tuple of (frame_number, IFP data)

Return type:

tuple[int, prolif.ifp.IFP]

classmethod initializer(fingerprint: Fingerprint, residues: ResidueSelection) None[source]#

Initializer classmethod passed to the pool so that each child process can access these objects without copying them.

process(traj: Trajectory, lig: MDAObject, prot: MDAObject) IFPResults[source]#

Process the trajectory using a producer-consumer pattern.

Parameters:
  • traj (Trajectory) – MDAnalysis trajectory or sliced trajectory

  • lig (MDAObject) – Ligand AtomGroup

  • prot (MDAObject) – Protein AtomGroup

Returns:

ifp – A dictionary of IFP indexed by frame number

Return type:

dict[int, prolif.ifp.IFP]

prolif.parallel.get_mda_parallel_strategy(strategy: Optional[Literal['chunk', 'queue']], traj: Trajectory) Literal['chunk', 'queue'][source]#

Get the parallel strategy to use for MDAnalysis based on how large the trajectory object is. For small objects, chunk is faster, while queue is faster for large objects.

prolif.parallel.get_n_jobs(n_jobs: int | None = None, capped: bool = False) int | None[source]#

Get the number of parallel jobs to use.

Prioritizes the n_jobs parameter, then the PROLIF_N_JOBS environment variable, then the number of logical cores (capped to PROLIF_MAX_JOBS (10 by default) if capped=True), finally None if psutil.cpu_count() couldn’t retrieve the number of logical cores.
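The priority order can be sketched as follows. This is a simplified stand-in for prolif.parallel.get_n_jobs; here the core count is injected as an argument instead of calling psutil.cpu_count():

```python
import os

os.environ.pop("PROLIF_N_JOBS", None)  # ensure a clean environment for the demo
MAX_JOBS = int(os.environ.get("PROLIF_MAX_JOBS", 10))

def get_n_jobs_sketch(n_jobs=None, capped=False, cpu_count=None):
    """Resolve the number of jobs: explicit argument first, then the
    PROLIF_N_JOBS environment variable, then the logical core count
    (capped at MAX_JOBS if capped=True), finally None.

    Simplified sketch of prolif.parallel.get_n_jobs.
    """
    if n_jobs is not None:
        return n_jobs
    env = os.environ.get("PROLIF_N_JOBS")
    if env:
        return int(env)
    if cpu_count is None:
        return None  # core count could not be determined
    return min(cpu_count, MAX_JOBS) if capped else cpu_count

print(get_n_jobs_sketch(n_jobs=4))                  # explicit wins -> 4
print(get_n_jobs_sketch(cpu_count=32, capped=True)) # capped -> 10
print(get_n_jobs_sketch(cpu_count=32))              # uncapped -> 32
```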