Source code for swamp.parsers.gesamtparser
import numpy as np
import pandas as pd
from enum import Enum
from swamp.parsers.parser import Parser
[docs]class GesamtErrorCodes(Enum):
"""An enumerator to handle the possible gesamt error codes"""
DISSIMILAR = 1
ERROR_2 = 2
NO_STDOUT = 3
READ_ERRORS = 4
[docs]class GesamtParser(Parser):
"""Gesamt output parser
:param str mode: corresponds with :py:attr:`~swamp.wrappers.gesamt.Gesamt.mode` used to create the output to be \
parsed
:param str stdout: the stdout to be parsed (default None)
:param str fname: the file name to be parsed (default None)
:param `~swamp.logger.swamplogger.SwampLogger` logger: logging interface for the parser (default None)
:ivar bool error: if True an error has occurred along the process
:ivar float qscore: qscore as reported by gesamt
:ivar float rmsd: the obtained rmsd as reported by gesamt
:ivar float seq_id: sequence identity between the input structures
:ivar int n_align: number of aligned residues
:example:
>>> from swamp.parsers import GesamtParser
>>> my_parser = GesamtParser('<mode>', '<stdout>', '<fname>')
>>> my_parser.parse()
"""
def __init__(self, mode, stdout=None, fname=None, logger=None):
self.mode = mode
self.qscore = None
self.rmsd = None
self.seq_id = None
self.n_align = None
self.hits_df = None
super(GesamtParser, self).__init__(stdout=stdout, fname=fname, logger=logger)
@property
def summary(self):
"""Dataframe with hits found in the archive if :py:attr:`~swmap.parsers.gesamtparser.GesamtParser.mode` is
'search-archive' otherwise a tuple with all the parsed figures of merit"""
if self.mode == 'search-archive':
return self.hits_df
else:
return self.qscore, self.rmsd, self.seq_id, self.n_align
[docs] def parse(self):
"""Method to parse :py:attr:`~swamp.parsers.parser.Parser.fname` and store figures of merit"""
if self.mode == 'search-archive':
self.parse_hitfile()
else:
self.parse_stdout()
[docs] def parse_stdout(self):
"""Method to retrieve qscore, rmsd, sequence identity and no. of aligned residues from \
:py:attr:`~swamp.parsers.parser.gesamtparser.GesamtParser.stdout`
:param str stdout: gesamt stdout to be parsed
:param int n_models: number of models that were used in the structural alignment to generate the provided stdout
:returns: qscore, rmsd, sequence identity and no. of aligned residues (tuple)
"""
if self.stdout == b'':
self.error = GesamtErrorCodes.NO_STDOUT
self.logger.error("Something went wrong, no gesamt stdout to parse! Exiting now...")
return
elif 'DISSIMILAR' in self.stdout:
self.error = GesamtErrorCodes.DISSIMILAR
return
elif 'ALIGNMENT ERROR 2' in self.stdout:
self.error = GesamtErrorCodes.ERROR_2
return
elif 'STOP DUE TO READ ERRORS' in self.stdout:
self.error = GesamtErrorCodes.READ_ERRORS
return
n_models = 0
for line in self.stdout.split('\n'):
if '... reading ' in line:
n_models += 1
if n_models == 2:
qscore_mark = "Q-score"
rmsd_mark = "RMSD"
n_align_mark = "Aligned residues"
seqid_mark = "Sequence Id"
else:
qscore_mark = "quality Q"
rmsd_mark = "r.m.s.d"
n_align_mark = "Nalign"
seqid_mark = "SEQ_ID IS NOT FOUND IN MULTIPLE STRCUT. ALIGNMENT"
self.qscore = np.nan
self.rmsd = np.nan
self.n_align = np.nan
self.seq_id = np.nan
for line in self.stdout.split("\n"):
if len(line.split()) != 0 and line.split()[0] != "#":
if qscore_mark in line and self.qscore is np.nan:
self.qscore = float(line.rstrip().lstrip().split(":")[-1].split()[0].rstrip().lstrip())
elif rmsd_mark in line and self.rmsd is np.nan:
self.rmsd = float(line.rstrip().lstrip().split(":")[-1].split()[0].rstrip().lstrip())
elif n_align_mark in line and self.n_align is np.nan:
self.n_align = int(line.rstrip().lstrip().split(":")[-1].split()[0].rstrip().lstrip())
elif seqid_mark in line and self.seq_id is np.nan:
self.seq_id = float(line.rstrip().lstrip().split(":")[-1].split()[0].rstrip().lstrip())
[docs] def parse_hitfile(self):
"""Method to parse a gesamt .hit output file
:param str fname: file name of the .hit output file
:returns: a dataframe with the results contained in the hit file (`pandas.Dataframe`)
"""
self.hits_df = []
with open(self.fname, "r") as fhandle:
for line in fhandle:
if line[0] != "#":
line = line[20:].split()
self.hits_df.append([line[-6], line[-5], line[-4], line[-3], line[-2], line[-1]])
self.hits_df = pd.DataFrame(self.hits_df)
self.hits_df.columns = ["qscore", "rmsd", "seq_id", "n_align", "n_res", "fname"]
[docs] @staticmethod
def get_pairwise_qscores(stdout):
"""Method to get the pairwise qscores of a given alignmnet between several models in an ensemble
:param str stdout: gesamt stdout for the command
:returns: qscores_dict: a dictionary with the pairwise qscores for each of the models in the alignment (dict)
"""
qscores_dict = {}
structure_id_dict = {}
qscores_mark = "(o) pairwise Q-scores"
file_mark = "... reading file"
rmsd_mark = "(o) pairwise r.m.s.d."
is_qscores = False
for line in stdout.split("\n"):
# Store file names and structure ids
if file_mark in line:
fname = line.split("'")[1]
structure_id = "S%s" % str(len(qscores_dict.keys()) + 1).zfill(3)
qscores_dict[fname] = None
structure_id_dict[structure_id] = fname
# Qscores will start appearing now
elif qscores_mark in line:
is_qscores = True
# Store the qscore in the dictionary
elif is_qscores and line.split("|")[0].rstrip().lstrip() in structure_id_dict.keys():
structure_id = line.split("|")[0].rstrip().lstrip()
idx = int(structure_id[1:])
qscores_dict[structure_id_dict[structure_id]] = float(line.split()[idx].rstrip().lstrip())
# If we reach the rmsd mark, break the loop
elif rmsd_mark in line:
break
return qscores_dict