import os
import re
import random
import io
import anyconfig
import numpy
import collections
import pathlib
import Bio
import cobra
import pandas
import tqdm
bar_format = '{desc:<75}: {percentage:.1f}%|{bar:10}| {n_fmt:>5}/{total_fmt:>5} [{elapsed}<{remaining}]'
import coralme
import warnings
try:
warnings.simplefilter(action = 'ignore', category = Bio.BiopythonWarning)
except:
warnings.warn("This biopython version does not allow for correct warning handling. Biopython >=1.80 is suggested.")
import logging
log = logging.getLogger(__name__)
element_types = {'CDS', 'rRNA','tRNA', 'ncRNA','misc_RNA','RNA','tmRNA'}
[docs]
class Organism(object):
"""Organism class for storing information about an organism
This class acts as a database containing all necessary information
to reconstruct a ME-model. It is used to retrieve and store
information of the main (org) and the reference (ref) organisms.
Information in Organism is read and manipulated by methods in
the MEBuilder class.
Parameters
----------
config : dict
Dictionary containing configuration and settings.
is_reference : bool
If True, process as reference organism.
"""
def __init__(self, config, is_reference, available_reference_models = None):
self.available_reference_models = available_reference_models
if is_reference:
# check values
if not bool(config.get('dev_reference', False)) and not bool(config.get('user_reference', False)):
config['dev_reference'] = 'iJL1678b'
logging.warning('Using \'iJL1678b\' model as reference. See `builder.available_reference_models` for more information.')
elif 'dev_reference' in config and 'user_reference' in config:
raise ValueError('The keys \'dev_reference\' and \'user_reference\' are mutually exclusive. Please remove one from configuration.')
elif config['dev_reference'] is True:
config['dev_reference'] = 'iJL1678b'
logging.warning('Using \'iJL1678b\' model as reference. See `builder.available_reference_models` for more information.')
elif config['dev_reference']:
if not config['dev_reference'] in self.available_reference_models:
raise ValueError('The key \'dev_reference\' must be \'iJL1678b\' (E. coli, gram negative) or \'iJT964\' (B. subtilis, gram positive)')
elif config['user_reference']:
if not pathlib.Path(config['user_reference']).is_dir():
raise ValueError('The key \'user_reference\' must be a directory.')
else:
return NotImplementedError
# If Organism is reference, set some default values for future processing
if config['dev_reference']:
self.id = config['dev_reference']
config["locus_tag"] = self.available_reference_models[self.id]
elif config['user_reference']:
# If User set a reference, use manually set values
# Set ID
self.id = config['user_reference']
config = config.copy()
# TODO: check if necessary
# Update the configuration using the configuration from the User's defined reference
for input_file in [config['user_reference'] + "/organism.json", \
config['user_reference'] + "/input.json"]:
with open(input_file, 'r') as infile:
config.update(anyconfig.load(infile))
else:
return NotImplementedError
else:
# If it is not the reference, use the organism files
self.id = config['ME-Model-ID']
# Set initial properties
self.is_reference = is_reference
self.curation_notes = collections.defaultdict(list)
self.config = config
#if self.is_reference:
## If it is the reference, use the gene ID conventions of the reference
#self.locus_tag = config.get('reference_tag','locus_tag')
#else:
# If it is not the reference, use the gene ID conventions of the organism
self.locus_tag = config.get('locus_tag','locus_tag')
# Create the protein localization interpreter
# TODO: Move this to a more standard location, builder/?
data = \
'code,interpretation,gram\n' \
'CCI-CW-BAC-POS-GP,Cell_Wall,pos\n' \
'CCI-OUTER-MEM-GN,Outer_Membrane,neg\n' \
'CCI-PM-BAC-NEG-GN,Inner_Membrane,neg\n' \
'CCI-PERI-BAC-GN,Periplasm,neg\n' \
'CCI-PM-BAC-POS-GP,Plasma_Membrane,pos\n' \
'CCI-EXTRACELLULAR-GP,Extracellular_Space,pos\n' \
'CCO-MEMBRANE,Membrane,'
self.location_interpreter = pandas.read_csv(io.StringIO(data), index_col=0)
# Initialize the curation fields
self.manual_curation = coralme.builder.curation.CurationList()
@property
def directory(self):
"""Working file directory with reconstruction files"""
if self.is_reference and self.id in self.available_reference_models:
try:
from importlib.resources import files
except ImportError:
from importlib_resources import files
return str(files("coralme") / self.id) + "-ME/building_data/"
else:
return self.config.get('out_directory', self.id) + "/building_data/"
#return self.id + "/building_data/"
@property
def blast_directory(self):
"""Directory containing the blast files and results"""
if not self.is_reference:
return self.config.get('out_directory', self.id) + "/blast_files_and_results/"
# If reference organism, no blast information
return None
@property
def _complexes_df(self):
"""
Returns the dataframe containing the protein complex information from the file.
If the file does not exist, it initializes it.
"""
filename = self.directory + "protein_complexes.txt"
if os.path.isfile(filename):
# If protein_complexes.txt exists in the files directory,
# use it as the final complexes dataframe.
return pandas.read_csv(
filename, index_col=0, sep="\t",comment='#'
).fillna('')
# If it does not exist, generate it from raw files
return self.generate_complexes_df()
@property
def _protein_mod(self):
"""
Returns the dataframe containing the protein complex modifications from the file.
If the file does not exist, it initializes it.
"""
filename = self.directory + "protein_modification.txt"
if os.path.isfile(filename):
# If protein_modification.txt exists in the files directory,
# use it as the final protein modification dataframe.
return pandas.read_csv(
filename, index_col=0, sep="\t",comment='#'
)
# If it does not exist, generate it from raw files
return pandas.DataFrame.from_dict(
{
"Modified_enzyme": {},
"Core_enzyme": {},
"Modifications": {},
"Source": {},
}
).set_index("Modified_enzyme")
@property
def _TU_df(self):
"""
Returns the dataframe containing the TU information from the file.
If the file does not exist, it initializes it.
"""
if self.is_reference:
# Reference file should contain TUs_from_biocyc.txt
filename = self.directory + "TUs_from_biocyc.txt"
else:
# Use TUs dataframe file path as defined
filename = self.config.get('df_TranscriptionalUnits', self.directory + "TUs_from_biocyc.txt")
if os.path.isfile(filename):
# If TUs dataframe exists, read it
tmp = pandas.read_csv(filename, index_col = 0, sep = "\t")
tmp = tmp.dropna(subset=['start', 'stop', 'genes'], how = 'any')
return tmp
else:
# If it does not exist, generate it from raw files
return self.get_TU_df()
@property
def _m_model(self):
"""
Returns the M-model.
"""
if self.is_reference:
if self.id in self.available_reference_models:
# If reference organism is iJL1678b, read it from m_model.json
model = self.directory + 'm_model.json'
else:
raise ValueError('Reference code is not valid. See builder.available_reference_models for available references.')
else:
# Read M-model from the configuration file path
model = self.config['m-model-path']
if model.endswith('.json'):
# Read from JSON model
return cobra.io.load_json_model(model)
elif model.endswith('.xml'):
# Read from SBML model
return cobra.io.read_sbml_model(model)
else:
raise ValueError('M-model input file must be json or xml format.')
@property
def rna_components(self):
"""
Returns the RNAs from the identified products.
"""
product_types = self.product_types
# Return all products whose type is RNA
return set(g for g,t in product_types.items() if 'RNA' in t)
def get_organism(self):
""" Processes input files, and creates an instance of
Organism.
"""
sep = '~ '*1
print("{}Processing files for {}...".format(sep,self.id))
if not self.is_reference:
logging.warning('Checking folder')
self.check_folder()
logging.warning("Loading M-model")
self.m_model = self._m_model
logging.warning("Checking M-model")
self.check_m_model()
logging.warning("Loading genbank file")
self.get_genbank_contigs()
logging.warning("Loading optional files")
self.load_optional_files()
logging.warning("Checking gene overlap")
self.check_gene_overlap()
logging.warning("Generating complexes dataframe")
self.complexes_df = self._complexes_df
logging.warning("Syncing files")
self.sync_files()
logging.warning("Looking for duplicates in provided files")
self.check_for_duplicates()
logging.warning('Pruning genbank from unwanted feature types')
self.prune_genbank()
logging.warning('Completing genbank with provided files')
self.update_genbank_from_files()
logging.warning("Updating genes and complexes from genbank")
self.update_complexes_genes_with_genbank()
logging.warning("Generating protein modifications dataframe")
self.protein_mod = self._protein_mod
logging.warning("Purging genes in optional files")
self.purge_genes_in_file()
logging.warning("Loading manual curation")
self.load_manual_curation()
logging.warning("Integrating manual metabolites")
self.modify_metabolites()
logging.warning("Integrating manual metabolic reactions")
self.modify_metabolic_reactions()
logging.warning("Integrating manual complexes")
self.add_manual_complexes()
logging.warning("Getting sigma factors from BioCyc")
self.get_sigma_factors()
self.get_rpod()
logging.warning("Getting RNA polymerase from BioCyc")
self.get_rna_polymerase()
logging.warning("Updating generics with genbank")
self.get_generics_from_genbank()
logging.warning("Generating transcription units dataframe")
self.TU_df = self._TU_df
self.get_TU_genes()
logging.warning("Updating ribosomal proteins with BioCyc")
self.update_ribosome_stoich()
logging.warning("Updating protein location with BioCyc")
self.get_protein_location()
logging.warning("Updating tRNA synthetases with BioCyc")
self.get_trna_synthetase()
logging.warning("Getting lipids")
self.lipids = self.get_lipids()
logging.warning("Getting phospholipids")
self.phospholipids = self.get_phospholipids()
logging.warning("Updating peptide release factors with BioCyc")
self.get_peptide_release_factors()
logging.warning("Complementing non-metabolic metabolites in M-model")
self.get_nonmetabolic()
logging.warning("Purging genes in M-model")
self.purge_genes_in_model()
logging.warning("Getting enzyme-reaction association")
self.get_enzyme_reaction_association()
print("Reading {} done.".format(self.id))
def get_genbank_contigs(self):
""" Reads GenBank file as a list of contigs.
"""
if self.is_reference:
if self.id in self.available_reference_models:
# If default reference iJL1678b, read from genome.gb
gb_it = Bio.SeqIO.parse(self.directory + "genome.gb", "gb")
else:
raise ValueError('Reference code is not valid. See builder.available_reference_models for available references.')
else:
# Read from configuration file path
gb_it = Bio.SeqIO.parse(self.config['genbank-path'], "gb")
self.contigs = [ i for i in gb_it ]
def check_folder(self):
""" Checks that the necessary directories are present.
"""
if not os.path.isdir(self.directory):
os.makedirs(self.directory)
logging.warning("{} directory was created.".format(self.directory))
if not os.path.isdir(self.blast_directory):
os.makedirs(self.blast_directory)
logging.warning("{} directory was created.".format(self.blast_directory))
def check_m_model(self):
""" Performs checks on the M-model
"""
m_model = self.m_model
# Metabolites
RNA_mets = []
formula_mets = []
formulaweight_mets = []
deadend_mets = []
for m in tqdm.tqdm(m_model.metabolites,
'Checking M-model metabolites...',
bar_format = bar_format):
if m.id.startswith("RNA"):
RNA_mets.append(m)
if not m.formula:
formula_mets.append(m.id)
try:
float(m.formula_weight)
except:
formulaweight_mets.append(m.id)
if len(m.reactions) == 0:
deadend_mets.append(m.id)
unused_genes = []
for g in tqdm.tqdm(m_model.genes,
'Checking M-model genes...',
bar_format = bar_format):
if not g.reactions:
unused_genes.append(g.id)
# Reactions
subsystem_RXNS = []
for r in tqdm.tqdm(m_model.reactions,
'Checking M-model reactions...',
bar_format = bar_format):
if not r.subsystem:
subsystem_RXNS.append(r.id)
# Warnings
if RNA_mets:
self.curation_notes['org.check_m_model'].append({
'msg':"Metabolites starting with the prefix RNA were removed.",
'triggered_by':[i.id for i in RNA_mets],
'importance':'high',
'to_do':'Check whether the model needs the removed metabolites, and change their name. RNA as a prefix is used for TranscribedGene objects.'})
m_model.remove_metabolites(RNA_mets)
if formula_mets:
self.curation_notes['org.check_m_model'].append({
'msg':"Some metabolites are missing their formula",
'triggered_by':formula_mets,
'importance':'critical',
'to_do':'Correct the formulas of the listed metabolites. Some metabolite formulas are critical for the completion of this pipeline. If homology is ON, this pipeline will try to fill in the formulas from the reference.'})
if formulaweight_mets:
self.curation_notes['org.check_m_model'].append({
'msg':"Some metabolites have a problematic formula. If these metabolites are used in protein modifications, or other subreactions, it will cause an error.",
'triggered_by':formulaweight_mets,
'importance':'critical',
'to_do':'Correct the formulas of the listed metabolites. Some metabolite formulas are critical for the completion of this pipeline. If homology is ON, this pipeline will try to fill in the formulas from the reference.'})
if subsystem_RXNS:
self.curation_notes['org.check_m_model'].append({
'msg':"Some reactions are missing their subsystem",
'triggered_by':subsystem_RXNS,
'importance':'high',
'to_do':'Make sure the subsystems of these reactions are correct'})
if deadend_mets:
self.curation_notes['org.check_m_model'].append({
'msg':"Some metabolites have no reactions associated",
'triggered_by':deadend_mets,
'importance':'critical',
'to_do':'Make sure these metabolites are removed or connected properly'})
if unused_genes:
self.curation_notes['org.check_m_model'].append({
'msg':"Some genes have no reactions associated",
'triggered_by':unused_genes,
'importance':'critical',
'to_do':'Make sure these genes are removed or associated properly'})
def load_optional_files(self):
""" Loads optional files.
"""
logging.warning("Loading gene dictionary")
self.gene_dictionary = self.read_gene_dictionary(
self.config.get('biocyc.genes', self.directory + "genes.txt")
)
self.gene_sequences = self.read_gene_sequences(
self.config.get('biocyc.seqs', self.directory + "sequences.fasta")
)
logging.warning("Getting proteins from BioCyc")
self.proteins_df = self.read_proteins_df(
self.config.get('biocyc.prots', self.directory + "proteins.txt")
)
logging.warning("Getting RNAs from BioCyc")
self.RNA_df = self.read_RNA_df(
self.config.get('biocyc.RNAs', self.directory + "RNAs.txt")
)
logging.warning("Getting transcription units from BioCyc")
self.TUs = self.read_TU_df(
self.config.get('biocyc.TUs', self.directory + "TUs.txt")
)
def load_manual_curation(self):
""" Loads manual curation to Organism instance
"""
coralme.builder.curation.MEManualCuration(self).load_manual_curation()
def purge_genes_in_file(self):
""" Checks genes in files and purges problematic ones.
"""
if self.is_reference:
return
all_genenames_in_gb = []
for i in self.all_genes_in_gb:
genenames = self.gene_dictionary[self.gene_dictionary['Accession-1'].eq(i)].index
all_genenames_in_gb += list(genenames)
warn_products = set(self.gene_dictionary[self.gene_dictionary["Product"] == ''].index)
warn_replicons = set(self.gene_dictionary[self.gene_dictionary["replicon"] == ''].index)
warn_rightpos = set(self.gene_dictionary[self.gene_dictionary["Left-End-Position"] == ''].index)
warn_leftpos = set(self.gene_dictionary[self.gene_dictionary["Right-End-Position"] == ''].index)
warn_sequences = set(self.gene_dictionary.index) - set(self.gene_sequences.keys()) - set(all_genenames_in_gb)
warn_genenames = set(self.gene_dictionary[self.gene_dictionary.index == ''].index)
self.gene_dictionary.drop(list(
warn_products|warn_genenames|warn_replicons|warn_sequences|warn_rightpos|warn_leftpos
),inplace=True)
def _get_product_type(self,
gene_name,
gene_dictionary = None,
complexes_df = None,
RNA_df = None,
warn_genes = []):
"""Gets the product type of a gene (RNA, CDS, ...) from the files"""
if gene_dictionary is None:
gene_dictionary = self.gene_dictionary
if complexes_df is None:
complexes_df = self.complexes_df
if RNA_df is None:
RNA_df = self.RNA_df
row = gene_dictionary.loc[gene_name]
gene_id = row['Accession-1']
product = row['Product'].split(' // ')[0]
### Try to get product type from gene id of type LOCUST_TAG-RNA
product_type = ''
if '-' in product and ' ' not in product:
product_type = re.findall('[a-zA-Z]+',product.split('-')[-1])
if product_type:product_type = product_type[0]
### Set product type to RNA if it is in ID
if ' ' in product or ('RNA' not in product and 'MONOMER' not in product) or not product_type:
if 'RNA' in gene_id \
or RNA_df['Gene'].str.match(gene_name).any() \
or product in RNA_df.index:
product_type = 'RNA'
elif 'MONOMER' in gene_id \
or complexes_df['genes'].str.contains(r'{}\(\d*\)'.format(gene_id),regex=True).any() \
or product in complexes_df.index:
product_type = 'MONOMER'
else:
warn_genes.append(gene_id)
return gene_id,None,None
return gene_id,product,product_type
def _correct_product(self,
gene_name,
product_type,
gene_dictionary = None):
"""
Product identifier contains wrong characters or is missing critical information
and must be corrected for parsing
"""
if gene_dictionary is None:
gene_dictionary = self.gene_dictionary
## Correct product. Likely product is a description and not an actual
## product ID like GENE-MONOMER or GENE-tRNA
product = '{}-{}'.format(gene_name,product_type)
gene_dictionary.at[gene_name,'Product'] = product
return product
def _add_entry_to_df(self,
df,
tmp):
"""Append one entry to the end of a dataframe"""
indexname = df.index.name
df = pandas.concat([df,
pandas.DataFrame.from_dict(tmp).T],
axis = 0, join = 'outer')
df.index.name = indexname
return df
def _add_entry_to_rna(self,
gene_id,
name,
product,
RNA_df,
source):
"""Append one entry to the RNA dataframe"""
logging.warning('Adding {} ({}) to RNAs from {}'.format(gene_id,product,source))
tmp = {product : {"Common-Name": name,
"Gene": gene_id}}
return self._add_entry_to_df(RNA_df,tmp)
def _add_entry_to_complexes(self,
gene_id,
name,
product,
complexes_df,
source):
"""Append one entry to the complexes dataframe"""
if product in complexes_df.index:
logging.warning('Could not add {} ({}) to complexes from {}. Already in complexes_df'.format(gene_id,product,source))
return complexes_df
if isinstance(gene_id,str):
gene_id = '{}()'.format(gene_id)
elif isinstance(gene_id,list):
gene_id = ' AND '.join(['{}()'.format(g) for g in gene_id])
elif isinstance(gene_id,dict):
gene_id = ' AND '.join(['{}({})'.format(k,v) for k,v in gene_id.items()])
else:
raise TypeError("Unsupported entry to add to complexes of type " + type(gene_id))
logging.warning('Adding {} ({}) to complexes from {}'.format(product,gene_id,source))
tmp = {product: {
"name": name,
"genes": gene_id,
"source": source,
}}
return self._add_entry_to_df(complexes_df,tmp)
def _add_entry_to_protein_mod(self,
protein_mod,
mod_complex,
core_enzyme,
mods,
source):
"""Append one entry to the protein modification dataframe"""
logging.warning('Adding {} to protein_mod from {}'.format(mod_complex, source))
tmp = {mod_complex: {
"Core_enzyme": core_enzyme,
"Modifications": mods,
"Source": source,
}}
return self._add_entry_to_df(protein_mod,tmp)
def sync_files(self):
"""
Syncs provided files. Cross checks information between
optional files and the genome file
"""
if self.is_reference:
return
gene_dictionary = self.gene_dictionary
RNA_df = self.RNA_df
complexes_df = self.complexes_df
product_types = {}
warn_genes = []
for gene_name,row in tqdm.tqdm(gene_dictionary.iterrows(),
'Syncing optional genes file...',
bar_format = bar_format,
total=gene_dictionary.shape[0]):
gene_id = row['Accession-1']
if not gene_name or isinstance(gene_name,float):
warn_genes.append(gene_id)
continue
gene_id,product,product_type = \
self._get_product_type(
gene_name,
gene_dictionary=gene_dictionary,
complexes_df=complexes_df,
RNA_df=RNA_df,
warn_genes=warn_genes)
if product is None:
warn_genes.append(gene_id)
continue
if ' ' in product or ('RNA' not in product and 'MONOMER' not in product):
product = \
self._correct_product(
gene_name,
product_type,
gene_dictionary)
product_types[gene_id] = product_type
## Sync files
if 'RNA' in product_type and product not in RNA_df.index:
RNA_df = \
self._add_entry_to_rna(gene_id,
product,
product,
RNA_df,
"Sync")
elif product_type == 'MONOMER' and product not in complexes_df.index:
complexes_df = \
self._add_entry_to_complexes(gene_id,
product,
product,
complexes_df,
"Sync")
self.gene_dictionary = gene_dictionary[pandas.notnull(gene_dictionary.index)]
self.RNA_df = RNA_df
self.complexes_df = complexes_df
self.product_types = product_types
# Warnings
if warn_genes:
self.curation_notes['org.sync_files'].append({
'msg':'The types of some genes (e.g. CDS, RNA...) could not be identified. Is Product or Gene Name missing?',
'triggered_by':warn_genes,
'importance':'medium',
'to_do':'Manually fill the products (with types) of these genes in genes.txt'
})
def _create_genbank_contig(self,
contig_id,
seq,
name,
description,
source):
"""Create a Biopython contig"""
from Bio.SeqRecord import SeqRecord
from Bio.SeqFeature import SeqFeature, ExactPosition, SimpleLocation
new_contig = SeqRecord(seq=seq,
id = contig_id,
name = name,
description = description,
annotations = {
'molecule_type' : 'DNA'
})
new_contig.features = [SeqFeature(SimpleLocation(ExactPosition(0),ExactPosition(len(seq))),
type='source',
id = contig_id,
qualifiers = {'note':'Added from {}'.format(source)})]
return new_contig
def _create_contig_feature(self,
gene_id,
seq,
strand,
feature_type,
product_name):
"""Create a Biopython feature to add to a contig"""
from Bio.SeqFeature import SeqFeature, ExactPosition, SimpleLocation
return SeqFeature(SimpleLocation(ExactPosition(0),ExactPosition(len(seq)),strand),
type=feature_type,
id = gene_id,
qualifiers = {
self.locus_tag:[gene_id],
'product':[product_name]
})
def _add_entry_to_genbank(self,
gene_id,
gene_name,
product_type,
product_name,
row,
contigs,
gene_sequences):
"""Append one entry to the GenBank"""
if self.duplicated_genes is not None and gene_id in self.duplicated_genes:
gene_id = '{};{}'.format(gene_id,gene_name)
logging.warning('Adding {} to genbank file as {}'.format(gene_id,product_type))
from Bio.SeqFeature import SeqFeature, CompoundLocation, ExactPosition, FeatureLocation, SimpleLocation
gene_seq = gene_sequences[gene_name]
gene_left = int(row['Left-End-Position'])
gene_right = int(row['Right-End-Position'])
new_contig = self._create_genbank_contig('{}'.format(gene_id),
gene_seq.seq,
gene_seq.name,
gene_seq.description,
'BioCyc')
feature = self._create_contig_feature(gene_id,
gene_seq.seq,
1 if gene_left < gene_right else -1,
product_type,
product_name)
feature.qualifiers['transl_table'] = [self.transl_table]
new_contig.features += [feature]
contigs.append(new_contig)
def _get_product_name_if_present(self,
gene_id,
product,
product_type,
query_types,
dfs,
warns,
columns):
"""Get the name of a gene product from the files"""
for qt,df,warn,col in zip(query_types,dfs,warns,columns):
if qt in product_type:
if product not in df.index:
warn.append(gene_id)
return None
product_name = df.loc[product][col]
return product_name if product_name else None
return product if product else None
def _read_product_type(self,
gene_id,
product_types):
"""Get the type of a gene product from the files"""
product_type = product_types[gene_id] \
if gene_id in product_types else 'gene'
if 'MONOMER' in product_type:
return 'CDS'
else:
return product_type
def update_genbank_from_files(self):
""" Complements GenBank file from optional files."""
if self.is_reference:
return
contigs = self.contigs
gene_sequences = self.gene_sequences
gene_dictionary = self.gene_dictionary
RNA_df = self.RNA_df
complexes_df = self.complexes_df
product_types = self.product_types
all_genes_in_gb = self.all_genes_in_gb
warn_rnas = []
warn_proteins = []
warn_position = []
warn_sequence = []
# Add new genes
for gene_name,row in tqdm.tqdm(gene_dictionary.iterrows(),
'Updating Genbank file with optional files...',
bar_format = bar_format,
total=gene_dictionary.shape[0]):
gene_id = row['Accession-1']
if gene_id not in all_genes_in_gb:
product = row['Product'].split(' // ')[0]
### Try to get product type from gene id of type LOCUST_TAG-RNA
product_type = self._read_product_type(gene_id,
product_types)
### Retrieve values to sync with genbank
product_name = \
self._get_product_name_if_present(gene_id,
product,
product_type,
['RNA','MONOMER'],
[RNA_df,complexes_df],
[warn_rnas,warn_proteins],
['Common-Name','name'])
if product_name is None:
continue
if not row['Left-End-Position'] or not row['Right-End-Position']:
warn_position.append(gene_id)
continue
if gene_name not in gene_sequences:
warn_sequence.append(gene_id)
continue
self._add_entry_to_genbank(
gene_id,
gene_name,
product_type,
product_name,
row,
contigs,
gene_sequences)
# Ensure translation is in qualifiers
warn_translation = []
for record in self.contigs:
for feature in record.features:
if self.locus_tag not in feature.qualifiers:
continue
if feature.type != "CDS":
continue
if "translation" in feature.qualifiers:
continue
warn_translation.append(feature.qualifiers[self.locus_tag][0])
seq = feature.extract(record).seq
feature.qualifiers["translation"] = [seq.translate(self.transl_table)]
with open(self.directory + 'genome_modified.gb', 'w') as outfile:
for contig in self.contigs:
Bio.SeqIO.write(contig, outfile, 'genbank')
# Warnings
if warn_rnas:
self.curation_notes['org.update_genbank_from_files'].append({
'msg':'Some genes were identified as RNA from their locus_tags, but they are not present in RNAs.txt',
'triggered_by':warn_rnas,
'importance':'medium',
'to_do':'Check whether you should add these genes to RNAs.txt or fix its product value in genes.txt'
})
if warn_proteins:
self.curation_notes['org.update_genbank_from_files'].append({
'msg':'Some genes were identified as CDS from their locus_tags, but they are not present in proteins.txt',
'triggered_by':warn_proteins,
'importance':'medium',
'to_do':'Check whether you should add these genes to proteins.txt or fix its product value in genes.txt'
})
if warn_position:
self.curation_notes['org.update_genbank_from_files'].append({
'msg':'Could not add some genes in genes.txt to genbank.gb since they lack position information',
'triggered_by':warn_position,
'importance':'medium',
'to_do':'Fill in position information in genes.txt'
})
if warn_sequence:
self.curation_notes['org.update_genbank_from_files'].append({
'msg':'Could not add some genes in genes.txt to genbank.gb since they lack sequence information. Are your BioCyc files from the same database version?',
'triggered_by':warn_sequence,
'importance':'medium',
'to_do':'Add gene sequence in sequences.fasta. Check whether you downloaded the database files from the same BioCyc version.'
})
if warn_translation:
self.curation_notes['org.update_genbank_from_files'].append({
'msg':'Some feature in genbank are CDS but have no translation qualifier. Translated sequences from Biopython were filled in instead',
'triggered_by':warn_translation,
'importance':'high',
'to_do':'Check whether the genbank was downloaded or constructed correctly.'
})
def _create_complexes_entry(self,
row,
genes,
stoich):
"""Create an entry to append to the complexes dataframe"""
return {"name" : row["Common-Name"],
"genes" : " AND ".join(
[
self.gene_dictionary["Accession-1"][g] + "({})".format(stoich)
for g in genes
]
),
"source" : "BioCyc"}
def generate_complexes_df(self):
"""
Creates a DataFrame containing complex composition
information from the provided files.
"""
proteins_df = self.proteins_df
if proteins_df.empty:
return pandas.DataFrame(
columns = [
'complex',
'name',
'genes',
'source'
]
).set_index('complex')
gene_dictionary = self.gene_dictionary
complexes = {}
warn_proteins = []
for p, row in tqdm.tqdm(proteins_df.iterrows(),
'Generating complexes dataframe from optional proteins file...',
bar_format = bar_format,
total=proteins_df.shape[0]):
stoich = "" if "dimer" not in str(row["Common-Name"]) else "2"
genes = row["Genes of polypeptide, complex, or RNA"]
if not genes:
warn_proteins.append(p)
continue
genes = [
g for g in genes.split(" // ") if g in gene_dictionary["Accession-1"]
]
complexes[p] = self._create_complexes_entry(row,
genes,
stoich)
complexes_df = pandas.DataFrame.from_dict(complexes).T[["name", "genes", "source"]]
complexes_df.index.name = "complex"
# Warnings
if warn_proteins:
self.curation_notes['org.generate_complexes_df'].append({
'msg':'Some proteins have no genes in proteins.txt',
'triggered_by':warn_proteins,
'importance':'medium',
'to_do':'Fill genes in proteins.txt'})
return complexes_df.fillna({"name": ""})
def read_optional_file(self,filetype,filename,columns):
""" Method for reading an optional file."""
if os.path.isfile(filename):
file = pandas.read_csv(filename, sep="\t",index_col=0)
else:
self.curation_notes['org.read_optional_file'].append({
'msg':'No {} file was found. Initializing an empty one.'.format(filetype),
'importance':'high',
'to_do':'Download {} from BioCyc if available'.format(filetype)})
file = pandas.DataFrame(columns=columns).set_index(columns[0],inplace=False)
return file#.fillna('')
def read_gene_dictionary(self,filename):
""" Loads the genes file."""
gene_dictionary = self.read_optional_file(
'genes',
filename,
columns=[
'Gene Name',
'Accession-1',
'Left-End-Position',
'Right-End-Position',
'Product'
]).reset_index().set_index("Gene Name")
# Save warnings
warn_start = _get_na_entries(gene_dictionary,"Left-End-Position")
warn_end = _get_na_entries(gene_dictionary,"Right-End-Position")
warn_genes = _get_na_entries(gene_dictionary,"Accession-1")
warn_product = _get_na_entries(gene_dictionary,"Product")
# Warn
if not self.is_reference:
for g in warn_genes:
gene_dictionary.at[g, "Accession-1"] = g
if warn_start:
self.curation_notes['org.read_gene_dictionary'].append({
'msg':'Some genes are missing start positions in genes.txt',
'triggered_by':warn_start,
'importance':'medium',
'to_do':'Complete start positions in genes.txt if those genes are important.'})
if warn_end:
self.curation_notes['org.read_gene_dictionary'].append({
'msg':'Some genes are missing end positions in genes.txt',
'triggered_by':warn_end,
'importance':'medium',
'to_do':'Complete end positions in genes.txt if those genes are important.'})
if warn_genes:
self.curation_notes['org.read_gene_dictionary'].append({
'msg':'Some genes are missing Accession-1 IDs in genes.txt',
'triggered_by':warn_genes,
'importance':'medium',
'to_do':'Complete Accession-1 IDs in genes.txt if those genes are important.'})
if warn_product:
self.curation_notes['org.read_gene_dictionary'].append({
'msg':'Some genes are missing product definition in genes.txt',
'triggered_by':warn_product,
'importance':'medium',
'to_do':'Complete products in genes.txt if those genes are important.'})
# Pruning
gene_dictionary = gene_dictionary.reset_index().dropna(subset=["Gene Name","Product"],how="any").set_index("Gene Name")
gene_dictionary = gene_dictionary.fillna("") # All other empty fields, fill with ""
gene_dictionary['replicon'] = ""
return gene_dictionary
def read_proteins_df(self,filename):
""" Loads the proteins file."""
return self.read_optional_file(
'proteins',
filename,
columns = [
'Proteins',
'Common-Name',
'Genes of polypeptide, complex, or RNA',
'Locations'
]
).fillna("")
def read_gene_sequences(self,filename):
""" Loads the gene sequences file."""
if os.path.isfile(filename):
d = {}
for i in Bio.SeqIO.parse(filename,'fasta'):
for g in i.id.split('|'):
d[g] = i
return d
return {}
def read_RNA_df(self,filename):
""" Loads the RNAs file."""
return self.read_optional_file(
'RNAs',
filename,
columns = [
'(All-tRNAs RNAs Misc-RNAs rRNAs)',
'Common-Name',
'Gene'
]
).fillna("")
def read_TU_df(self,filename):
""" Loads the TUs file."""
return self.read_optional_file(
'TUs',
filename,
columns = [
'Transcription-Units',
'Genes of transcription unit',
'Direction'
]
).fillna("")
def check_gene_overlap(self):
""" Assesses gene identifier overlap between files."""
if self.is_reference:
return
def get_severity(o):
if o < 50 :
return 'critical'
elif o < 60 :
return 'high'
elif o < 70 :
return 'medium'
elif o < 80 :
return 'low'
else:
return 0
m_model_genes = set([g.id for g in self.m_model.genes])
file_genes = set(self.gene_dictionary['Accession-1'].values)
all_genes_in_gb = []
transl_table = []
warn_table = []
for record in self.contigs:
for feature in record.features:
if self.locus_tag not in feature.qualifiers:
continue
all_genes_in_gb.append(feature.qualifiers[self.locus_tag][0])
transl_table+=(feature.qualifiers.get('transl_table',[None]))
# self.all_genes_in_gb = all_genes_in_gb
transl_table = set(i for i in set(transl_table) if i is not None)
if len(transl_table) > 1:
warn_table = transl_table
elif not transl_table:
transl_table = ['11']
self.transl_table = list(transl_table)[0]
genbank_genes = set(all_genes_in_gb)
# Overlaps
file_overlap = int((len(file_genes & m_model_genes) / len(m_model_genes))*100)
gb_overlap = int((len(genbank_genes & m_model_genes) / len(m_model_genes))*100)
logging.warning('Gene overlap between M-model and Genbank : {}%'.format(gb_overlap))
logging.warning('Gene overlap between M-model and optional files : {}%'.format(file_overlap))
if gb_overlap < 1:
raise ValueError('Overlap of M-model genes with genbank is too low ({}%)'.format(gb_overlap))
fs = get_severity(file_overlap)
gs = get_severity(gb_overlap)
if fs:
self.curation_notes['org.check_gene_overlap'].append({
'msg':'M-model has a {} gene overlap with optional files (BioCyc)'.format(file_overlap),
'importance':fs,
'to_do':'Check whether optional files where downloaded correctly.'})
if gs:
self.curation_notes['org.check_gene_overlap'].append({
'msg':'M-model has a {} gene overlap with Genbank'.format(gb_overlap),
'importance':gs,
'to_do':'Check whether genbank was downloaded correctly.'})
if warn_table:
self.curation_notes['org.check_gene_overlap'].append({
'msg':'Provided GenBank file contains more than one translation table. Is this correct?',
'triggered_by':warn_table,
'importance':'high',
'to_do':'Check if translation tables are correct.'})
def update_ribosome_stoich(self):
""" Updated ribosome composition from files."""
if self.is_reference:
return
complexes_df = self.complexes_df
protein_mod = self.protein_mod.reset_index().set_index('Core_enzyme')
ribo_df = complexes_df.loc[
complexes_df["name"].str.contains("ribosomal.*(?:subunit)?.* protein", regex=True)
]
self.ribosomal_proteins = ribo_df
ribosome_stoich = self.ribosome_stoich
ribo_30S = ribosome_stoich["30_S_assembly"]["stoich"]
if [i for i in ribo_30S if "generic" not in i]:
update_30S = False
else:
update_30S = True
ribo_50S = ribosome_stoich["50_S_assembly"]["stoich"]
if [i for i in ribo_50S if "generic" not in i]:
update_50S = False
else:
update_50S = True
if not(update_30S or update_50S):
# Only update if it has not been user-defined
return
trigger_factor = list(complexes_df[complexes_df['name'].str.contains('[T,t]rigger factor',regex=True)].index)
if trigger_factor:
ribo_50S[trigger_factor[0]] = 1
warn_proteins = []
for p, row in tqdm.tqdm(ribo_df.iterrows(),
'Gathering ribosome stoichiometry...',
bar_format = bar_format,
total=ribo_df.shape[0]):
p_mod_list = []
if p in protein_mod.index:
p_mod_list = protein_mod.loc[[p]]['Modified_enzyme'].values
if re.search("30S|small.*subunit",row["name"],re.IGNORECASE) and update_30S:
if set(p_mod_list) & set(ribo_30S.keys()):
continue
ribo_30S[p] = 1
elif re.search("50S|large.*subunit",row["name"],re.IGNORECASE) and update_50S:
if set(p_mod_list) & set(ribo_50S.keys()):
continue
ribo_50S[p] = 1
else:
if set(p_mod_list) & set(ribo_50S.keys()):
continue
ribo_50S[p] = 1 # Add it to stoichiometry but warn it might not be a good mapping
warn_proteins.append(p)
if warn_proteins:
self.curation_notes['org.update_ribosome_stoich'].append({
'msg':'Some ribosomal proteins do not contain subunit information (30S, 50S). Check whether they are ribosomal proteins!',
'triggered_by':warn_proteins,
'importance':'high',
'to_do':'Curate them in ribosomal_proteins.txt'})
def _add_entry_to_gene_dictionary(self,
gene_dictionary,
gene_id,
feature,
left_end,
right_end):
"""Append one entry to the gene information dataframe"""
logging.warning("Adding {} to genes from genbank".format(gene_id))
feature_type = feature.type
if feature_type == 'CDS':
feature_type = 'MONOMER'
tmp = {gene_id: {
"Accession-1": gene_id,
"Left-End-Position": left_end,
"Right-End-Position": right_end,
"Product": "{}-{}".format(gene_id,feature_type)
}}
return self._add_entry_to_df(gene_dictionary, tmp)
def _add_entry_to_complexes_or_rna(self,
complexes_df,
RNA_df,
gene_name,
gene_id,
feature,
):
"""Append one entry to the Complexes of RNA dataframe according to its type"""
name_annotation = feature.qualifiers["product"][0] if 'product' in feature.qualifiers \
else gene_name
if feature.type == 'CDS':
product = gene_name + '-MONOMER'
if not complexes_df["genes"].str.contains(gene_id).any():
complexes_df = \
self._add_entry_to_complexes(gene_id,
name_annotation,
product,
complexes_df,
"GenBank")
else: # It's not CDS, but an RNA
product = "{}-{}".format(gene_name,feature.type)
if not RNA_df["Gene"].str.contains(gene_name).any():
RNA_df = \
self._add_entry_to_rna(gene_id,
name_annotation,
product,
RNA_df,
"GenBank")
return complexes_df,RNA_df,product
def _add_entries_to_optional_files(self,
gene_dictionary,
complexes_df,
RNA_df,
feature,
record,
product_types):
"""Add entries to the optional files from the GenBank"""
# gene_id = feature.qualifiers[self.locus_tag][0]
gene_id = self._get_feature_locus_tag(feature)
if ';' in gene_id:
gene_id = gene_id.split(';')[0]
left_end = min([i.start for i in feature.location.parts])
right_end = max([i.end for i in feature.location.parts])
if not gene_dictionary["Accession-1"].eq(gene_id).any():
gene_dictionary = \
self._add_entry_to_gene_dictionary(
gene_dictionary,
gene_id,
feature,
left_end,
right_end)
gene_names = gene_dictionary[gene_dictionary["Accession-1"].eq(gene_id)].index
warn_genes = []
for gene_name in gene_names:
complexes_df,RNA_df,product = \
self._add_entry_to_complexes_or_rna(
complexes_df,
RNA_df,
gene_name,
gene_id,
feature,
)
gene_dictionary.at[gene_name,'Product'] = product # Ensuring product is the same.
gene_dictionary.at[gene_name,"Left-End-Position"] = left_end
gene_dictionary.at[gene_name,"Right-End-Position"] = right_end
gene_dictionary.at[gene_name,"replicon"] = record.id
# Update product types
gid,product,product_type = \
self._get_product_type(
gene_name,
gene_dictionary = gene_dictionary,
complexes_df = complexes_df,
RNA_df = RNA_df,
warn_genes = warn_genes)
if product is None:
warn_genes.append(gid)
continue
if ' ' in product or ('RNA' not in product and 'MONOMER' not in product):
product = \
self._correct_product(
gene_name,
product_type)
product_types[gene_id] = product_type
return gene_dictionary,complexes_df,RNA_df
def update_complexes_genes_with_genbank(self):
""" Complements complexes and genes with genome"""
if self.is_reference:
return
# In some genbanks, CDS are duplicated with gene features. See staph or pputida
complexes_df = self.complexes_df
gene_dictionary = self.gene_dictionary
RNA_df = self.RNA_df
product_types = self.product_types
warn_locus = []
for record in tqdm.tqdm(self.contigs,
'Syncing optional files with genbank contigs...',
bar_format = bar_format):
for feature in record.features:
if feature.type not in element_types:
continue
if self._get_feature_locus_tag(feature) is None:
continue
gene_dictionary,complexes_df,RNA_df = \
self._add_entries_to_optional_files(
gene_dictionary,
complexes_df,
RNA_df,
feature,
record,
product_types)
self.complexes_df = complexes_df
gene_dictionary.index.name = "Gene Name"
self.gene_dictionary = gene_dictionary
self.RNA_df = RNA_df
if warn_locus:
self.curation_notes['org.update_complexes_genes_with_genbank'].append({
'msg':'Some genbank features do not have locus_tag.',
'triggered_by':warn_locus,
'importance':'high',
'to_do':'Check whether these features are necessary, and correct their locus_tag. If they have been completed from other provided files, ignore.'})
def purge_genes_in_model(self):
""" Purges problematic genes in the M-model"""
m_model = self.m_model
gene_dictionary = self.gene_dictionary
RNA_df = self.RNA_df
gene_list = []
wrong_assoc = []
for g in tqdm.tqdm(m_model.genes,
'Purging M-model genes...',
bar_format = bar_format):
if g.id not in gene_dictionary['Accession-1'].values:
gene_list.append(g)
else:
product = gene_dictionary[self.gene_dictionary['Accession-1'].eq(g.id)]['Product'].values[0]
if product in RNA_df.index:
wrong_assoc.append(g)
self.skip_genes = [g.id for g in gene_list + wrong_assoc]
# cobra.manipulation.delete.remove_genes(m_model,
# gene_list + wrong_assoc,
# remove_reactions=False)
# Warnings
if gene_list:
self.curation_notes['org.purge_genes_in_model'].append({
'msg':'Some genes in M-model were not found in genes.txt or genome.gb. These genes will be skipped in reconstruction.',
'triggered_by':[g.id for g in gene_list],
'importance':'high',
'to_do':'Confirm the gene is correct in the m_model. If so, add it to genes.txt'})
if wrong_assoc:
self.curation_notes['org.purge_genes_in_model'].append({
'msg':'Some genes in M-model are RNAs. These genes will be skipped in reconstruction.',
'triggered_by':[g.id for g in wrong_assoc],
'importance':'high',
'to_do':'Confirm the gene is correct in the m_model. If so, then annotation from GenBank or BioCyc marked them as a different type'})
def _get_ligases_from_regex(self,
complexes_df):
"""Call ligases from provided files using regex"""
return self._get_slice_from_regex(
complexes_df,
"[-]{,2}tRNA (?:synthetase|ligase)(?!.*subunit.*)")
def _get_ligases_subunits_from_regex(self,
complexes_df):
"""Call ligases subunits from provided files using regex"""
return self._get_slice_from_regex(
complexes_df,
"[-]{,2}tRNA (?:synthetase|ligase)(?=.*subunit.*)")
def _extract_trna_string(self,
trna_string):
"""Srip the tRNA ligase information to call the amino acid"""
t = re.findall(".*[-]{,2}tRNA (?:synthetase|ligase)",trna_string)
return t[0] if t else None
def _is_base_complex_in_list(self,cplx,lst):
"""Check if a complex (base) is already in a list of complexes"""
return cplx in set(i.split('_mod_')[0] for i in lst)
def _get_genes_of_cplx(self,cplx):
"""Get the genes associated with a complex from the files"""
d = {}
for i in self.complexes_df.loc[cplx]['genes'].split(' AND '):
gene = re.findall(r'.*(?=\(\d*\))',i)[0]
coeff = re.findall(r'(?<=\().*(?=\))',i)[0]
d[gene] = coeff
return d
# return [re.findall('.*(?=\(\d*\))',i)[0] \
# for i in self.complexes_df.loc[cplx]['genes'].split(' AND ')]
def get_trna_synthetase(self):
""" Gets tRNA synthetases from files."""
if self.is_reference:
return
def find_aminoacid(trna_string):
"""Call the aminoacid from a ligase description string"""
trna_string = trna_string.lower()
for aa, rx in coralme.builder.dictionaries.amino_acid_regex.items():
if re.search(rx, trna_string):
return aa
return None
org_amino_acid_trna_synthetase = self.amino_acid_trna_synthetase
manually_curated_aa = [k for k,v in org_amino_acid_trna_synthetase.items() if v]
generic_dict = self.generic_dict
complexes_df = self.complexes_df
warn_generic = []
d = collections.defaultdict(set)
for k,v in org_amino_acid_trna_synthetase.copy().items():
if isinstance(v,list):
d[k] = set(v)
elif isinstance(v,str):
if not v:
d[k] = set()
continue
if 'generic' in v:
if v not in generic_dict:
warn_generic.append(v)
d[k] = set()
continue
# d[k] = set(generic_dict[v]['enzymes'])
# continue
d[k] = set([v])
trna_ligases = self._get_ligases_from_regex(complexes_df).to_dict()['name']
for cplx, trna_string in trna_ligases.items():
aa = find_aminoacid(trna_string)
if aa is None:continue
if aa in manually_curated_aa: continue
# if aa not in d: d[aa] = set()
if self._is_base_complex_in_list(cplx,d[aa]): continue
d[aa].add(cplx)
trna_ligases_from_subunits = self._get_ligases_subunits_from_regex(complexes_df).to_dict()['name']
# new_cplxs = {k:dict() for k in d.copy()}
new_cplxs = collections.defaultdict(dict)
for cplx,trna_string in trna_ligases_from_subunits.items():
trna_string = self._extract_trna_string(trna_string)
aa = find_aminoacid(trna_string)
if aa is None:continue
# if aa not in d: d[aa] = set()
if d[aa]: continue
cplx_genes = self._get_genes_of_cplx(cplx)
for k,v in cplx_genes.items():
# if aa not in new_cplxs: new_cplxs[aa] = dict()
new_cplxs[aa][k] = v
# new_cplxs[aa].add(cplx)
for k,v in new_cplxs.items():
if not v: continue
cplx_id = "CPLX-tRNA-{}-LIGASE".format(k.upper()[:3])
complexes_df = self._add_entry_to_complexes(
v,
"tRNA-{} ligase".format(k[0].upper() + k[1:3]),
cplx_id,
complexes_df,
"Inferred from subunits")
d[k] = set([cplx_id])
warn_ligases = []
for aa,c_set in d.items():
c_list = list(c_set)
if len(c_list) == 1:
d[aa] = c_list[0]
elif len(c_list) > 1:
generic = 'generic_{}_ligase'.format(aa.split('__L_c')[0])
generic_dict[generic] = {'enzymes':c_list}
d[aa] = generic
else:
d[aa] = ''
warn_ligases.append(aa)
self.amino_acid_trna_synthetase = dict(d)
self.complexes_df = complexes_df
# Warnings
if warn_ligases:
self.curation_notes['org.get_trna_synthetase'].append({
'msg':'No tRNA ligases were found for some amino acids. Will assign CPLX_dummy.',
'triggered_by':warn_ligases,
'importance':'high',
'to_do':'Check whether your organism should have a ligase for these amino acids, or if you need to add a reaction to get it (e.g. tRNA amidotransferases)'})
if warn_generic:
self.curation_notes['org.get_trna_synthetase'].append({
'msg':'A generic tRNA ligase was defined in amino_acid_trna_synthetase, but it is not defined in generic_dict.',
'triggered_by':warn_generic,
'importance':'high',
'to_do':'Fix the definition in generic_dict'})
def get_peptide_release_factors(self):
""" Gets peptide release factors from files."""
if self.is_reference:
return
proteins_df = self.proteins_df["Common-Name"].dropna()
peptide_release_factors = self.peptide_release_factors
generics = self.generic_dict
rf = proteins_df[proteins_df.str.contains("peptide.*release.*factor")]
if rf.size:
if not peptide_release_factors["UAG"]['enzyme'] and rf.str.contains("1").any():
peptide_release_factors["UAG"]['enzyme'] = rf[rf.str.contains("1")].index[0]
generics["generic_RF"]['enzymes'].append(peptide_release_factors["UAG"]['enzyme'])
if not peptide_release_factors["UGA"]['enzyme'] and rf.str.contains("2").any():
peptide_release_factors["UGA"]['enzyme'] = rf[rf.str.contains("2")].index[0]
generics["generic_RF"]['enzymes'].append(peptide_release_factors["UGA"]['enzyme'])
def get_nonmetabolic(self):
""" Gets nonmetabolic metabolites in M-model."""
m_model = self.m_model
queries = ['ACP','trna']
for m in m_model.metabolites.query('|'.join(queries)):
if m.id in self.me_mets.index:
continue
tmp = {m.id:{
'me_id':'',
'name':'',
'formula':'',
'compartment':'',
'type':'CURATE'
}}
self.me_mets = self._add_entry_to_df(self.me_mets,tmp)
return None
def gb_to_faa(self, org_id, outdir = False, element_types = {"CDS"}):
""" Generates a protein FASTA from genome for BLAST"""
# Create FASTA file with AA sequences for BLAST
contigs = self.contigs
if not outdir:
outdir = self.blast_directory
#outdir += "blast_files_and_results/"
FASTA_file = outdir + "{}.faa".format(org_id)
# FASTA_file = "{}.faa".format(org_id)
file = open(FASTA_file, "w")
for contig in tqdm.tqdm(contigs,
'Converting Genbank contigs to FASTA for BLAST...',
bar_format = bar_format):
for feature in contig.features:
if feature.type not in element_types \
or "translation" not in feature.qualifiers \
or self.locus_tag not in feature.qualifiers:
continue
file.write(
">{}\n".format(feature.qualifiers[self.locus_tag][0])
) # Some way to identify which qualifier meets regular expression?
file.write("{}\n".format(feature.qualifiers["translation"][0]))
file.close()
def _process_sigma_name(self,name, row):
"""Generate a sigma factor identifier from its description"""
name = name.split("RNA polymerase")[-1]
replace_list = ["sigma", "factor", "sup"]
for r in replace_list:
name = name.replace(r, "")
name = "".join(re.findall("[a-zA-Z0-9]{1,}", name))
if not name:
name = "_".join(row["genes"])
return "RNAP_" + name
def get_sigma_factors(self):
""" Gets sigma factors from files."""
complexes_df = self.complexes_df
sigma_df = complexes_df.loc[
complexes_df["name"].str.contains(
"RNA polymerase.*sigma.*[factor.*]?.*|sigma.*[factor.*]?.*RNA polymerase", regex=True
)
]
if not sigma_df.shape[0]:
self.curation_notes['org.get_sigma_factors'].append({
'msg':"No sigma factors could be identified from proteins.txt",
'importance':'critical',
'to_do':'Manually define sigmas in sigma_factors.txt'})
random_cplx = random.choice(complexes_df.index)
sigma_df = complexes_df.loc[[random_cplx]]
## Get sigmas automatically
# Find RpoD to add as default sigma
for s, row in tqdm.tqdm(sigma_df.iterrows(),
'Getting sigma factors...',
bar_format = bar_format,
total=sigma_df.shape[0]):
tmp = {
s : {
"complex" : self._process_sigma_name(s, row),
"genes" : row["genes"],
"name" : row["name"]
}
}
self.sigmas = self._add_entry_to_df(self.sigmas,tmp)
def get_rpod(self):
""" Gets RpoD from files."""
sigma_df = self.sigmas
rpod = sigma_df[sigma_df["name"].str.contains("RpoD")].index.to_list()
if not rpod:
rpod_re = "|".join(["70", "sigma-A", "sigA", "SigA","Sigma-A"])
rpod = sigma_df[sigma_df["name"].str.contains(rpod_re)].index.to_list()
if rpod:
rpod = rpod[0]
# Warnings
self.curation_notes['org.get_rpod'].append({
'msg':"{} was identified as RpoD. If this is not true, define RpoD!".format(rpod),
'importance':'high',
'to_do':'Check whether you need to correct RpoD by running me_builder.org.rpod = correct_rpod'})
else:
rpod = random.choice(sigma_df.index)
# Warnings
self.curation_notes['org.get_rpod'].append({
'msg':"RpoD randomly assigned to {}".format(rpod),
'importance':'critical',
'to_do':'genome.gb does not have a valid annotation for RpoD. A random identified sigma factor in me_builder.org.sigmas was set as RpoD so that the builder can continue running. Set the correct RpoD by running me_builder.org.rpod = correct_rpod'})
self.rpod = rpod
def _get_slice_from_regex(self,
df,
regex,
):
"""The a slice of a dataframe from a regex"""
return df[df["name"].str.contains(regex,regex=True)]
def _get_complex_from_regex(self,
complexes_df,
cplx_regex,
subunit_regex=None):
"""Get complex as one entry from complexes"""
cplx = self._get_slice_from_regex(complexes_df,cplx_regex)
if cplx.shape[0] == 1:
return cplx.iloc[[0],:],'cplx'
if subunit_regex is None:
return None,None
# Get complex as composed from subunits
subunits = self._get_slice_from_regex(complexes_df,subunit_regex)
if subunits.empty:
return None,None
if subunits.shape[0] == 1:
return subunits,'cplx'
return subunits,'subunits'
def _get_rna_polymerase_from_regex(self,
complexes_df):
"""Call the RNAP from regex"""
cplx,flag = self._get_complex_from_regex(
complexes_df,
"(?:RNA polymerase.*core enzyme|DNA.*directed.*RNA polymerase)(?!.*subunit.*|.*chain.*)",
subunit_regex = "(?:RNA polymerase.*core enzyme|DNA.*directed.*RNA polymerase)(?=.*subunit.*|.*chain.*)")
if cplx is not None:
return cplx,flag
cplx,flag = self._get_complex_from_regex(
complexes_df,
"(?:^RNA polymerase$)",
subunit_regex = "(?:^RNA polymerase)(?=.*subunit.*|.*chain.*)")
return cplx,flag
def _add_rna_polymerase_to_complexes(self,
complexes_df,
RNAP_genes):
"""Add the RNAP entry to the complexes dataframe"""
tmp = pandas.DataFrame.from_dict({
"RNAP-CPLX": {
"name": "DNA-directed RNA polymerase",
"genes": " AND ".join(["{}()".format(g) for g in RNAP_genes]),
"source": "GenBank",
}}).T
return pandas.concat([complexes_df, tmp], axis = 0, join = 'outer')
def _is_beta_prime_in_RNAP(self,RNAP,complexes_df):
"""Checks if beta prime subunit is in RNAP"""
genes = [re.findall(r'.*(?=\(\d*\))',i)[0] for i in complexes_df.loc[RNAP]['genes'].split(' AND ')]
df = complexes_df[complexes_df['genes'].str.contains('|'.join(genes))]
return df['name'].str.contains(r"beta(?:\'|.*prime)|rpoc|RNA polymerase.*(?:subunit|chain).*beta",regex=True,case=False).any()
def get_rna_polymerase(self):
"""Call the RNAP from files"""
# TODO: Allow user to define RNAP, skip inferring?
complexes_df = self.complexes_df
protein_mod = self.protein_mod
RNAP = self.config.get("force_RNAP_as",None)
if RNAP is None:
RNAP,flag = self._get_rna_polymerase_from_regex(complexes_df)
if RNAP is None:
RNAP = random.choice(complexes_df.index)
self.curation_notes['org.get_rna_polymerase'].append({
'msg':"Could not identify RNA polymerase".format(RNAP),
'importance':'critical',
'to_do':'Find correct RNAP complex and add force_RNAP_as:"RNAP_COMPLEX_ID" to the configuration (by default, organism.json)'})
elif flag == 'cplx':
RNAP = RNAP.index[0]
# Warnings
self.curation_notes['org.get_rna_polymerase'].append({
'msg':"{} was identified as RNA polymerase".format(RNAP),
'importance':'high',
'to_do':'Find correct RNAP complex and add force_RNAP_as:"RNAP_COMPLEX_ID" to the configuration (by default, organism.json)'})
elif flag == 'subunits':
RNAP_genes = [g.split("-MONOMER")[0] for g in RNAP.index if "-MONOMER" in g]
RNAP_genes = [self.gene_dictionary.loc[g]['Accession-1'] for g in RNAP_genes]
RNAP = 'RNAP-CPLX'
complexes_df = self._add_rna_polymerase_to_complexes(complexes_df,
RNAP_genes)
self.curation_notes['org.get_rna_polymerase'].append({
'msg':"RNAP was identified with subunits {}".format(
", ".join(RNAP_genes)
),
'importance':'medium',
'to_do':'Check whether the correct proteins were called as subunits of RNAP. If not find correct RNAP complex and run me_builder.org.get_rna_polymerase(force_RNAP_as=correct_RNAP)'})
# Identify if beta prime in RNAP, if so, add zn2 and mg2. https://pubmed.ncbi.nlm.nih.gov/15351641/
if self._is_beta_prime_in_RNAP(RNAP,complexes_df):
RNAP_mod = RNAP + '_mod_zn2(1)_mod_mg2(2)'
protein_mod = \
self._add_entry_to_protein_mod(protein_mod,
RNAP_mod,
RNAP,
"zn2(1) AND mg2(2)",
"RNA_Polymerase")
RNAP = RNAP_mod
self.RNAP = RNAP
self.complexes_df = complexes_df
self.protein_mod = protein_mod
self.sigma_factor_complex_to_rna_polymerase_dict = self.sigmas[
"complex"
].to_dict()
self.rna_polymerase_id_by_sigma_factor = {}
for k, v in self.sigma_factor_complex_to_rna_polymerase_dict.items():
self.rna_polymerase_id_by_sigma_factor[v] = {
"sigma_factor": k,
"polymerase": self.RNAP,
}
self.rna_polymerases = list(self.rna_polymerase_id_by_sigma_factor.keys())
def get_TU_genes(self):
""" Gets TU composition from files."""
TUs = self.TUs
gene_dictionary = self.gene_dictionary
genes_to_TU = {}
TU_to_genes = {}
for tu, row in tqdm.tqdm(TUs.iterrows(),
'Getting TU-gene associations from optional TUs file...',
bar_format = bar_format,
total=TUs.shape[0]):
genes = row["Genes of transcription unit"]
if not genes:
continue
for g in genes.split(" // "):
if g not in gene_dictionary.index:
continue
genes_to_TU[gene_dictionary["Accession-1"][g]] = tu
if tu not in TU_to_genes:
TU_to_genes[tu] = []
TU_to_genes[tu].append(g)
self.genes_to_TU = genes_to_TU
self.TU_to_genes = TU_to_genes
def get_TU_df(self):
""" Generates TUs_from_biocyc."""
TUs = self.TUs
gene_dictionary = self.gene_dictionary
rpod = self.rpod
rho_independent = self.rho_independent
TU_dict = {}
warn_genes = []
warn_tus = []
if TUs.empty:
return pandas.DataFrame(
columns = [
'TU_id',
'replicon',
'genes',
'start',
'stop',
'tss',
'strand',
'rho_dependent',
'rnapol'
]
).set_index('TU_id')
for tu, row in tqdm.tqdm(TUs.iterrows(),
'Processing optional TUs file...',
bar_format = bar_format,
total=TUs.shape[0]):
sites = []
start = []
stop = []
genes = []
replicons = []
for g in row["Genes of transcription unit"].split(" // "):
if g not in gene_dictionary.index:
warn_genes.append(g)
continue
genes.append(gene_dictionary["Accession-1"][g])
sites.append(int(gene_dictionary["Left-End-Position"][g]))
sites.append(int(gene_dictionary["Right-End-Position"][g]))
# start.append(int(gene_dictionary["Left-End-Position"][g]))
# stop.append(int(gene_dictionary["Right-End-Position"][g]))
replicons.append(gene_dictionary["replicon"][g])
if not genes:
warn_tus.append(tu)
continue
sigma = rpod # Default RpoD
tu_name = "{}_from_{}".format(tu, sigma)
TU_dict[tu_name] = {}
TU_dict[tu_name]["genes"] = ','.join(genes)
TU_dict[tu_name]["rho_dependent"] = False if tu in rho_independent else True
TU_dict[tu_name]["rnapol"] = sigma
TU_dict[tu_name]["tss"] = None
TU_dict[tu_name]["strand"] = row["Direction"] if row["Direction"] else '+'
TU_dict[tu_name]["start"] = int(min(sites))+1
# TU_dict[tu_name]["start"] = ','.join([ str(x+1) for x in start ])
TU_dict[tu_name]["stop"] = int(max(sites))
# TU_dict[tu_name]["stop"] = ','.join([ str(x) for x in stop ])
TU_dict[tu_name]["replicon"] = ','.join(replicons) if set(replicons) != {''} else None
if TU_dict:
df = pandas.DataFrame.from_dict(TU_dict).T[
#["start", "stop", "tss", "strand", "rho_dependent", "rnapol","replicon"]
['replicon', 'genes', 'start', 'stop', 'tss', 'strand', 'rho_dependent', 'rnapol']
]
df.index.name = "TU_id"
else:
df = pandas.DataFrame(
columns = [
'TU_id',
'replicon',
'genes',
'start',
'stop',
'tss',
'strand',
'rho_dependent',
'rnapol'
]
).set_index('TU_id')
# Warnings
if warn_genes or warn_tus:
if warn_genes:
self.curation_notes['org.get_TU_df'].append({
'msg':"Some genes appear in TUs.txt but not in genes.txt",
'triggered_by':warn_genes,
'importance':'medium',
'to_do':'If those genes are supposed to be in the model, fill them in genes.txt'})
if warn_tus:
self.curation_notes['org.get_TU_df'].append({
'msg':"Some TUs are either empty (no genes in TUs.txt) or contain genes that are not in genes.txt",
'triggered_by':warn_tus,
'importance':'medium',
'to_do':'If those TUs contain genes that are supposed to be in the model, fill them in TUs.txt and genes.txt'})
return df
def _is_cytosol_in_locations(self,locs):
"""Checks if cytosol is in the locations list of a protein"""
for i in locs:
if 'CCI-CYTOSOL' in i:
return True
return False
def _process_location_dict(self,
location,
location_interpreter):
new_location = {}
for k, v in location.items():
if not v or isinstance(v, float):
continue
locations = []
for loc in v.split(" // "):
if loc not in location_interpreter.index:
continue
locations.append(location_interpreter["interpretation"][loc])
if self._is_cytosol_in_locations(locations):
continue
if locations:
new_location[k] = locations[0]
return new_location
def _add_entry_to_protein_location(self,
c,
c_loc,
gene_string,
gene_dictionary,
protein_location,
gene_location):
"""Append one entry to the protein locations dataframe"""
gene = re.findall(r'.*(?=\(\d*\))', gene_string)[0]
if gene not in gene_dictionary.index:
return protein_location
gene = gene_dictionary.loc[[gene]]["Gene Name"]
for gene_ in gene: # In case of duplicates
if gene_ in gene_location:
tmp = {
c: {
"Complex_compartment": c_loc,
"Protein": gene_string,
"Protein_compartment": gene_location[gene_],
"translocase_pathway": "s",
}}
protein_location = self._add_entry_to_df(protein_location,tmp)
return protein_location
def get_protein_location(self):
""" Gets protein location from files."""
complexes_df = self.complexes_df
proteins_df = self.proteins_df
gene_dictionary = self.gene_dictionary
location_interpreter = self.location_interpreter
protein_location = self.protein_location
gene_location = self._process_location_dict(
proteins_df.set_index("Genes of polypeptide, complex, or RNA")["Locations"]
.dropna()
.to_dict(),
location_interpreter,
)
cplx_location = self._process_location_dict(
proteins_df["Locations"].dropna().to_dict(), location_interpreter
)
gene_dictionary = gene_dictionary.reset_index().set_index("Accession-1")
for c, row in tqdm.tqdm(complexes_df.iterrows(),
'Adding protein location...',
bar_format = bar_format,
total=complexes_df.shape[0]):
if c not in cplx_location:
continue
c_loc = cplx_location[c]
for gene_string in complexes_df["genes"][c].split(' AND '):
protein_location = self._add_entry_to_protein_location(
c,
c_loc,
gene_string,
gene_dictionary,
protein_location,
gene_location)
self.protein_location = protein_location
#TODO: Add warnings for empty location
# TODO: New format of keffs file
def get_reaction_keffs(self):
""" Gets reaction Keffs from files."""
if self.is_reference:
return None
# Keff estimator from https://pubs.acs.org/doi/10.1021/bi2002289
m_model = self.m_model
subsystem_classification = self.subsystem_classification
enz_rxn_assoc_df = self.enz_rxn_assoc_df
rxn_keff_dict = {}
reaction_dirs = ["FWD", "REV"]
keffs = {
"central_CE": 79,
"central_AFN": 18,
"intermediate": 5.2,
"secondary": 2.5,
"other": 65,
}
for reaction, row in tqdm.tqdm(enz_rxn_assoc_df.iterrows(),
'Getting reaction Keffs...',
bar_format = bar_format,
total=enz_rxn_assoc_df.shape[0]):
if reaction not in m_model.reactions:
#TODO: Change this so that Keffs of new reactions in reaction_matrix can be estimated
logging.warning('Tried setting Keffs for {}, but the reaction is not in model.'.format(reaction))
continue
r = m_model.reactions.get_by_id(reaction)
subsystem = r.subsystem
if subsystem in subsystem_classification:
classification = subsystem_classification[subsystem]
else:
classification = "other"
keff = keffs[classification]
cplxs = row["Complexes"].split(" OR ")
for c in cplxs:
for d in reaction_dirs:
r = "{}".format(reaction)
rxn_keff_dict[r] = {}
rxn_keff_dict[r]["keff"] = keff
self.reaction_median_keffs = pandas.DataFrame.from_dict(rxn_keff_dict).T
self.reaction_median_keffs.index.name = "reaction"
self.reaction_median_keffs.to_csv(self.directory + 'reaction_median_keffs.txt', sep='\t')
return self.reaction_median_keffs['keff'].to_dict()
def get_phospholipids(self):
""" Gets phospholipids from M-model."""
m_model = self.m_model
return [
str(m.id) for m in m_model.metabolites.query(re.compile("^pg[0-9]{2,3}_.$"))
]
def get_lipids(self):
""" Gets lipids from M-model."""
m_model = self.m_model
return [
str(m.id) for m in m_model.metabolites.query(re.compile("^[a-z]*[0-9]{2,3}_.$"))
]
def _get_feature_locus_tag(self,
feature):
"""Get the locus tag identifier from a Biopython feature"""
lt = feature.qualifiers.get(self.locus_tag,None)
if lt is not None:
return lt[0]
lt = feature.qualifiers.get('locus_tag',None)
return lt[0] if lt is not None else None
def _map_to_a_generic(self,
feature,
generic_dict):
"""Associate a Biopython feature to a generic component"""
gene_id = self._get_feature_locus_tag(feature)
if gene_id is None:
logging.warning('Could not get {} of a feature at location {}'.format(feature.location))
return
gene = "RNA_" + gene_id
if any("5S" in i for i in feature.qualifiers["product"]):
cat = "generic_5s_rRNAs"
elif any("16S" in i for i in feature.qualifiers["product"]):
cat = "generic_16s_rRNAs"
elif any("23S" in i for i in feature.qualifiers["product"]):
cat = "generic_23s_rRNAs"
else:
cat = 0
if cat:
logging.warning("Setting {} to {}".format(gene, cat))
generic_dict[cat]['enzymes'].append(gene)
def get_generics_from_genbank(self):
""" Gets generics from genome."""
if self.is_reference:
return None
contigs = self.contigs
generic_dict = self.generic_dict
warn_generics = []
for contig in tqdm.tqdm(contigs,
'Getting generics from Genbank contigs...',
bar_format = bar_format):
for feature in contig.features:
if "rRNA" in feature.type:
self._map_to_a_generic(
feature,
generic_dict)
if self.duplicated_genes is not None:
for d in self.duplicated_genes:
if not d: continue
dups = self.gene_dictionary[self.gene_dictionary['Accession-1'].eq(d)]
generic_dict['generic_{}'.format(d)] = {"enzymes":[i for i in dups['Product'].values if i]}
for k, v in generic_dict.items():
if not v:
warn_generics.append(k)
if warn_generics:
self.curation_notes['org.get_generics_from_genbank'].append({
'msg':'Some generics in me_builder.org.generic_dict are empty.',
'triggered_by':warn_generics,
'importance':'high',
'to_do':'Curate and fill generics in generics.txt or directly in me_builder.org.generic_dict'})
def _modify_rna_modification_from_load(self,df):
"""Read and preprocess the RNA modification file"""
d = {}
for idx,row in df.iterrows():
mods = ['{}_at_{}'.format(idx,i) for i in row['positions'].split(',')]
for enz in row['enzymes'].split(' AND '):
#if enz not in d: d[enz] = []
#d[enz] = mods
for mod in mods:
d.setdefault(enz, []).append(mod)
return d
def _get_rrna_genes(self):
"""Get the genes associated with rRNAs"""
rrnas = ['generic_5s_rRNAs','generic_16s_rRNAs','generic_23s_rRNAs']
generic_dict = self.generic_dict
d = {}
for key in rrnas:
rrnaid = key.split('_')[1].upper()
d[rrnaid] = [i[4:] for i in generic_dict[key]['enzymes']]
return d
def process_rna_modifications(self):
""" Processes RNA modification information."""
rna_mods = self.rna_modification_df
self.rna_modification = self._modify_rna_modification_from_load(rna_mods)
mod_targets = self.rna_modification_targets
for subunit,genes in self._get_rrna_genes().items():
mod_df = rna_mods[rna_mods['type'].eq(subunit)].T
if mod_df.empty:continue
for mod,row in mod_df.items():
positions = row['positions'].split(',')
for g in genes:
gene_mods = pandas.DataFrame()
if g in mod_targets.index:
gene_mods = mod_targets.loc[[g]]
d = {}
for p in positions:
if not gene_mods.empty and gene_mods['position'].eq(p).any():
continue
d[g] = {'modification':mod,'position':p}
df = pandas.DataFrame.from_dict(d).T
mod_targets = pandas.concat([mod_targets,df])
mod_targets.index.name = 'bnum'
self.rna_modification_targets = mod_targets
def _check_for_duplicates_within_datasets(self,
info):
"""Checks for duplicates within optinal files"""
import collections
warn_dups = {}
for k,v in tqdm.tqdm(info.items(),
'Looking for duplicates within datasets...',
bar_format = bar_format,
total = len(info)):
if len(v) != len(set(v)):
warn_dups[k] = [item for item, count in collections.Counter(v).items() if count > 1]
if warn_dups:
self.curation_notes['org.check_for_duplicates'].append({
'msg':'Some datasets contain duplicate indices or Accession IDs.',
'triggered_by' : [warn_dups],
'importance':'critical',
'to_do':'Remove or fix duplicates. If duplicates are in Accession-1, they are processed as different possibilities to get the same enzyme, so they are added as generic complexes. Check!'})
if 'Accession-1' in warn_dups and not self.is_reference:
self.duplicated_genes = warn_dups['Accession-1']
return
self.duplicated_genes = None
def _check_for_duplicates_between_datasets(self,
info):
"""Checks for duplicates between optinal files"""
cplxs = set(info['complexes_df'])
rnas = set(info['RNA_df'])
genes = set(info['gene_dictionary'])
rxns = set(info['reactions'])
occ = {}
for i in tqdm.tqdm(cplxs|rnas|genes|rxns,
'Gathering ID occurrences across datasets...',
bar_format = bar_format):
occ[i] = {'proteins':0,'RNAs':0,'genes':0,'reactions':0}
if i in cplxs:
occ[i]['proteins'] += 1
if i in rnas:
occ[i]['RNAs'] += 1
if i in genes:
occ[i]['genes'] += 1
if i in rxns:
occ[i]['reactions'] += 1
df = pandas.DataFrame.from_dict(occ).T
df = df[df['reactions'] == 1]
dup_df = df[df.sum(1)>1]
return dup_df
def _solve_duplicates_between_datasets(self,
dup_df):
"""Solves found duplicates according to hierarchy"""
from coralme.builder.helper_functions import change_reaction_id
for c,row in tqdm.tqdm(dup_df.iterrows(),
'Solving duplicates across datasets...',
bar_format = bar_format,
total=dup_df.shape[0]):
if row['reactions']:
change_reaction_id(self.m_model,c,c+'_rxn')
logging.warning('Changed reaction ID from {} to {} to prevent the conflict between: {}'.format(c,c+'_rxn',' and '.join([j for j,k in row.items() if k])))
else:
raise ValueError('The identifier {} is duplicated in {}. Please fix!'.format(c,' and '.join([j for j,k in row.items() if k])))
def check_for_duplicates(self):
""" Checks for problematic duplicates in provided files."""
# Duplicates within datasets
info = {
'complexes_df' : list(self.complexes_df.index),
'RNA_df' : list(self.RNA_df.index),
'gene_dictionary' : list(self.gene_dictionary.index),
'reactions' : list([i.id for i in self.m_model.reactions]),
'Accession-1' : list(self.gene_dictionary['Accession-1'].values)
}
self._check_for_duplicates_within_datasets(info)
dup_df = self._check_for_duplicates_between_datasets(info)
self._solve_duplicates_between_datasets(dup_df)
def _is_sequence_divisible_by_three(self,
contig,
f):
"""Checks whether a gene sequence is divisible by three"""
if f.type == 'source' or 'RNA' in f.type:
return True
return not bool(len(f.extract(contig).seq.replace('-', '')) % 3)
def prune_genbank(self):
""" Prunes and cleans genome file"""
if self.is_reference:
return
contigs = self.contigs
exclude_prune_types = list(element_types) #+ ['source','gene']
new_contigs = []
warn_sequence = []
self.all_genes_in_gb = []
for contig in tqdm.tqdm(contigs,
'Pruning GenBank...',
bar_format = bar_format):
new_contig = \
self._create_genbank_contig(
contig.id,
contig.seq,
contig.name,
contig.description,
'GenBank')
new_contig.features = []
for feature in contig.features:
if feature.type in ['source', 'region']: # genbanks from gff+fna do not contain a 'source' feature
new_contig.features.append(feature)
continue
if feature.type not in exclude_prune_types:
continue
if 'pseudo' in feature.qualifiers:
if 'RNA' in feature.type:
del feature.qualifiers['pseudo'] # pseudo RNA?
elif not self.config.get('include_pseudo_genes', False):
continue
if 'transl_table' not in feature.qualifiers and 'RNA' not in feature.type:
feature.qualifiers['transl_table'] = [self.transl_table]
if not self._is_sequence_divisible_by_three(new_contig,
feature):
d = feature.qualifiers.copy()
d['location'] = str(feature.location)
warn_sequence.append(d)
locus_tag = self._get_feature_locus_tag(feature)
if locus_tag is not None:
feature.qualifiers[self.locus_tag] = [locus_tag]
self.all_genes_in_gb.append(locus_tag)
new_contig.features.append(feature)
if len(new_contig.features) <= 1:
# only source, no feature
continue
new_contigs.append(new_contig)
self.contigs = new_contigs
if warn_sequence:
self.curation_notes['org.prune_genbank'].append({
'msg':'Some features contain a sequence that is not divisible by 3.',
'triggered_by':warn_sequence,
'importance':'critical',
'to_do':'Check whether any of these genes are translated in your final ME-model. If so, fix the positions of the gene in genome_modified.gb'})
def modify_metabolic_reactions(self):
""" Modifies metabolic reactions from manual input"""
if self.is_reference:
return
m_model = self.m_model
new_reactions_dict = self.reaction_corrections
for rxn_id, info in tqdm.tqdm(new_reactions_dict.items(),
'Modifying metabolic reactions with manual curation...',
bar_format = bar_format,
total=len(new_reactions_dict)):
if info["reaction"] == "eliminate":
m_model.reactions.get_by_id(rxn_id).remove_from_model()
else:
if rxn_id not in m_model.reactions:
rxn = cobra.Reaction(rxn_id)
m_model.add_reactions([rxn])
else:
rxn = m_model.reactions.get_by_id(rxn_id)
if info["reaction"]:
rxn.build_reaction_from_string(info["reaction"])
if info["gene_reaction_rule"]:
if info["gene_reaction_rule"] == "no_gene":
rxn.gene_reaction_rule = ""
else:
rxn.gene_reaction_rule = info["gene_reaction_rule"]
if info["name"]:
rxn.name = info["name"]
def modify_metabolites(self):
""" Modifies metabolites from manual input"""
if self.is_reference:
return
m_model = self.m_model
new_metabolites_dict = self.metabolite_corrections
for met_id, info in tqdm.tqdm(new_metabolites_dict.items(),
'Modifying metabolites with manual curation...',
bar_format = bar_format,
total=len(new_metabolites_dict)):
if info["name"] == "eliminate":
m_model.metabolites.get_by_id(met_id).remove_from_model()
else:
if met_id not in m_model.metabolites:
met = cobra.Metabolite(met_id)
m_model.add_metabolites([met])
else:
met = m_model.metabolites.get_by_id(met_id)
if info["name"]:
met.name = info["name"]
if info["formula"]:
met.formula = info["formula"]
def add_manual_complexes(self):
""" Modifies complexes from manual input"""
if self.is_reference:
return
manual_complexes = self.manual_complexes
complexes_df = self.complexes_df
protein_mod = self.protein_mod
warn_manual_mod = []
warn_replace = []
for new_complex, info in tqdm.tqdm(manual_complexes.iterrows(),
'Adding manual curation of complexes...',
bar_format = bar_format,
total=manual_complexes.shape[0]):
if info["genes"]:
if new_complex not in complexes_df:
complexes_df = \
self._add_entry_to_complexes(
"",
"",
new_complex,
complexes_df,
"Manual")
complexes_df.loc[new_complex, "genes"] = info["genes"]
complexes_df.loc[new_complex, "name"] = str(info["name"])
if info["replace"]:
if info["replace"] in protein_mod.index:
protein_mod = protein_mod.drop(info["replace"])
else:
warn_replace.append(mod_complex)
if info["mod"]:
mod_complex = (
new_complex
+ "".join(
[
"_mod_{}".format(m)
for m in info['mod'].split(' AND ')
]
)
if info["mod"]
else new_complex
)
if mod_complex in protein_mod.index:
warn_manual_mod.append(mod_complex)
continue
protein_mod = self._add_entry_to_protein_mod(protein_mod,
mod_complex,
new_complex,
info["mod"],
"Manual")
complexes_df.index.name = "complex"
self.complexes_df = complexes_df
self.protein_mod = protein_mod
# Warnings
if warn_manual_mod or warn_replace:
if warn_manual_mod:
self.curation_notes['org.add_manual_complexes'].append({
'msg':'Some modifications in protein_corrections.txt are already in me_builder.org.protein_mod and were skipped.',
'triggered_by':warn_manual_mod,
'importance':'low',
'to_do':'Check whether the protein modification specified in protein_corrections.txt is correct and not duplicated.'})
if warn_replace:
self.curation_notes['org.add_manual_complexes'].append({
'msg':'Some modified proteins marked for replacement in protein_corrections.txt are not in me_builder.org.protein_mod. Did nothing.',
'triggered_by':warn_replace,
'importance':'low',
'to_do':'Check whether the marked modified protein in protein_corrections.txt for replacement is correctly defined.'})
def get_enzyme_reaction_association(self):
"""
Maps the M-model GPRs to the provided files and links reactions
to enzymatic complexes
"""
if self.is_reference:
return
m_model = self.m_model
org_complexes_df = self.complexes_df
protein_mod = self.protein_mod
gene_dictionary = (
self.gene_dictionary.reset_index()
.set_index("Accession-1")
)
generic_dict = self.generic_dict
enz_rxn_assoc_dict = {}
new_generics = {}
gpr_combination_cutoff = self.config.get("gpr_combination_cutoff",100)
for rxn in tqdm.tqdm(m_model.reactions,
'Getting enzyme-reaction associations...',
bar_format = bar_format):
if rxn.id in self.manual_curation.enz_rxn_assoc_df.data.index:
# Only complete those not in manual curation
continue
unnamed_counter = 0
rule = str(rxn.gene_reaction_rule)
if not rule:
continue
enz_rxn_assoc_dict[rxn.id] = []
#rule_list = expand_gpr(listify_gpr(rule)).split(" or ")
rule_list = coralme.builder.gpr.expand_gpr(rule,threshold=gpr_combination_cutoff)
#if rule_list != "STOP" and len(rule_list) <= gpr_combination_cutoff:
if rule_list != "STOP":
enz_rxn_assoc = []
reaction_cplx_list = []
for rule_gene_list in rule_list:
identified_genes = [i for i in rule_gene_list if i not in self.skip_genes]
if not identified_genes:
continue
cplx_id = coralme.builder.gpr.find_match(org_complexes_df["genes"].to_dict(),identified_genes)
if not cplx_id:
if len(identified_genes) > 1:
# New cplx not found in BioCyc files
cplx_id = "CPLX_{}-{}".format(rxn.id,unnamed_counter)
unnamed_counter += 1
else:
gene = identified_genes[0]
cplx_id = "{}-MONOMER".format(gene_dictionary.loc[gene]['Gene Name'])
if cplx_id not in org_complexes_df.index:
logging.warning("Adding {} to complexes from m_model".format(cplx_id))
tmp = pandas.DataFrame.from_dict({
cplx_id: {
"name": str(rxn.name),
"genes": " AND ".join(["{}()".format(g) for g in identified_genes]),
"source": "{}({})".format(m_model.id, rxn.id),
}}).T
org_complexes_df = pandas.concat([org_complexes_df, tmp], axis = 0, join = 'outer')
if cplx_id in protein_mod["Core_enzyme"].values:
# Use modifications
cplx_mods = protein_mod[
protein_mod["Core_enzyme"].eq(cplx_id)
].index
for cplx_id in cplx_mods:
if "Oxidized" in cplx_id:
reaction_cplx_list.append(cplx_id.split("_mod_Oxidized")[0])
else:
reaction_cplx_list.append(cplx_id)
else:
# Use base complex
reaction_cplx_list.append(cplx_id)
enz_rxn_assoc_dict[rxn.id] = " OR ".join(reaction_cplx_list)
else:
logging.warning('{} contains a GPR rule that has more gene combinations than the specified cutoff ({}). Generifying it.'.format(rxn.id,gpr_combination_cutoff))
listified_gpr = coralme.builder.gpr.listify_gpr(rule)
n,rule_dict = coralme.builder.gpr.generify_gpr(listified_gpr,rxn.id,d={},generic_gene_dict=new_generics)
if not rule_dict: # n in gene_dictionary.index:
product = gene_dictionary.loc[n,'Product']
rule_dict[product] = n
n = product
n,rule_dict = coralme.builder.gpr.process_rule_dict(n,rule_dict,org_complexes_df["genes"].to_dict(),protein_mod)
generified_rule = n
for cplx,rule in rule_dict.items():
if 'mod' in cplx:
cplx_id = cplx.split('_mod_')[0]
else:
cplx_id = cplx
if 'generic' in cplx_id and cplx_id not in generic_dict:
logging.warning("Adding {} to generics from m_model".format(cplx_id))
new_generics[cplx_id] = rule.split(' or ')
generic_dict[cplx_id] = {
'enzymes':[gene_dictionary.loc[i,'Product'] if i in gene_dictionary.index else i for i in rule.split(' or ')]
}
elif 'generic' not in cplx_id and cplx_id not in org_complexes_df.index:
# New cplx not found in BioCyc files
logging.warning("Adding {} to complexes from m_model".format(cplx_id))
tmp = pandas.DataFrame.from_dict({
cplx_id: {
"name": str(rxn.name),
"genes": " AND ".join(["{}()".format(g) for g in rule.split(' and ')]),
"source": "{}({})".format(m_model.id, rxn.id),
}}).T
org_complexes_df = pandas.concat([org_complexes_df, tmp], axis = 0, join = 'outer')
enz_rxn_assoc_dict[rxn.id] = generified_rule
enz_rxn_assoc_df = pandas.DataFrame.from_dict({"Complexes": enz_rxn_assoc_dict})
enz_rxn_assoc_df = enz_rxn_assoc_df.replace(
"", numpy.nan
).dropna() # Remove empty rules
if not enz_rxn_assoc_df.empty: # Only if it inferred any new GPRs
self.enz_rxn_assoc_df = pandas.concat([enz_rxn_assoc_df, self.enz_rxn_assoc_df], axis = 0, join = 'outer')
else:
logging.warning('No new GPR was inferred. If you provided all GPRs in enzyme_reaction_association.txt, no further action is needed.')
self.enz_rxn_assoc_df.index.name = "Reaction"
self.complexes_df = org_complexes_df
self.protein_mod = protein_mod
def _get_na_entries(df,col):
return list(df[df[col].isna()].index)