Source code for swamp.search.searchtarget

import os
import swamp
import joblib
import conkit.io
import itertools
import pandas as pd
from pyjob import TaskFactory
from swamp.logger import SwampLogger
from swamp.search.searchjob import SearchJob
from swamp.utils.swamplibrary import SwampLibrary
from swamp.utils import TargetSplit, renumber_hierarchy


[docs]class SearchTarget(object): """Class to search the SWAMP library and rank search models according to their CMO with a given target. Using CMO alignment tools determine the best fragments in the library to be used as search models for a given \ target. First, the target will be split into several subtargets (one for each helical pair with enough \ interhelical contact information, and several :py:obj:`~swamp.search.searchjob.SearchJob` instances will be \ executed. :param str workdir: working directory for the :py:obj:`~swamp.search.searchjob.SearchJob` instances :param str conpred: contact prediction file of the target :param str sspred: secondary structure prediction file of the target (must be topcons format file) :param str conformat: format of the contact prediction file provided for the target (default: 'psicov') :param str nthreads: number of parallel threads to use in the library search (default: 1) :param tuple template_subset: set of templates to be used instead of the full fragment library (deafult: None) :param str target_pdb_benchmark: provide a target's pdb file for benchmark purposes (default: None) :param str alignment_algorithm_name: algorithm used for CMO calculation (default: 'mapalign') :param `~swamp.logger.swamplogger.SwampLogger` logger: logging interface for the search (default None) :param int n_contacts_threshold: min. no. of interhelical cont. to include a subtarget in the search (default: 28) :param str platform: scheduler system where the array will be executed (default 'sge') :param str queue_name: name of the scheduler queue where the tasks should be sent (default None) :param str queue_environment: name of the scheduler environment where the tasks should be sent (default None) :param str python_interpreter: python interpreter to be used for task execution (default '$CCP4/bin/ccp4-python') :ivar str shell_interpreter: shell interpreter to be used for task execution (default '/bin/bash') :ivar bool error: True if errors have occurred at some point in the pipeline :ivar `~swamp.utils.targetsplit.TargetSplit` target: contains information about the target and subtargets :ivar dict con_precision_dict: a dictionary with the contact precission for each given subtarget prediction :ivar dict search_pickle_dict: a dictionary with the :py:attr:`~swamp.search.searchjob.SearchJob.pickle_fname` \ created by each :py:obj:`~swamp.search.searchjob.SearchJob` instance in this search :ivar list results: a nested list with the results obtained in the search againts the library :ivar list scripts: a list with the instances of :py:obj:`pyjob.Script` that will be executed to complete the search :ivar `pandas.DataFrame` ranked_searchmodels: a dataframe with the search models ranked by the CMO obtained in the \ search :example: >>> from swamp.search import SearchTarget >>> my_rank = SearchTarget('<workdir>', '<conpred>', '<sspred>') >>> my_rank.search() >>> my_rank.rank() """ def __init__(self, workdir, conpred, sspred, conformat="psicov", nthreads=1, template_subset=None, logger=None, target_pdb_benchmark=None, alignment_algorithm_name='mapalign', n_contacts_threshold=28, python_interpreter=os.path.join(os.environ['CCP4'], 'bin', 'ccp4-python'), platform='sge', queue_name=None, queue_environment=None): self.workdir = workdir self.conpred = conpred self.sspred = sspred self.conformat = conformat self.nthreads = nthreads self.target_pdb_benchmark = target_pdb_benchmark self.alignment_algorithm_name = alignment_algorithm_name self.template_subset = template_subset self.n_contacts_threshold = n_contacts_threshold self.platform = platform self.queue_name = queue_name self.queue_environment = queue_environment self.python_interpreter = python_interpreter self.shell_interpreter = '/bin/bash' self.con_precision_dict = None self.search_pickle_dict = None self.scripts = None self.results = None self.ranked_searchmodels = None self._make_workdir() if logger is None: self.logger = SwampLogger(__name__) self.logger.init(logfile=os.path.join(self.workdir, "swamp_rank.log"), use_console=True, console_level='info', logfile_level='debug') else: self.logger = logger self.target = TargetSplit(workdir=self.workdir, conpred=self.conpred, sspred=self.sspred, logger=self.logger, conformat=self.conformat, pdb_benchmark=self.target_pdb_benchmark) if self.target_pdb_benchmark is not None and not os.path.isdir(swamp.FRAG_PDB_DB): self.logger.warning('PDB benchmark was requested but %s PDB library was not found!' % swamp.FRAG_PDB_DB) self.target_pdb_benchmark = None def __repr__(self): return '{}(workdir="{_workdir}", conpred="{_conpred}", sspred="{_sspred}", conformat="{_conformat}", ' \ 'nthreads="{_nthreads}", template_subset="{_template_subset}", logger="{_logger}", ' \ 'target_pdb_benchmark="{_target_pdb_benchmark}", alignment_algorithm_name="{_alignment_algorithm_name}' \ '")'.format(self.__class__.__name__, **self.__dict__) # ------------------ Properties ------------------ @property def search_header(self): """Header displayed when initiating :py:obj:`~swamp.logger.swamplogger.SwampLogger`""" return """********************************************************************** ***************** SWAMP SEARCH ***************** ********************************************************************** """ @property def _tmp_cmap(self): """A temporary file name to contain the contact predicionts of each subtarget at \ :py:attr:`swamp.utils.targetsplit.TargetSplit.ranked_subtargets`""" return os.path.join(self.workdir, "tmp_cmap_{}.map") @property def _search_workdir(self): """The workind girectory for each :py:obj:`~swamp.search.searchjob.SearchJob` instance in this search""" return os.path.join(self.workdir, "search_{}") @property def _tmp_pdb(self): """A temporary file name to contain the pdb file each subtarget at \ :py:attr:`swamp.utils.targetsplit.TargetSplit.ranked_subtargets`""" if self.target_pdb_benchmark is not None: return os.path.join(self.workdir, 'tmp_strct_{}.pdb') else: return None @property def template_library(self): """Location of the template library to be used with the :py:obj:`~swamp.search.searchjob.SearchJob` instances""" if self.alignment_algorithm_name == 'aleigen': return swamp.FRAG_ALEIGEN_DB elif self.alignment_algorithm_name == 'mapalign': return swamp.FRAG_MAPALIGN_DB else: raise ValueError("Unrecognised alignment tool! %s" % self.alignment_algorithm_name) @property def library_format(self): """Dictionary specifying the template library format to be used in each \ :py:obj:`~swamp.search.searchjob.SearchJob` instance""" if self.alignment_algorithm_name == 'aleigen': return 'aleigen' elif self.alignment_algorithm_name == 'mapalign': return 'mapalign' else: raise ValueError("Unrecognised alignment tool! %s" % self.alignment_algorithm_name) @property def _other_task_info(self): """Dictionary with the extra **kwags passed to :py:obj:`pyjob.TaskFactory`""" info = {'directory': self.workdir, 'shell': self.shell_interpreter, 'name': 'swamp'} if self.platform == 'local': info['processes'] = self.nthreads else: info['max_array_size'] = self.nthreads if self.queue_environment is not None: info['environment'] = self.queue_environment if self.queue_name is not None: info['queue'] = self.queue_name return info @property def _column_reference(self): """A list of column names for :py:attr:`~swamp.search.searchtarget.SearchTarget.results`""" if self.alignment_algorithm_name == 'aleigen': return ["SUBTRGT_RANK", "SUBTRGT_ID", "N_CON_MAP_A", "MAP_A", "MAP_B", "CON_SCO", "C1", "C2", "CMO", "ALI_LEN", "QSCORE", "RMSD", "SEQ_ID", "N_ALIGN"] elif self.alignment_algorithm_name == 'mapalign': return ["SUBTRGT_RANK", "SUBTRGT_ID", "N_CON_MAP_A", "MAP_A", "MAP_B", "CON_SCO", "GAP_SCO", "TOTAL_SCO", "ALI_LEN", "QSCORE", "RMSD", "SEQ_ID", "N_ALIGN"] else: raise ValueError("Unrecognised alignment tool! %s" % self.alignment_algorithm_name) # ------------------ Hidden methods ------------------ def _make_workdir(self): """Create the :py:attr:`~swamp.search.searchtarget.SearchTarget.workdir`""" if not os.path.isdir(self.workdir): os.mkdir(self.workdir) def _create_scripts(self): """Populate the :py:attr:`~swamp.search.searchtarget.SearchTarget.scripts` list with all the \ :py:obj:`pyjob.Script` instances that need to be executed. There is one instance for each of the subtargets \ at :py:attr:`swamp.utils.targetsplit.TargetSplit.ranked_subtargets` that passes the selected \ :py:attr:`~swamp.search.searchtarget.SearchTarget.n_contacts_threshold` threshold""" self.scripts = [] self.con_precision_dict = {} self.search_pickle_dict = {} for idx, subtarget in enumerate(self.target.ranked_subtargets, 1): self.logger.info("%s interhelical contacts found for subtarget %s" % (subtarget.ncontacts, idx)) if subtarget.ncontacts >= self.n_contacts_threshold: self.logger.info('The no. of contacts passes the threshold. Creating a task array for subtarget.') conkit.io.write(fname=self._tmp_cmap.format(idx), format=self.library_format, hierarchy=subtarget) if self.target_pdb_benchmark is not None: renumber_hierarchy(self.target.subtargets_pdb[subtarget.id]) self.target.subtargets_pdb[subtarget.id].write_pdb(self._tmp_pdb.format(idx)) perfect_contacts = conkit.io.read(self._tmp_pdb.format(idx), 'pdb').top_map subtarget.sequence = perfect_contacts.sequence.deepcopy() subtarget.set_sequence_register() precision = subtarget.match(perfect_contacts).precision self.con_precision_dict[subtarget.id] = precision searcher = SearchJob(**self._search_info(idx)) self.scripts.append(searcher.script) self.search_pickle_dict[searcher.pickle_fname] = subtarget else: self.logger.info("No. of contacts below the %s threshold. " "Subtarget will not be used in the search." % self.n_contacts_threshold) self.logger.info('%s subtargets will be used in the search. Creating task now.' % len(self.search_pickle_dict)) def _search_info(self, idx): """Create **kwargs passed to a :py:obj:`~swamp.search.searchjob.SearchJob` instance :param int idx: the index of the search job, used as :py:attr:`~swamp.search.searchjob.SearchJob.id` :returns: dictionary with the **kwargs to be passed into :py:obj:`~swamp.search.searchjob.SearchJob` instance """ info = {'workdir': self._search_workdir.format(idx), 'pdb_library': swamp.FRAG_PDB_DB, 'query': self._tmp_cmap.format(idx), 'algorithm': self.alignment_algorithm_name, 'template_subset': self.template_subset, 'python_interpreter': self.python_interpreter, 'template_library': self.template_library, 'library_format': self.library_format, 'logger': self.logger, 'con_format': self.library_format, 'id': idx} if self.target_pdb_benchmark is not None: info['query_pdb_benchmark'] = self._tmp_pdb.format(idx), return info def _make_dataframe(self, results, **kwargs): """Convert the :py:attr:`~swamp.search.searchtarget.SearchTarget.results` into a :py:obj:`pandas.Dataframe` :param list results: Nested list with the results of each of the CMO scans :param kwargs: arguments are passed to :py:func:`~swamp.search.seachtarget.SearchTarget._get_best_alignment` """ self.results = pd.DataFrame(results) self.results.columns = self._column_reference self.results["CENTROID_ID"] = [os.path.basename(fname).split('.')[0] for fname in self.results.MAP_B.to_list()] self._get_best_alignment(**kwargs) def _get_best_alignment(self, select_by=("CENTROID_ID", "SUBTRGT_ID")): """Update the :py:attr:`~swamp.search.searchtarget.SearchTarget.results` dataframe to include only the results \ with the optimal CMO alignment between helical pairs. This method considers inverted fragments and takes \ highest CMO score to determine the optimal alignment :param list select_by: indicate the figure of merit by which the alignments will be grouped """ if self.results is None: return else: self.results.CENTROID_ID = self.results.CENTROID_ID.apply(lambda x: SwampLibrary._get_unique_frag_id(x)) self.results["max_score"] = self.results.groupby(list(select_by), sort=False)["CON_SCO"].transform(max) self.results = self.results[self.results.CON_SCO == self.results.max_score] self.results.drop("max_score", 1, inplace=True) self.results.drop_duplicates(subset=list(select_by), inplace=True) # ------------------ Some general methods ------------------
[docs] def recover_results(self): """Recover the results from all the :py:attr:`~swamp.search.searchjob.SearchJob.pickle_fname` indicated in \ :py:attr:`~swamp.search.searchtarget.SearchTargert.search_pickle_dict` :returns: a list with the results loaded from the pickle files at \ :py:attr:`~swamp.search.searchtarget.SearchTargert.search_pickle_dict` """ results = [] for pickle_fname, subtarget in zip(self.search_pickle_dict.keys(), self.search_pickle_dict.values()): if os.path.isfile(pickle_fname): self.logger.debug('Retrieving results from %s' % pickle_fname) current_job_results = joblib.load(pickle_fname) for result in current_job_results: results.append([self.target.ranked_subtargets.index(subtarget) + 1, subtarget.id, subtarget.ncontacts] + result) else: self.logger.warning('Cannot find pickle file! %s' % pickle_fname) return results
[docs] def search(self): """Search the library by calculating the CMO between the observed contacts and the query predicted contacts This method will run a :py:obj:`~swamp.search.searchjob.SearchJob` instance for each subtarget \ at :py:attr:`swamp.utils.targetsplit.TargetSplit.ranked_subtargets` that passes the selected \ :py:attr:`~swamp.search.searchtarget.SearchTarget.n_contacts_threshold` threshold """ self.logger.info(self.search_header) self.logger.info("Splitting the target into sets of contacting helical pairs") self.target.split() if self.target.error: self.logger.warning('Previous errors prevent scanning the library with the target contacts!') return self.logger.info('Creating a list of jobs to search the library using contacts.') self._create_scripts() self.logger.info('Sending jobs now.') with TaskFactory(self.platform, tuple(self.scripts), **self._other_task_info) as task: task.run() self.logger.info('Waiting for workers...') self.logger.info('All search tasks have been completed! Retrieving results') results = self.recover_results() self._make_dataframe(results)
[docs] def rank(self, consco_threshold=0.75, combine_searchmodels=False): """ Get the top search models as indicated by the results of the search. This method can also try to combine \ multiple search models by adding their CMOs. Takes in consideration same combination of fragments may appear \ more than once with fragments in matching subtargets. :param float consco_threshold: CMO threshold to consider an alignment valid (default 0.75) :param bool combine_searchmodels: if True combine search models matching different subtargets (default False) """ if self.results is None: self.logger.error('Need to rank the fragments first!') return # Get the valid searchmodels (subtarget with enough contacts and fragment with enough consco) valid_searchmodels = self.results.loc[self.results.CON_SCO >= consco_threshold,] if len(set(list(valid_searchmodels.SUBTRGT_ID))) == 0: self.logger.warning('None of the search models meet CMO criteria!') return elif not combine_searchmodels: self.logger.info('%s search models meet CMO criteria' % len(set(valid_searchmodels.CENTROID_ID.tolist()))) self.ranked_searchmodels = pd.DataFrame() self.ranked_searchmodels['searchmodels'] = valid_searchmodels.CENTROID_ID self.ranked_searchmodels['consco'] = valid_searchmodels.CON_SCO self.ranked_searchmodels.sort_values(by='consco', inplace=True, ascending=False) return elif len(set(list(valid_searchmodels.SUBTRGT_ID))) == 1: self.logger.warning('Only subtarget %s has valid searchmodels!' % list(valid_searchmodels.SUBTRGT_ID)[0]) self.ranked_searchmodels = pd.DataFrame() self.ranked_searchmodels['searchmodels'] = valid_searchmodels.CENTROID_ID self.ranked_searchmodels['consco'] = valid_searchmodels.CON_SCO self.ranked_searchmodels.sort_values(by='consco', inplace=True, ascending=False) return self.logger.info('Retrieving ranked combinations of search models...') # Get a dictionary with the valid fragments for each subtarget all_valid_searchmodels = {} for subtarget in set(valid_searchmodels.SUBTRGT_ID): all_valid_searchmodels[subtarget] = list( valid_searchmodels[valid_searchmodels.SUBTRGT_ID == subtarget].CENTROID_ID) # All valid combinations between valid searchmodels all_valid_combinations = list(itertools.product(*list(all_valid_searchmodels.values()))) # Make a dictionary with the searchmodel combination and the combined consco searchmodel_combinations = {} for combination in all_valid_combinations: avg_consco = [] for centroid, subtarget in zip(combination, list(all_valid_searchmodels.keys())): current_consco = valid_searchmodels.loc[ (valid_searchmodels.CENTROID_ID == centroid) & (valid_searchmodels.SUBTRGT_ID == subtarget)].CON_SCO avg_consco.append(float(current_consco)) avg_consco = sum(avg_consco) / len(all_valid_searchmodels.keys()) # Only append the combination with this score if it is higher than the one already stored key = ' '.join(sorted(combination)) if key not in searchmodel_combinations.keys(): searchmodel_combinations[key] = avg_consco elif avg_consco > searchmodel_combinations[key]: searchmodel_combinations[key] = avg_consco ranked_combination = sorted(searchmodel_combinations.items(), key=lambda kv: (kv[1], kv[0])) self.ranked_searchmodels = pd.DataFrame(ranked_combination) self.ranked_searchmodels.columns = ['searchmodels', 'consco'] self.ranked_searchmodels.sort_values(by='consco', inplace=True, ascending=False)