import re
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
from .utils import get_seq
class DatasetBlock(object):
    """Builds the data-sequences block of a dataset.

    By default, the data sequences block generated is NEXUS and we use
    BioPython tools to convert it to other formats such as FASTA.
    However, sometimes the block needs to be produced differently
    depending on the requested output format (e.g. FASTA split by codon
    positions).

    Parameters:
        data (named tuple): containing:
            * gene_codes: list
            * number_chars: string
            * number_taxa: string
            * seq_records: list of SeqRecordExpanded objects
            * gene_codes_and_lengths: OrderedDict
        codon_positions (str): str. Can be 1st, 2nd, 3rd, 1st-2nd, ALL (default).
        partitioning (str): partitioning scheme, e.g. ``by gene``,
            ``by codon position`` or ``1st-2nd, 3rd``.
        aminoacids (boolean): translate sequences to aminoacids if True.
        degenerate (str): degeneration method passed through to ``get_seq``.
        format (str): NEXUS, PHYLIP or FASTA.
        outgroup (str): Specimen code of taxon that should be used as outgroup.
    """
    def __init__(self, data, codon_positions, partitioning, aminoacids=None,
                 degenerate=None, format=None, outgroup=None):
        # Warnings produced while extracting sequences (filled by
        # make_datablock_by_gene).
        self.warnings = []
        self.data = data
        self.codon_positions = codon_positions
        self.partitioning = partitioning
        self.aminoacids = aminoacids
        self.degenerate = degenerate
        self.format = format
        self.outgroup = outgroup
        # One inner list of SeqRecordExpanded objects per gene_code,
        # populated by split_data().
        self._blocks = []

    def dataset_block(self):
        """Creates the block with taxon names and their sequences.

        Override this function if the dataset block needs to be different
        due to file format.

        Example:

            CP100_10_Aus_aus ACGATRGACGATRA...
            CP100_11_Aus_bus ACGATRGACGATRA...
            ...
        """
        self.split_data()
        out = [self.convert_to_string(block) for block in self._blocks]
        return '\n'.join(out).strip() + '\n;\nEND;'

    def split_data(self):
        """Splits the list of SeqRecordExpanded objects into lists, which are
        kept into a bigger list.

        If the file_format is Nexus, then it is only partitioned by gene. If it
        is FASTA, then it needs partitioning by codon positions if required.

        Example:

            >>> blocks = [
            ...     [SeqRecord1, SeqRecord2],  # for gene 1
            ...     [SeqRecord1, SeqRecord2],  # for gene 2
            ...     [SeqRecord1, SeqRecord2],  # for gene 3
            ...     [SeqRecord1, SeqRecord2],  # for gene 4
            ... ]
        """
        # Reset so repeated calls do not append duplicate gene blocks.
        self._blocks = []
        this_gene_code = None
        for seq_record in self.data.seq_records:
            # seq_records are expected to arrive grouped by gene_code; a new
            # gene_code starts a new inner list.
            if this_gene_code is None or this_gene_code != seq_record.gene_code:
                this_gene_code = seq_record.gene_code
                self._blocks.append([])
            self._blocks[-1].append(seq_record)

    def convert_to_string(self, block):
        """Makes gene_block as str from list of SeqRecordExpanded objects of a gene_code.

        Override this function if the dataset block needs to be different
        due to file format.

        This block will need to be split further if the dataset is FASTA or
        TNT and the partitioning scheme is 1st-2nd, 3rd.

        As the dataset is split into several blocks due to 1st-2nd, 3rd
        we cannot translate to aminoacids or degenerate the sequences.
        """
        if self.partitioning == '1st-2nd, 3rd' and self.format == 'FASTA':
            return self.make_datablock_considering_codon_positions_as_fasta_format(block)
        return self.make_datablock_by_gene(block)

    def convert_block_dicts_to_string(self, block_1st2nd, block_1st, block_2nd, block_3rd):
        """Takes into account whether we need to output all codon positions."""
        out = ""
        # We need 1st and 2nd positions
        if self.codon_positions in ['ALL', '1st-2nd']:
            for gene_code, seqs in block_1st2nd.items():
                out += '>{0}_1st-2nd\n----\n'.format(gene_code)
                out += ''.join(seqs)
        elif self.codon_positions == '1st':
            for gene_code, seqs in block_1st.items():
                out += '>{0}_1st\n----\n'.format(gene_code)
                out += ''.join(seqs)
        elif self.codon_positions == '2nd':
            for gene_code, seqs in block_2nd.items():
                out += '>{0}_2nd\n----\n'.format(gene_code)
                out += ''.join(seqs)
        # We also need 3rd positions
        if self.codon_positions in ['ALL', '3rd']:
            for gene_code, seqs in block_3rd.items():
                out += '\n>{0}_3rd\n----\n'.format(gene_code)
                out += ''.join(seqs)
        return out

    def make_datablock_by_gene(self, block):
        """Returns the gene block as a string: a ``[gene_code]`` header line
        followed by one padded ``taxon_id sequence`` line per record.

        Returns None when ``block`` is empty (preserved historical behavior).
        """
        out = None
        for seq_record in block:
            if out is None:
                # Header with the gene code, written once per block.
                out = '[{0}]\n'.format(seq_record.gene_code)
            taxonomy_as_string = self.flatten_taxonomy(seq_record)
            taxon_id = '{0}{1}'.format(seq_record.voucher_code,
                                       taxonomy_as_string)
            # Taxon ids are truncated to 54 chars and padded to a 55-char
            # column so the sequences line up.
            taxon_id = taxon_id[:54]
            sequence = get_seq(seq_record, self.codon_positions,
                               aminoacids=self.aminoacids,
                               degenerate=self.degenerate)
            if sequence.warning:
                self.warnings.append(sequence.warning)
            out += '{0}{1}\n'.format(taxon_id.ljust(55), sequence.seq)
        return out

    def flatten_taxonomy(self, seq_record):
        """Returns the record's taxonomy as a single underscore-joined string.

        Ranks are concatenated in a fixed order; missing keys are skipped,
        spaces become underscores, runs of underscores collapse to one and a
        trailing underscore is dropped. Returns '' when taxonomy is None.
        """
        if seq_record.taxonomy is None:
            return ''
        # Fixed rank order; matches the historical concatenation order.
        ranks = ('orden', 'superfamily', 'family', 'subfamily', 'tribe',
                 'subtribe', 'genus', 'species', 'subspecies', 'author',
                 'hostorg')
        out = ''
        for rank in ranks:
            try:
                out += "_" + seq_record.taxonomy[rank]
            except KeyError:
                # Missing ranks are simply omitted.
                pass
        out = out.replace(" ", "_")
        out = re.sub("_$", "", out)
        return re.sub('_+', '_', out)
class BasePairCount(object):
    """
    Uses reading frame info, partitioning method and number of codon positions
    to return corrected base pair count for charset lines.

    Example:

        >>> bp_count = BasePairCount(reading_frame=1, codon_positions='1st-2nd',
        ...                          partitioning='by codon position',
        ...                          count_start=100, count_end=512)
        >>> bp_count.get_corrected_count()
        [
            '100-512',
            '101-512',
        ]
    """
    def __init__(self, reading_frame=None, codon_positions=None, partitioning=None,
                 count_start=None, count_end=None):
        # Partitioning is validated first because the reading-frame check
        # depends on the partitioning scheme.
        self._partitioning = self._set_partitioning(partitioning)
        self._codon_positions = self._set_codon_positions(codon_positions)
        self._reading_frame = self._set_reading_frame(reading_frame)
        self._count_start = self._set_count_start(count_start)
        self._count_end = self._set_count_end(count_end)

    def _set_codon_positions(self, codon_positions):
        # Required; raises rather than guessing a default.
        if not codon_positions:
            raise ValueError("_codon_positions argument is needed. Can't be None")
        return codon_positions

    def _set_reading_frame(self, reading_frame):
        # Only needed when the partitioning scheme distinguishes codon
        # positions; otherwise None is acceptable.
        if not reading_frame and self._partitioning in ['by codon position', '1st-2nd, 3rd']:
            raise ValueError("_reading_frame argument is needed. Can't be None")
        return reading_frame

    def _set_partitioning(self, partitioning):
        if not partitioning:
            raise ValueError("_partitioning argument is needed. Can't be None")
        return partitioning

    def _set_count_start(self, count_start):
        if not count_start:
            raise ValueError("count_start argument is needed. Can't be None")
        return count_start

    def _set_count_end(self, count_end):
        if not count_end:
            raise ValueError("count_end argument is needed. Can't be None")
        return count_end

    def get_corrected_count(self):
        """Returns the list of charset count strings for the configured
        codon positions / partitioning combination.

        Returns None for combinations with no matching rule (preserved
        historical behavior).
        """
        if self._codon_positions == '1st-2nd' and self._partitioning in ['by gene',
                                                                         'by codon position',
                                                                         '1st-2nd, 3rd']:
            return self._using_1st2nd_codons()
        if self._codon_positions == 'ALL' and self._partitioning == 'by codon position':
            return self._using_all_codons_partition_by_codon_position()
        if self._codon_positions in ['ALL', '1st', '2nd', '3rd'] and self._partitioning == 'by gene':
            return self._using_all_codons_partition_by_gene()
        if self._codon_positions in ['1st', '2nd', '3rd'] and self._partitioning in ['by codon position', '1st-2nd, 3rd']:
            return self._using_one_codon_position_partitioned_by_codon_position(self._codon_positions)
        if self._codon_positions == 'ALL' and self._partitioning == '1st-2nd, 3rd':
            return self._using_all_codons_partition_by_1st2nd_3rd()

    def _frame_offsets(self):
        """Start offsets of the (1st, 2nd, 3rd) codon positions for the
        configured reading frame.

        Frame 1 -> (0, 1, 2); frame 2 -> (1, 2, 0); any other -> (2, 0, 1).
        """
        if self._reading_frame == 1:
            return (0, 1, 2)
        if self._reading_frame == 2:
            return (1, 2, 0)
        return (2, 0, 1)

    def _using_1st2nd_codons(self):
        return [
            '{0}-{1}'.format(self._count_start, self._count_end),
            '{0}-{1}'.format(self._count_start + 1, self._count_end),
        ]

    def _using_all_codons_partition_by_codon_position(self):
        # One range per codon position, ordered 1st, 2nd, 3rd; the start of
        # each range is shifted according to the reading frame.
        return ['{0}-{1}'.format(self._count_start + offset, self._count_end)
                for offset in self._frame_offsets()]

    def _using_one_codon_position_partitioned_by_codon_position(self, position):
        # A single codon position spans one contiguous range regardless of
        # which position it is.
        return [
            '{0}-{1}'.format(self._count_start, self._count_end),
        ]

    def _using_all_codons_partition_by_gene(self):
        return [
            '{0}-{1}'.format(self._count_start, self._count_end)
        ]

    def _using_all_codons_partition_by_1st2nd_3rd(self):
        # First entry combines the 1st and 2nd codon positions (stepping by
        # 3, hence the "\3"); second entry is the 3rd position alone.
        first, second, third = self._frame_offsets()
        return [
            '{0}-{1}\\3 {2}-{3}'.format(self._count_start + first,
                                        self._count_end,
                                        self._count_start + second,
                                        self._count_end),
            '{0}-{1}'.format(self._count_start + third, self._count_end),
        ]