Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 59 additions & 57 deletions co/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@
from functools import reduce
import logging

from Bio import Alphabet
from Bio.Seq import Seq
import operator
from Bio.SeqFeature import FeatureLocation, SeqFeature
from Bio.SeqRecord import SeqRecord
from intervaltree import IntervalTree
import itertools
import six

from co.difference import Diff
from co.feature import Feature, ComponentFeatureSet, Source
from co.translation import OverlapError, MutableTranslationTable
from co.feature import Feature, ComponentFeatureSet
from co.translation import OverlapError, MutableTranslationTable, shift_feature_location, TranslationTableChain


# logging.basicConfig(level=logging.DEBUG)

class Component(object):
"""
.. attribute:: features
Expand Down Expand Up @@ -118,7 +121,14 @@ def seq(self):
def tt(self, ancestor=None):
if ancestor in (None, self._parent):
return self._mutations_tt
raise KeyError("Translation table from {} to {} cannot be accessed.".format(repr(self), repr(ancestor)))

try:
ancestors = list(self.get_lineage())
i = ancestors.index(ancestor)
ancestors = [self] + ancestors[:i]
return TranslationTableChain(a.tt() for a in ancestors)
except ValueError:
raise KeyError("Translation table from {} to {} cannot be accessed.".format(repr(self), repr(ancestor)))

@classmethod
def combine(cls, *components, **kwargs):
Expand All @@ -140,7 +150,7 @@ def combine(cls, *components, **kwargs):
offset = 0
for component in components:
for feature in component.features:
combined.features.add(feature._shift(offset)) #move(offset + feature.position, component=combined))
combined.features.add(Feature.translate(feature, offset, combined)) #move(offset + feature.position, component=combined))
offset += len(component)
else:
offset = 0
Expand All @@ -155,26 +165,11 @@ def combine(cls, *components, **kwargs):

return combined


@property
def parent(self):
return self._parent

@staticmethod
def _translate_feature(feature, component, tt=None):
if tt is None:
tt = component.tt(feature.component)
#
# logging.debug('TRANSLATE {}'.format(feature))
# logging.debug('TRANSLATE T {}'.format(list(enumerate(tt))))

offset = tt[feature.location.start] - feature.location.start

# FIXME does not include ref and ref_db!
# TODO create a "broken reference" object and re-attach here
return feature._shift(offset, component)

def mutate(self, mutations, strict=True):
def mutate(self, mutations, strict=True, transform=None):
"""
Creates a copy of this :class:`Component` and applies all ``mutations`` in order.

Expand All @@ -195,13 +190,20 @@ def mutate(self, mutations, strict=True):
handle.

"""
component = Component(seq=self._seq, parent=self) # TODO use __new__ instead
if transform is None:
transform = Feature.transform


component = Component(seq=self._seq, parent=self, feature_class=self.features._feature_class) # TODO use __new__ instead
features = component.features

tt = MutableTranslationTable(size=len(self._seq))

sequence = self.seq.tomutable()

changed_features = set()
changed_features = IntervalTree()

# TODO check that mutations do not overlap

# check that all mutations are in range:
for mutation in mutations:
Expand All @@ -221,7 +223,7 @@ def mutate(self, mutations, strict=True):

if strict:
translated_start = tt[mutation.position]
else:
else: # TODO get rid of non-strict mode
try:
translated_start = tt.ge(mutation.position)
logging.debug('translated start: nonstrict=%s; strict=%s', translated_start, tt[mutation.position])
Expand Down Expand Up @@ -251,44 +253,41 @@ def mutate(self, mutations, strict=True):
logging.debug('new features: {}'.format(list(features)))
logging.info('features in mutation: {}'.format(affected_features))

for feature in affected_features | changed_features:
# logging.debug(list(range(len(self.sequence))))
# logging.debug(list(tt))
logging.info('{} with sequence "{}" affected by {}.'.format(feature, feature.seq, mutation))

if feature.end < mutation.start or feature.start > mutation.end:
continue
for interval in itertools.chain(
changed_features.search(mutation.start - 1, mutation.end),
affected_features):

# TODO move this into a previous loop:
try:
changed_features.remove(feature)
except KeyError:
changed_features.remove(interval)
feature = interval.data
feature_start = interval.begin
feature_end = interval.end
except ValueError:
feature = interval
features.remove(feature)
feature_start = feature.start
feature_end = feature.end

# TODO find untranslated equivalents and replace when all is over.
if mutation.start > feature.start:
if mutation.end < feature.end - 1 or mutation.size == 0: # mutation properly contained in feature.
logging.debug('FMMF from {} to {}({})'.format(
feature.start, # tt.ge(feature.start),
feature.end, # tt.le(feature.end),
feature.end - mutation.size + mutation.new_size))

changed_features.add(feature._move(feature.start,
feature.end - mutation.size + mutation.new_size))
else:
logging.debug('FMFM from {} to {}'.format(
tt[feature.start],
tt[mutation.start - 1])) # tt[mutation.start] ?

changed_features.add(feature._move(feature.start, mutation.start))
else: # mutation.start >= feature.start
assert not (feature_end < mutation.start or feature_start > mutation.end)

if mutation.end < feature.end - 1:
logging.debug('MFMF from {} to {}'.format(tt[mutation.end + 1], tt[feature.end]))
if mutation.start > feature_start:
if mutation.end < feature_end or mutation.size == 0: # mutation properly contained in feature.
logging.debug('FMMF from {} to {}({})'.format(
feature_start, # tt.ge(feature.start),
feature_end, # tt.le(feature.end),
feature_end - mutation.size + mutation.new_size))

changed_features.add(feature._move(mutation.end + 1, feature.end))
changed_features.addi(feature_start,
feature_end - mutation.size + mutation.new_size,
feature)
else:
pass # feature removed and not replaced.
logging.debug('MFFM feature removed')
logging.debug('FMFM from {} to {}'.format(tt[feature_start], tt[mutation.start - 1]))
changed_features.addi(feature_start, mutation.start, feature)
else: # if mutation.start >= feature.start
if mutation.end < feature_end:
logging.debug('MFMF from {} to {}'.format(tt[mutation.end], tt[feature_end-1]))
changed_features.addi(mutation.end, feature_end, feature)

logging.debug('applying mutation: at %s("%s") translating from %s("%s") delete %s bases and insert "%s"',
translated_start,
Expand Down Expand Up @@ -326,9 +325,12 @@ def mutate(self, mutations, strict=True):
component._mutations_tt = tt
component.features = features

for feature in changed_features:
for new_start, new_end, feature in changed_features:
new_location = FeatureLocation(new_start, new_end, strand=feature.location.strand)
new_location = shift_feature_location(tt, new_location)

logging.debug('changed: %s', feature)
translated_feature = Component._translate_feature(feature, component, tt)
translated_feature = transform(feature, new_location, component)
features.add(translated_feature)

logging.debug('translating %s: %s(%s) "%s" -> %s(%s) "%s"',
Expand Down
104 changes: 87 additions & 17 deletions co/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,24 @@ def __eq__(self, other):
self.ref == other.ref and \
self.ref_db == other.ref_db

def _move_to_location(self, location, component=None):
@classmethod
def transform(cls, feature, location, component):
return Feature(location=location,
component=component or self.component,
type=self.type,
location_operator=self.location_operator,
id=self.id,
qualifiers=dict(self.qualifiers.items()))

component=component or feature.component,
type=feature.type,
location_operator=feature.location_operator,
id=feature.id,
qualifiers=dict(feature.qualifiers.items()))
# TODO refs?

def _move(self, start, end):
# TODO ref
new_location = FeatureLocation(start, end, strand=self.location.strand)
return self._move_to_location(new_location)

def _shift(self, offset, component=None):
return self._move_to_location(self.location._shift(offset), component)
@classmethod
def translate(cls, feature, offset, component=None):
return Feature(location=feature.location._shift(offset),
component=component or feature.component,
type=feature.type,
location_operator=feature.location_operator,
id=feature.id,
qualifiers=dict(feature.qualifiers.items()))

@property
def seq(self):
Expand Down Expand Up @@ -224,6 +225,75 @@ def __gt__(self, other):
return False


class FeatureProxy(object):

def __init__(self, feature, component):
self._component = component

if isinstance(feature, FeatureProxy):
self._feature = feature.feature
else:
self._feature = feature

if self._feature.component == component:
self.location = feature.location
else:
self.location = self._translate_location(feature, component)

def __getattr__(self, item):
return getattr(self.feature, item)

def __lt__(self, other):
if self.start < other.start:
return True
if self.start > other.start:
return False
if self.end < other.end:
return True
return False

def __gt__(self, other):
if self.start > other.start:
return True
if self.start < other.start:
return False
if self.end > other.end:
return True
return False

def __repr__(self):
return 'Proxy({})/{}'.format(self.location, repr(self._feature))

@staticmethod
def _translate_location(feature, component, tt=None):
if tt is None:
tt = component.tt(feature.component)

offset = tt[feature.location.start] - feature.location.start
return feature.location._shift(offset)

@property
def feature(self):
return self._feature

@property
def origin(self):
return self._feature.component

@property
def component(self):
return self._component

@property
def start(self):
return self.location.start

@property
def end(self):
return self.location.end



class ComponentFeatureSet(FeatureSet):
"""
An extended version of :class:`FeatureSet` that binds to a :class:`Component` and inherits from any
Expand All @@ -250,8 +320,8 @@ def __init__(self, component, removed_features=None, feature_class=None):
def __iter__(self):
if self.parent_feature_set: # NOTE: this is where caching should kick in on any inherited implementation.
keep_features = (f for f in self.parent_feature_set if f not in self.removed_features)
translated_features = (self.component._translate_feature(f, self.component) for f in keep_features)
return heapq.merge({f.data for f in self._features}, translated_features)
translated_features = (FeatureProxy(f, self.component) for f in keep_features)
return heapq.merge(sorted(f.data for f in self._features), sorted(translated_features))
else:
return super(ComponentFeatureSet, self).__iter__()

Expand Down Expand Up @@ -288,7 +358,7 @@ def remove(self, feature):

def overlap(self, start, end, include_inherited=True):
"""
Returns an iterator over all features in the collection that overlap the given range.
Returns a set of all features in the collection that overlap the given range.

:param int start: overlap region start
:param int end: overlap region end
Expand Down
4 changes: 1 addition & 3 deletions co/mutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ def end(self): # FIXME *dangerous* needs review.
"""
Computed end coordinate of the deletion. Use with caution.
"""
if self.size in (0, 1):
return self.position
return self.position + self.size - 1
return self.position + self.size

@property
def new_size(self):
Expand Down
15 changes: 15 additions & 0 deletions co/translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,18 @@ def substitute(self, position, size, strict=True):
:raises OverlapError: in various edge cases involving overlapping mutations, particularly in `strict` mode.
"""
self._insert_gap(position, size, size, strict)


class TranslationTableChain(object):
def __init__(self, tables):
self.tables = tuple(tables)

def __getitem__(self, position):
for table in reversed(self.tables):
position = table[position]
return position


def shift_feature_location(tt, location):
offset = tt[location.start] - location.start
return location._shift(offset)
Loading