#! /usr/bin/env python
############################################################################
## nexml.py
##
## Part of the PyNexml phylogenetic computation library.
##
## Copyright 2007 Jeet Sukumaran and Mark T. Holder.
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License along
## with this programm. If not, see .
##
############################################################################
"""
This module wraps routines needed for reading and writing trees in
NEXML format.
"""
import time
import textwrap
from pynexml import base
from pynexml import datasets
from pynexml import taxa
from pynexml import characters
from pynexml import trees
from pynexml import xmlparser
def _to_nexml_indent_items(items, indent="", indent_level=0):
"""
Renders list of items into a string of lines in which each line is
indented appropriately.
"""
return '\n'.join(["%s%s" % (indent * indent_level, str(item)) \
for item in items])
def _to_nexml_dict(annotes_dict, indent="", indent_level=0):
"""
Composes a nexml dict entry, given a python dictionary.
"""
main_indent = indent * indent_level
parts = []
parts.append('%s' % main_indent)
keyvals = _to_nexml_dict_keyvalues(annotes_dict=annotes_dict,
indent=indent,
indent_level=indent_level+1)
parts.append(_to_nexml_indent_items(keyvals, indent=indent, indent_level=0))
parts.append('%s' % (main_indent))
return parts
def _to_nexml_dict_keyvalues(annotes_dict, indent="", indent_level=0):
"""
Returns a list of lines corresponding to a nexml rendering of a
dictionary.
"""
parts = []
subindent = indent * (indent_level + 0)
for key, value in annotes_dict.items():
parts.append('%s%s' % (subindent, key))
anvalue = _to_nexml_dict_value(value=value[0],
type_hint=value[1],
indent=indent,
indent_level=indent_level)
parts.append(_to_nexml_indent_items(anvalue, indent, indent_level=0))
return parts
def _to_nexml_dict_value(value, type_hint=None, indent="", indent_level=0):
"""
Returns a list of lines nexml representation of a value. Right now, only deals
with lists/vector types vs 'others'. Which means dictionaries will
not get returned properly, and thus client code must handle nested
dictionaries themselves.
"""
main_indent = indent * indent_level
if type_hint is None:
value_type = _to_nexml_dict_value_type(value)
else:
value_type = type_hint
if value_type == 'boolean':
value = str(value==True).lower()
if isinstance(value, list):
value_str = "%s<%s>%s%s>" % (main_indent,
value_type,
' '.join([str(item) for item in value]),
value_type)
return [value_str]
elif isinstance(value, dict):
return _to_nexml_dict(value, indent=indent, indent_level=indent_level)
else:
return ["%s<%s>%s%s>" % (main_indent, value_type, str(value), value_type)]
def _to_nexml_dict_value_type(value):
"""
Figures out the value type, and returns and appropriate nexml
string corresponding to it.
"""
value_type = 'any'
if isinstance(value, list):
# assumes rest of the vector is same type as the first element
value_type = _to_nexml_dict_value_type(value[0]) + 'vector'
elif isinstance(value, dict):
value_type = 'dict'
else:
if type(value) == bool:
value_type = 'boolean'
elif type(value) == float:
value_type = 'float'
elif type(value) == int:
value_type = 'integer'
elif type(value) == str:
value_type = 'string'
else:
value_type = 'any'
return value_type
def _to_nexml_chartype(chartype):
"""
Returns nexml characters element attribute corresponding to given
chartype.
"""
if chartype == characters.DNA_CHARTYPE:
return "nex:DnaSeqs"
if chartype == characters.RNA_CHARTYPE:
return "nex:RnaSeqs"
return None
def _to_nexml_tree_length_type(length_type):
"""
Returns attribute string for nexml tree type depending on whether
`length_type` is an int or a float.
"""
if length_type == int:
return "nex:IntTree"
elif length_type == float:
return "nex:FloatTree"
else:
raise Exception('Unrecognized value class %s' % length_type)
def _from_nexml_tree_length_type(type_attr):
"""
Given an attribute string read from a nexml tree element, returns
the python class of the edge length attribute.
"""
if type_attr == "nex:IntTree":
return int
else:
return float
def _from_nexml_dict_value(value, value_type):
"""
A text representation of a value of type `type`, where `type`
is specified in terms of an nexml element, returns the Python
representation of the value.
"""
parsed_value = None
value = value.strip()
if value_type == "integer":
try:
parsed_value = int(value)
except ValueError:
raise Exception("Could not parse integer value")
elif value_type == "float":
try:
parsed_value = float(value)
except ValueError:
raise Exception("Could not parse float value")
elif value_type == "boolean":
try:
parsed_value = bool(value)
except ValueError:
raise Exception("Could not parse boolean value")
elif value_type == "string":
try:
parsed_value = str(value)
except ValueError:
raise Exception("Could not parse string value")
else:
# what else to do?
parsed_value = value
return parsed_value
def iterate_over_trees(file=None):
"""
Generator to iterate over trees in file without retaining any in memory.
"""
xml_doc = xmlparser.xml_document(file=file)
dataset = datasets.Dataset()
nexml_reader = NexmlReader()
nexml_reader.parse_taxa_blocks(xml_doc, dataset)
nx_tree_parser = _NexmlTreesParser()
for trees_idx, trees_element in enumerate(xml_doc.getiterator('trees')):
for tree in nx_tree_parser.parse_trees(trees_element, dataset, trees_idx, add_to_trees_block=False):
yield tree
class NexmlReader(datasets.Reader):
"""
Implements thinterface for handling NEXML files.
"""
def __init__(self):
"""
`tree_factory` is a DendroPy TreeFactory class or derived
object.
"""
datasets.Reader.__init__(self)
self.load_time = None
self.parse_time = None
## Implementation of the datasets.Reader interface ##
def read_dataset(self, src, dataset=None):
"""
Instantiates and returns a DataSet object based on the
NEXML-formatted contents read from the file descriptor object
`fileobj`. If `dataset` is given, its factory methods will be
used to instantiate objects.
"""
start = time.clock()
xml_doc = xmlparser.xml_document(file=src)
self.load_time = time.clock() - start
start = time.clock()
dataset = self.parse_dataset(xml_doc, dataset)
self.parse_time = time.clock() - start
return dataset
## Following methods are class-specific ###
def parse_dataset(self, xml_doc, dataset):
"""
Given an xml_document, parses the XmlElement representation of
taxon sets, character matrices, and trees into a DataSet object.
"""
if dataset is None:
dataset = datasets.Dataset()
self.parse_taxa_blocks(xml_doc, dataset)
self.parse_char_blocks(xml_doc, dataset)
self.parse_trees_blocks(xml_doc, dataset)
return dataset
def parse_taxa_blocks(self, xml_doc, dataset):
"""
Given an xml_document, parses the XmlElement representation of
taxon sets into a TaxaBlocks objects.
"""
nxt = _NexmlTaxaParser(self.taxa_block_factory, self.taxon_factory)
for taxa_element in xml_doc.getiterator('otus'):
taxa_block = nxt.parse_taxa(taxa_element, dataset)
def parse_char_blocks(self, xml_doc, dataset):
"""
Given an xml_document, parses the XmlElement representation of
character sequences into a list of CharacterMatrix objects.
"""
nxc = _NexmlCharBlockParser()
for char_block_element in xml_doc.getiterator('characters'):
nxc.parse_char_block(char_block_element, dataset)
def parse_trees_blocks(self, xml_doc, dataset):
"""
Given an xml_document object, parses the XmlElement structural
representations of a set of NEXML treeblocks (`nex:trees`) and
returns a TreesBlocks object corresponding to the NEXML.
"""
nx_tree_parser = _NexmlTreesParser(self.trees_block_factory, self.tree_factory, self.node_factory, self.edge_factory)
for trees_idx, trees_element in enumerate(xml_doc.getiterator('trees')):
for tree in nx_tree_parser.parse_trees(trees_element, dataset, trees_idx, add_to_trees_block=True):
pass
class _NexmlElementParser(object):
"""
Base parser class: wraps around annotations/dictionary element handling.
"""
def __init__(self):
"""
Right now, does nothing ...
"""
pass
def parse_annotations(self, annotated, nxelement):
"""
Given an nexml element, this looks for a 'dict' child element
and passes it to the dictionary parse if found. Results are
placed as attributes of `annotated`.
"""
xml_dict = nxelement.find('dict')
if xml_dict:
return self.parse_dict(annotated=annotated, xml_dict=xml_dict)
def parse_dict(self, annotated, xml_dict):
"""
This parses an xml_dict and sets the attributes of annotable
correspondingly.
"""
xml_keys = []
xml_values = []
for child in xml_dict.getchildren():
if child.tag == 'key':
xml_keys.append(child)
else:
xml_values.append(child)
if len(xml_keys) > 0 or len(xml_values) > 0:
if len(xml_keys) == len(xml_values):
xml_keyvals = dict(zip(xml_keys, xml_values))
self.parse_keyvals(annotated, xml_keyvals)
else:
raise Exception("Unequal numbers of keys and values in annotations")
def parse_keyvals(self, annotated, xml_keyvals):
"""
Given a dictionary where the keys are nexml dict key
XmlElements and the values are nexl dict value XmlElements
corresponding to those keys, this will parse the elements into
the attributes of an Annotable object.
"""
for xml_key, xml_value in xml_keyvals.items():
an_key = xml_key.text
an_value = None
if xml_value.tag == 'dict':
subannotable = base.Annotated()
self.parse_dict(subannotable, xml_value)
an_value = subannotable
elif xml_value.tag.count('vector'):
an_value = []
vector_text = xml_value.text
vector_text = vector_text.strip('\n').strip('\r').strip()
vector_type = xml_value.tag.replace('vector', '')
if vector_type == 'dict':
## must handle it here:
## loop through child elements of xml_value,
## parsing the dicts and building up a list of
## Annotable objects
raise NotImplementedError
else:
vector_items = vector_text.split()
for item in vector_items:
an_value.append(_from_nexml_dict_value(item, vector_type))
else:
an_value = _from_nexml_dict_value(xml_value.text, xml_value.tag)
if an_key is not None and an_value is not None:
setattr(annotated, an_key, an_value)
annotated.annotate(an_key)
class _NexmlTreesParser(_NexmlElementParser):
"""
Parses an XmlElement representation of NEXML format tree blocks.
"""
def __init__(self, trees_block_factory=None, tree_factory=None, node_factory=None, edge_factory=None):
"""
Must be given tree factory to create trees.
"""
super(_NexmlTreesParser, self).__init__()
if trees_block_factory is None:
self.trees_block_factory = trees.TreesBlock
else:
self.trees_block_factory = trees_block_factory
if tree_factory is None:
self.tree_factory = trees.Tree
else:
self.tree_factory = tree_factory
if node_factory is None:
self.node_factory = trees.Node
else:
self.node_factory = node_factory
if edge_factory is None:
self.edge_factory = trees.TreesBlock
else:
self.edge_factory = edge_factory
def parse_trees(self, nxtrees, dataset, trees_idx=None, add_to_trees_block=True):
"""
Given an XmlElement object representing a NEXML treeblock,
self.nxtrees (corresponding to a `nex:trees` element), this
will construct and return a TreesBlock object defined by the
underlying NEXML. If `add_to_trees_block` is False, then each tree,
*IS NOT ADDED TO THE DATASET*.
"""
elem_id = nxtrees.get('id', "Trees" + str(trees_idx))
label = nxtrees.get('label', None)
taxa_id = nxtrees.get('otus', None)
if taxa_id is None:
raise Exception("Taxa block not specified for trees block \"%s\"" % trees_block.elem_id)
taxa_block = dataset.find_taxa_block(elem_id = taxa_id)
if not taxa_block:
raise Exception("Taxa block \"%s\" not found" % taxa_id)
taxa_block = taxa_block
trees_block = dataset.add_trees_block(taxa_block=taxa_block,
trees_block=self.trees_block_factory(elem_id=elem_id, label=label))
self.parse_annotations(annotated=trees_block, nxelement=nxtrees)
tree_counter = 0
for tree_element in nxtrees.getiterator('tree'):
tree_counter = tree_counter + 1
elem_id = tree_element.get('id', tree_counter)
label = tree_element.get('label', '')
treeobj = self.tree_factory(elem_id=elem_id, label=label)
tree_type_attr = tree_element.get('{http://www.w3.org/2001/XMLSchema-instance}type')
treeobj.length_type = _from_nexml_tree_length_type(tree_type_attr)
self.parse_annotations(annotated=treeobj, nxelement=tree_element)
nodes = self.parse_nodes(tree_element, taxa_block=trees_block.taxa_block, node_factory=self.node_factory)
edges = self.parse_edges(tree_element, length_type=treeobj.length_type, edge_factory=self.edge_factory)
for edge in edges.values():
# EDGE-ON-ROOT:
# allow "blank" tail nodes: so we only enforce
# this check if tail node id is specified
if edge.tail_node_id and edge.tail_node_id not in nodes:
msg = 'Edge "%s" specifies a non-defined ' \
'source node ("%s")\nCurrent nodes: %s' % (edge.elem_id,
edge.tail_node_id,
(','.join([n for n in nodes])))
raise Exception(msg)
if edge.head_node_id not in nodes:
msg = 'Edge "%s" specifies a non-defined ' \
'target node ("%s")\nCurrent nodes: %s' % (edge.elem_id,
edge.head_node_id,
(','.join([n.elem_id for n in nodes])))
raise Exception(msg)
if edge.head_node_id and edge.tail_node_id:
head_node = nodes[edge.head_node_id]
head_node.edge = edge
tail_node = nodes[edge.tail_node_id]
tail_node.add_child(head_node)
elif edge.head_node_id and not edge.tail_node_id:
head_node = nodes[edge.head_node_id]
head_node.edge = edge
# find node(s) without parent
parentless = []
for node in nodes.values():
if node.parent_node == None:
parentless.append(node)
# If one parentless node found, this is the root: we use
# it as the tree head node. If multiple parentless nodes
# are found, then we add them all as children of the
# existing head node. If none, then we have some sort of
# cyclicity, and we are not dealing with a tree.
if len(parentless) == 1:
treeobj.seed_node = parentless[0]
elif len(parentless) > 1:
for node in parentless:
treeobj.seed_node.add_child(node)
else:
raise Exception("Structural error: tree must be acyclic.")
rootedge = self.parse_root_edge(tree_element, length_type=treeobj.length_type, edge_factory=self.edge_factory)
if rootedge:
if rootedge.head_node_id not in nodes:
msg = 'Edge "%s" specifies a non-defined ' \
'target node ("%s")\nCurrent nodes: %s' % (edge.elem_id,
edge.head_node_id,
(','.join([n.elem_id for n in nodes])))
raise Exception(msg)
else:
nodes[rootedge.head_node_id].edge = rootedge
### should we make this node the seed node by rerooting the tree here? ###
else:
treeobj.seed_node.edge = None
if add_to_trees_block:
trees_block.append(treeobj)
yield treeobj
def parse_nodes(self, tree_element, taxa_block, node_factory):
"""
Given an XmlElement representation of a NEXML tree element,
(`nex:tree`) this will return a dictionary of DendroPy Node
objects created with the node factory method, self.new_node,
with the node_id as the key.
"""
nodes = {}
for nxnode in tree_element.getiterator('node'):
node_id = nxnode.get('id', None)
nodes[node_id] = node_factory()
nodes[node_id].elem_id = node_id
nodes[node_id].label = nxnode.get('label', None)
taxon_id = nxnode.get('otu', None)
if taxon_id is not None:
taxon = taxa_block.find_taxon(elem_id=taxon_id, update=False)
if not taxon:
raise Exception('Taxon with id "%s" not defined in taxa block "%s"' % (taxon_id, taxa.elem_id))
nodes[node_id].taxon = taxon
self.parse_annotations(annotated=nodes[node_id], nxelement=nxnode)
return nodes
def parse_root_edge(self, tree_element, length_type, edge_factory):
"""
Returns the edge subtending the root node, or None if not defined.
"""
rootedge = tree_element.find('rootedge')
if rootedge:
edge = edge_factory()
edge.head_node_id = rootedge.get('target', None)
edge.elem_id = rootedge.get('id', 'e' + str(id(edge)))
edge_length_str = length_type(rootedge.get('length', '0.0'))
edge.rootedge = True
edge_length = None
try:
edge_length = length_type(edge_length_str)
except:
msg = 'Edge %d ("%s") `length` attribute is not a %s' \
% (edge_counter, edge.elem_id, str(length_type))
raise Exception(msg)
edge.length = edge_length
self.parse_annotations(annotated=edge, nxelement=rootedge)
return edge
else:
return None
def parse_edges(self, tree_element, length_type, edge_factory):
"""
Given an XmlElement representation of a NEXML tree element
this will return a dictionary of DendroPy Edge objects created with
the edge factory method, self.new_edge, with the elem_id as
key. As at this stage, this method knows nothing about defined
nodes, the Edge tail_node and head_node properties of the
Edge are not set, but the tail_node_id and head_node_id are.
"""
edges = {}
edge_counter = 0
for nxedge in tree_element.getiterator('edge'):
edge = edge_factory()
edge_counter = edge_counter + 1
edge.tail_node_id = nxedge.get('source', None)
edge.head_node_id = nxedge.get('target', None)
edge.elem_id = nxedge.get('id', 'e' + str(edge_counter))
edge_length_str = length_type(nxedge.get('length', '0.0'))
if not edge.tail_node_id:
msg = 'Edge %d ("%s") does not have a source' \
% (edge_counter, edge.elem_id)
raise Exception(msg)
if not edge.head_node_id:
msg = 'Edge %d ("%s") does not have a target' \
% (edge_counter, edge.elem_id)
raise Exception(msg)
edge_length = None
try:
edge_length = length_type(edge_length_str)
except:
msg = 'Edge %d ("%s") `length` attribute is not a %s' \
% (edge_counter, edge.elem_id, str(length_type))
raise Exception(msg)
edge.length = edge_length
self.parse_annotations(annotated=edge, nxelement=nxedge)
edges[edge.elem_id] = edge
return edges
class _NexmlTaxaParser(_NexmlElementParser):
"""
Parses an XmlElement representation of NEXML taxa blocks.
"""
def __init__(self, taxa_block_factory=None, taxon_factory=None):
"""
Does nothing too useful right now.
"""
super(_NexmlTaxaParser, self).__init__()
if taxa_block_factory is None:
self.taxa_block_factory = taxa.TaxaBlock
else:
self.taxa_block_factory = taxa_block_factory
if taxon_factory is None:
self.taxon_factory = taxa.Taxon
else:
self.taxon_factory = taxon_factory
def parse_taxa(self, nxtaxa, dataset):
"""
Given an XmlElement representing a nexml taxa block, this
instantiates and returns a corresponding DendroPy Taxa object.
"""
elem_id = nxtaxa.get('id', None)
label = nxtaxa.get('label', None)
taxa_block = self.taxa_block_factory(elem_id=elem_id, label=label)
self.parse_annotations(annotated=taxa_block, nxelement=nxtaxa)
for idx, nxtaxon in enumerate(nxtaxa.getiterator('otu')):
taxon = self.taxon_factory(nxtaxon.get('id', "s" + str(idx) ), nxtaxon.get('label', "Taxon" + str(idx)))
self.parse_annotations(annotated=taxon, nxelement=nxtaxon)
taxa_block.append(taxon)
dataset.taxa_blocks.append(taxa_block)
class _NexmlCharBlockParser(_NexmlElementParser):
"""
Parses an XmlElement representation of NEXML taxa blocks.
"""
def __init__(self):
"""
Does nothing too useful right now.
"""
super(_NexmlCharBlockParser, self).__init__()
# if char_block_factory is None:
# self.char_block_factory = characters.CharBlock()
# else:
# self.char_block_factory = char_block_factory
def parse_ambiguous_state(self, nxambiguous, state_alphabet):
"""
Parses an XmlElement represent an ambiguous discrete character state,
("uncertain_state_set")
and returns a corresponding StateAlphabetElement object.
"""
state = characters.StateAlphabetElement(elem_id=nxambiguous.get('id', None),
label=nxambiguous.get('label', None),
symbol=nxambiguous.get('symbol', None),
token=nxambiguous.get('token', None))
state.member_states = []
for nxmember in nxambiguous.getiterator('member'):
member_state_id = nxmember.get('state', None)
member_state = state_alphabet.get_state('elem_id', member_state_id)
state.member_states.append(member_state)
state.multistate = characters.StateAlphabetElement.AMBIGUOUS_STATE
return state
def parse_polymorphic_state(self, nxpolymorphic, state_alphabet):
"""
Parses an XmlElement represent a polymorphic discrete character state,
("polymorphic_state_set")
and returns a corresponding StateAlphabetElement object.
"""
state = characters.StateAlphabetElement(elem_id=nxpolymorphic.get('id', None),
label=nxpolymorphic.get('label', None),
symbol=nxpolymorphic.get('symbol', None),
token=nxpolymorphic.get('token', None))
state.member_states = []
for nxmember in nxpolymorphic.getiterator('member'):
member_state_id = nxmember.get('state', None)
member_state = state_alphabet.get_state('elem_id', member_state_id)
state.member_states.append(member_state)
for nxambiguous in nxpolymorphic.getiterator('uncertain_state_set'):
state.member_states.append(self.parse_ambiguous_state(nxambiguous, state_alphabet))
state.multistate = characters.StateAlphabetElement.POLYMORPHIC_STATE
return state
def parse_state_alphabet(self, nxstates):
"""
Given an XmlElement representing a nexml definition of (discrete or standard) states
("states"), this returns a corresponding StateAlphabet object.
"""
state_alphabet = characters.StateAlphabet(elem_id=nxstates.get('id', None),
label=nxstates.get('label', None))
for nxstate in nxstates.getiterator('state'):
state = characters.StateAlphabetElement(elem_id=nxstate.get('id', None),
label=nxstate.get('label', None),
symbol=nxstate.get('symbol', None),
token=nxstate.get('token', None))
state_alphabet.append(state)
for nxstate in nxstates.getiterator('uncertain_state_set'):
state_alphabet.append(self.parse_ambiguous_state(nxstate, state_alphabet))
for nxstate in nxstates.getiterator('polymorphic_state_set'):
state_alphabet.append(self.parse_polymorphic_state(nxstate, state_alphabet))
return state_alphabet
def parse_characters_format(self, nxformat, char_block):
"""
Given an XmlElement format element ("format"), this parses the
state definitions (if any) and characters (column definitions, if any),
and populates the given char_block accordingly.
"""
if nxformat is not None:
for nxstates in nxformat.getiterator('states'):
char_block.state_alphabets.append(self.parse_state_alphabet(nxstates))
for nxchars in nxformat.getiterator('char'):
col = characters.ColumnType(elem_id=nxchars.get('id', None))
char_state_set_id = nxchars.get('states')
if char_state_set_id is not None:
state_alphabet = None
for state_sets in char_block.state_alphabets:
if state_sets.elem_id == char_state_set_id:
state_alphabet = state_sets
break
if state_alphabet is None:
raise Exception("State set '%s' no defined" % char_state_set_id)
col.state_alphabet = state_alphabet
char_block.column_types.append(col)
def parse_char_block(self, nxchars, dataset):
"""
Given an XmlElement representing a nexml characters block, this
instantiates and returns a corresponding DendroPy CharacterMatrix object.
"""
nxchartype = nxchars.get('{http://www.w3.org/2001/XMLSchema-instance}type', None)
if nxchartype.startswith('nex:Dna'):
char_block = characters.DnaCharactersBlock()
elif nxchartype.startswith('nex:Rna'):
char_block = characters.RnaCharactersBlock()
elif nxchartype.startswith('nex:Protein'):
char_block = characters.ProteinCharactersBlock()
elif nxchartype.startswith('nex:Restriction'):
char_block = characters.RestrictionSitesCharactersBlock()
elif nxchartype.startswith('nex:Standard'):
char_block = characters.StandardCharactersBlock()
elif nxchartype.startswith('nex:Continuous'):
char_block = characters.ContinuousCharactersBlock()
else:
raise NotImplementedError('Character Block %s (\"%s\"): Character type "%s" not supported.'
% (char_block.elem_id, char_block.label, nxchartype))
elem_id = nxchars.get('id', None)
label = nxchars.get('label', None)
char_block.elem_id = elem_id
char_block.label = label
taxa_id = nxchars.get('otus', None)
if taxa_id is None:
raise Exception("Character Block %s (\"%s\"): Taxa block not specified for trees block \"%s\"" % (char_block.elem_id, char_block.label, char_block.elem_id))
taxa_block = dataset.find_taxa_block(elem_id = taxa_id)
if not taxa_block:
raise Exception("Character Block %s (\"%s\"): Taxa block \"%s\" not found" % (char_block.elem_id, char_block.label, taxa_id))
char_block.taxa_block = taxa_block
self.parse_annotations(annotated=char_block, nxelement=nxchars)
nxformat = nxchars.find('format')
if nxformat is not None:
self.parse_characters_format(nxformat, char_block)
matrix = nxchars.find('matrix')
self.parse_annotations(annotated=char_block.matrix, nxelement=matrix)
if char_block.column_types:
id_column_map = char_block.id_column_map()
column_ids = [char.elem_id for char in char_block.column_types]
else:
id_column_map = {}
column_ids = []
for nxrow in matrix.getiterator('row'):
row_id = nxrow.get('id', None)
label = nxrow.get('label', None)
taxon_id = nxrow.get('otu', None)
taxon = taxa_block.find_taxon(elem_id=taxon_id, update=False)
if not taxon:
raise Exception('Character Block %s (\"%s\"): Taxon with id "%s" not defined in taxa block "%s"' % (char_block.elem_id, char_block.label, taxon_id, taxa.elem_id))
character_vector = characters.CharacterDataVector(elem_id=row_id, label=label, taxon=taxon)
self.parse_annotations(annotated=character_vector, nxelement=nxrow)
if isinstance(char_block, characters.ContinuousCharactersBlock):
if nxchartype.endswith('Seqs'):
char_block.markup_as_sequences = True
seq = nxrow.findtext('seq')
if seq is not None:
seq = seq.replace('\n\r', ' ').replace('\r\n', ' ').replace('\n', ' ').replace('\r',' ')
for char in seq.split(' '):
char = char.strip()
if char:
character_vector.append(characters.CharacterDataCell(value=float(char)))
else:
char_block.markup_as_sequences = False
for nxcell in nxrow.getiterator('cell'):
column_id = nxcell.get('char', None)
pos_idx = column_ids.index(column_id)
# column = id_column_map[column_id]
# state = column.state_id_map[cell.get('state', None)]
cell = characters.CharacterDataCell(value=float(nxcell.get('state')), column_type=id_column_map[column_id])
self.parse_annotations(annotated=cell, nxelement=nxcell)
character_vector.set_cell_by_index(pos_idx, cell)
else:
if nxchartype.endswith('Seqs'):
char_block.markup_as_sequences = True
symbol_state_map = char_block.default_state_alphabet.symbol_state_map()
seq = nxrow.findtext('seq')
if seq is not None:
seq = seq.replace(' ', '').replace('\n', '').replace('\r', '')
for char in seq:
if char in symbol_state_map:
state = symbol_state_map[char]
else:
raise NameError('Character Block %s (\"%s\"): State with symbol "%s" in sequence "%s" not defined' % (char_block.elem_id, char_block.label, char, seq))
character_vector.append(characters.CharacterDataCell(value=state))
else:
char_block.markup_as_sequences = False
id_state_maps = {}
for nxcell in nxrow.getiterator('cell'):
column_id = nxcell.get('char', None)
column = id_column_map[column_id]
pos_idx = column_ids.index(column_id)
if column_id not in id_state_maps:
id_state_maps[column_id] = column.state_alphabet.id_state_map()
state = id_state_maps[column_id][nxcell.get('state')]
cell = characters.CharacterDataCell(value=state, column_type=column)
self.parse_annotations(annotated=cell, nxelement=nxcell)
character_vector.set_cell_by_index(pos_idx, cell)
char_block[taxon] = character_vector
dataset.char_blocks.append(char_block)
class NexmlWriter(datasets.Writer):
"""
Implements the DataWriter interface for handling NEXML files.
"""
def __init__(self):
"""
Calls the base class constructor.
"""
datasets.Writer.__init__(self)
self.indent = " "
### datasets.Writer interface ###
def write_dataset(self, dataset, dest):
"""
Writes a list of DendroPy Tree objects to a full NEXML
document.
"""
self.write_to_nexml_open(dest, indent_level=0)
self.write_taxa_blocks(taxa_blocks=dataset.taxa_blocks, dest=dest)
self.write_char_blocks(char_blocks=dataset.char_blocks, dest=dest)
self.write_trees_blocks(trees_blocks=dataset.trees_blocks, dest=dest)
self.write_to_nexml_close(dest, indent_level=0)
### class-specific ###
def write_taxa_blocks(self, taxa_blocks, dest, indent_level=1):
"""
Writes out TaxaBlocks.
"""
for idx, taxa_block in enumerate(taxa_blocks):
dest.write(self.indent * indent_level)
parts = []
parts.append('otus')
if taxa_block.elem_id is not None:
parts.append('id="%s"' % taxa_block.elem_id)
else:
raise Exception("Taxa block given without ID")
if taxa_block.label:
parts.append('label="%s"' % taxa_block.label)
dest.write("<%s>\n" % ' '.join(parts))
# annotate
if isinstance(taxa_block, base.Annotated) and taxa_block.has_annotations():
self.write_annotations(taxa_block, dest, indent_level=indent_level+1)
for taxon in taxa_block:
dest.write(self.indent * (indent_level+1))
parts = []
parts.append('otu')
if taxon.elem_id is not None:
parts.append('id="%s"' % taxon.elem_id)
else:
raise Exception("Taxon without ID")
if taxon.label:
parts.append('label="%s"' % taxon.label)
if isinstance(taxon, base.Annotated) and taxon.has_annotations():
dest.write("<%s>\n" % ' '.join(parts))
self.write_annotations(taxon, dest, indent_level=indent_level+2)
dest.write(self.indent * (indent_level+1))
dest.write("\n")
else:
dest.write("<%s />\n" % ' '.join(parts))
dest.write(self.indent * indent_level)
dest.write('\n')
def write_trees_blocks(self, trees_blocks, dest, indent_level=1):
"""
Writes out TreesBlocks.
"""
for idx, trees_block in enumerate(trees_blocks):
dest.write(self.indent * indent_level)
parts = []
parts.append('trees')
if trees_block.elem_id is not None:
parts.append('id="%s"' % trees_block.elem_id)
else:
raise Exception("Tree block given without ID")
if trees_block.label:
parts.append('label="%s"' % trees_block.label)
parts.append('otus="%s"' % trees_block.taxa_block.elem_id)
dest.write("<%s>\n" % ' '.join(parts))
# annotate
if isinstance(trees_block, base.Annotated) and trees_block.has_annotations():
self.write_annotations(trees_block, dest, indent_level=indent_level+1)
for tree in trees_block:
self.write_tree(tree=tree, dest=dest, indent_level=2)
dest.write(self.indent * indent_level)
dest.write('\n')
def compose_state_definition(self, state, indent_level):
"""
Writes out state definition.
"""
parts = []
if state.multistate == characters.StateAlphabetElement.SINGLE_STATE:
parts.append('%s'
% (self.indent * indent_level, state.elem_id, state.symbol))
else:
if state.multistate == characters.StateAlphabetElement.AMBIGUOUS_STATE:
tag = "uncertain_state_set"
else:
tag = "polymorphic_state_set"
parts.append('%s<%s id="%s" symbol="%s">'
% (self.indent * indent_level, tag, state.elem_id, state.symbol))
for member in state.member_states:
parts.extend(self.compose_state_definition(member, indent_level+1))
parts.append("%s%s>" % ((self.indent * indent_level), tag))
return parts
def write_char_blocks(self, char_blocks, dest, indent_level=1):
"""
Writes out character matrices.
"""
for idx, char_block in enumerate(char_blocks):
dest.write(self.indent * indent_level)
parts = []
parts.append('characters')
if char_block.elem_id is not None:
parts.append('id="%s"' % char_block.elem_id)
else:
raise Exception("Character block without ID")
if char_block.label:
parts.append('label="%s"' % char_block.label)
parts.append('otus="%s"' % char_block.taxa_block.elem_id)
if isinstance(char_block, characters.DnaCharactersBlock):
xsi_datatype = 'nex:Dna'
elif isinstance(char_block, characters.RnaCharactersBlock):
xsi_datatype = 'nex:Rna'
elif isinstance(char_block, characters.ProteinCharactersBlock):
xsi_datatype = 'nex:Protein'
elif isinstance(char_block, characters.RestrictionSitesCharactersBlock):
xsi_datatype = 'nex:Restriction'
elif isinstance(char_block, characters.StandardCharactersBlock):
xsi_datatype = 'nex:Standard'
elif isinstance(char_block, characters.ContinuousCharactersBlock):
xsi_datatype = 'nex:Continuous'
else:
raise Exception("Unrecognized character block data type.")
if char_block.markup_as_sequences:
xsi_markup = 'Seqs'
else:
xsi_markup = 'Cells'
xsi_type = xsi_datatype + xsi_markup
parts.append('xsi:type="%s"' % xsi_type)
dest.write("<%s>\n" % ' '.join(parts))
# annotate
if isinstance(char_block, base.Annotated) and char_block.has_annotations():
self.write_annotations(char_block, dest, indent_level=indent_level+1)
state_alphabet_parts = []
if isinstance(char_block, characters.StandardCharactersBlock):
for state_alphabet in char_block.state_alphabets:
state_alphabet_parts.append('%s'
% (self.indent * (indent_level+2), state_alphabet.elem_id))
for state in state_alphabet:
state_alphabet_parts.extend(self.compose_state_definition(state, indent_level+3))
state_alphabet_parts.append('%s' % (self.indent * (indent_level+2)))
column_types_parts = []
if char_block.column_types:
for column in char_block.column_types:
if column.state_alphabet:
column_state = 'states="%s" ' % column.state_alphabet.elem_id
else:
column_state = ' '
column_types_parts.append('%s'
% ((self.indent*(indent_level+1)), column.elem_id, column_state))
if state_alphabet_parts or column_types_parts:
dest.write("%s\n" % (self.indent*(indent_level+1)))
if state_alphabet_parts:
dest.write(('\n'.join(state_alphabet_parts)) + '\n')
if column_types_parts:
dest.write(('\n'.join(column_types_parts)) + '\n')
pass
dest.write("%s\n" % (self.indent*(indent_level+1)))
dest.write("%s\n" % (self.indent * (indent_level+1)))
if isinstance(char_block.matrix, base.Annotated) and char_block.matrix.has_annotations():
self.write_annotations(char_block.matrix, dest, indent_level=indent_level+1)
for taxon, row in char_block.matrix.items():
dest.write(self.indent*(indent_level+2))
parts = []
parts.append('row')
if row.elem_id is not None:
parts.append('id="%s"' % row.elem_id)
else:
raise Exception("Row without ID")
if taxon:
parts.append('otu="%s"' % taxon.elem_id)
dest.write("<%s>\n" % ' '.join(parts))
if isinstance(row, base.Annotated) and row.has_annotations():
self.write_annotations(row, dest, indent_level=indent_level+3)
if char_block.markup_as_sequences:
### actual sequences get written here ###
if isinstance(char_block, characters.DnaCharactersBlock) \
or isinstance(char_block, characters.RnaCharactersBlock) \
or isinstance(char_block, characters.ProteinCharactersBlock) \
or isinstance(char_block, characters.RestrictionSitesCharactersBlock):
separator = ''
break_long_words = True
else:
# Standard or Continuous
separator = ' '
break_long_words = False
seqlines = textwrap.fill(separator.join([str(c) for c in row]),
width=70,
initial_indent=self.indent*(indent_level+3) + "",
subsequent_indent=self.indent*(indent_level+4),
break_long_words=break_long_words)
seqlines = seqlines + "\n"
dest.write(seqlines)
else:
for cell in row:
parts = []
parts.append('%s| \n')
self.write_annotations(cell, dest, indent_level=indent_level+4)
dest.write('%s | ' % (self.indent*(indent_level+3)))
else:
dest.write('/>\n')
dest.write(self.indent * (indent_level+2))
dest.write('\n')
dest.write("%s\n" % (self.indent * (indent_level+1)))
dest.write(self.indent * indent_level)
dest.write('\n')
def write_tree(self, tree, dest, indent_level=0):
"""
Writes a single DendroPy Tree object as a NEXML nex:tree
element.
"""
parts = []
parts.append('tree')
if hasattr(tree, 'elem_id') and tree.elem_id is not None:
parts.append('id="%s"' % tree.elem_id)
else:
parts.append('id="%s"' % ("Tree" + str(id(tree))))
if hasattr(tree, 'label') and tree.label:
parts.append('label="%s"' % tree.label)
if hasattr(tree, 'length_type') and tree.length_type:
parts.append('xsi:type="%s"' % _to_nexml_tree_length_type(tree.length_type))
else:
parts.append('xsi:type="nex:FloatTree"')
parts = ' '.join(parts)
dest.write('%s<%s>\n'
% (self.indent * indent_level, parts))
# annotate
if isinstance(tree, base.Annotated) and tree.has_annotations():
self.write_annotations(tree, dest, indent_level=indent_level+1)
for node in tree.preorder_node_iter():
self.write_node(node=node, dest=dest, indent_level=indent_level+1)
for edge in tree.preorder_edge_iter():
self.write_edge(edge=edge, dest=dest, indent_level=indent_level+1)
dest.write('%s\n' % (self.indent * indent_level))
def write_to_nexml_open(self, dest, indent_level=0):
"""
Writes the opening tag for a nexml element.
"""
parts = []
parts.append('')
parts.append('\n'
% (self.indent * (indent_level+1)))
dest.write('\n'.join(parts))
def write_to_nexml_close(self, dest, indent_level=0):
"""
Closing tag for a nexml element.
"""
dest.write('%s' % (self.indent*indent_level))
def write_node(self, node, dest, indent_level=0):
"""
Writes out a NEXML node element.
"""
parts = []
parts.append('\n')
self.write_annotations(node, dest, indent_level=indent_level+1)
dest.write('%s\n' % (self.indent * indent_level))
else:
dest.write(' />\n')
def write_edge(self, edge, dest, indent_level=0):
"""
Writes out a NEXML edge element.
"""
if edge and edge.head_node:
parts = []
if edge.tail_elem_id != None:
tag = "edge"
parts.append('<%s' % tag)
parts.append('source="%s"' % edge.tail_elem_id)
else:
# EDGE-ON-ROOT:
tag = "rootedge"
parts.append('<%s' % tag)
if edge.head_elem_id != None:
parts.append('target="%s"' % edge.head_elem_id)
if hasattr(edge, 'elem_id') and edge.elem_id:
parts.append('id="%s"' % edge.elem_id)
if hasattr(edge, 'length') and edge.length != None:
parts.append('length="%s"' % edge.length)
# only write if we have more than just the 'edge' and '/' bit
if len(parts) > 2:
parts = ' '.join(parts)
dest.write('%s%s' % ((self.indent * indent_level), parts))
if edge.has_annotations():
dest.write('>\n')
self.write_annotations(edge, dest,
indent_level=indent_level+1)
dest.write('%s%s>\n' % ((self.indent * indent_level), tag))
else:
dest.write(' />\n')
def write_annotations(self, annotated, dest, indent_level=0):
"""
Writes out annotations for an Annotable object.
"""
annotes_dict = annotated.annotations()
if len(annotes_dict) > 0:
parts = _to_nexml_dict(annotes_dict, self.indent, indent_level)
parts = '\n'.join(parts)
dest.write(parts + '\n')
def basic_test():
source = "tests/sources/comprehensive.xml"
nexmlr = NexmlReader()
dataset = nexmlr.read_dataset(source)
target = "tests/output/parsed.xml"
nexmlw = NexmlWriter()
print
print
#print nexmlw.compose_dataset(dataset)
output = open(target, 'w')
nexmlw.store_dataset(dataset=dataset, destination=output)
if __name__ == "__main__":
basic_test()