diff --git a/co/component.py b/co/component.py index 940ad7f..9a8b1b8 100644 --- a/co/component.py +++ b/co/component.py @@ -2,18 +2,21 @@ from functools import reduce import logging -from Bio import Alphabet from Bio.Seq import Seq import operator from Bio.SeqFeature import FeatureLocation, SeqFeature from Bio.SeqRecord import SeqRecord +from intervaltree import IntervalTree +import itertools import six from co.difference import Diff -from co.feature import Feature, ComponentFeatureSet, Source -from co.translation import OverlapError, MutableTranslationTable +from co.feature import Feature, ComponentFeatureSet +from co.translation import OverlapError, MutableTranslationTable, shift_feature_location, TranslationTableChain +# logging.basicConfig(level=logging.DEBUG) + class Component(object): """ .. attribute:: features @@ -118,7 +121,14 @@ def seq(self): def tt(self, ancestor=None): if ancestor in (None, self._parent): return self._mutations_tt - raise KeyError("Translation table from {} to {} cannot be accessed.".format(repr(self), repr(ancestor))) + + try: + ancestors = list(self.get_lineage()) + i = ancestors.index(ancestor) + ancestors = [self] + ancestors[:i] + return TranslationTableChain(a.tt() for a in ancestors) + except ValueError: + raise KeyError("Translation table from {} to {} cannot be accessed.".format(repr(self), repr(ancestor))) @classmethod def combine(cls, *components, **kwargs): @@ -140,7 +150,7 @@ def combine(cls, *components, **kwargs): offset = 0 for component in components: for feature in component.features: - combined.features.add(feature._shift(offset)) #move(offset + feature.position, component=combined)) + combined.features.add(Feature.translate(feature, offset, combined)) #move(offset + feature.position, component=combined)) offset += len(component) else: offset = 0 @@ -155,26 +165,11 @@ def combine(cls, *components, **kwargs): return combined - @property def parent(self): return self._parent - @staticmethod - def _translate_feature(feature, component, tt=None): - if tt is None: - tt = component.tt(feature.component) - # - # logging.debug('TRANSLATE {}'.format(feature)) - # logging.debug('TRANSLATE T {}'.format(list(enumerate(tt)))) - - offset = tt[feature.location.start] - feature.location.start - - # FIXME does not include ref and ref_db! - # TODO create a "broken reference" object and re-attach here - return feature._shift(offset, component) - - def mutate(self, mutations, strict=True): + def mutate(self, mutations, strict=True, transform=None): """ Creates a copy of this :class:`Component` and applies all ``mutations`` in order. @@ -195,13 +190,20 @@ def mutate(self, mutations, strict=True): handle. """ - component = Component(seq=self._seq, parent=self) # TODO use __new__ instead + if transform is None: + transform = Feature.transform + + + component = Component(seq=self._seq, parent=self, feature_class=self.features._feature_class) # TODO use __new__ instead features = component.features + tt = MutableTranslationTable(size=len(self._seq)) sequence = self.seq.tomutable() - changed_features = set() + changed_features = IntervalTree() + + # TODO check that mutations do not overlap # check that all mutations are in range: for mutation in mutations: @@ -221,7 +223,7 @@ def mutate(self, mutations, strict=True): if strict: translated_start = tt[mutation.position] - else: + else: # TODO get rid of non-strict mode try: translated_start = tt.ge(mutation.position) logging.debug('translated start: nonstrict=%s; strict=%s', translated_start, tt[mutation.position]) @@ -251,44 +253,41 @@ def mutate(self, mutations, strict=True): logging.debug('new features: {}'.format(list(features))) logging.info('features in mutation: {}'.format(affected_features)) - for feature in affected_features | changed_features: - # logging.debug(list(range(len(self.sequence)))) - # logging.debug(list(tt)) - logging.info('{} with sequence "{}" affected by {}.'.format(feature, feature.seq, mutation)) - - if feature.end < mutation.start or feature.start > mutation.end: - continue + for interval in itertools.chain( + changed_features.search(mutation.start - 1, mutation.end), + affected_features): + # TODO move this into a previous loop: try: - changed_features.remove(feature) - except KeyError: + changed_features.remove(interval) + feature = interval.data + feature_start = interval.begin + feature_end = interval.end + except ValueError: + feature = interval features.remove(feature) + feature_start = feature.start + feature_end = feature.end - # TODO find untranslated equivalents and replace when all is over. - if mutation.start > feature.start: - if mutation.end < feature.end - 1 or mutation.size == 0: # mutation properly contained in feature. - logging.debug('FMMF from {} to {}({})'.format( - feature.start, # tt.ge(feature.start), - feature.end, # tt.le(feature.end), - feature.end - mutation.size + mutation.new_size)) - - changed_features.add(feature._move(feature.start, - feature.end - mutation.size + mutation.new_size)) - else: - logging.debug('FMFM from {} to {}'.format( - tt[feature.start], - tt[mutation.start - 1])) # tt[mutation.start] ? - - changed_features.add(feature._move(feature.start, mutation.start)) - else: # mutation.start >= feature.start + assert not (feature_end < mutation.start or feature_start > mutation.end) - if mutation.end < feature.end - 1: - logging.debug('MFMF from {} to {}'.format(tt[mutation.end + 1], tt[feature.end])) + if mutation.start > feature_start: + if mutation.end < feature_end or mutation.size == 0: # mutation properly contained in feature. + logging.debug('FMMF from {} to {}({})'.format( + feature_start, # tt.ge(feature.start), + feature_end, # tt.le(feature.end), + feature_end - mutation.size + mutation.new_size)) - changed_features.add(feature._move(mutation.end + 1, feature.end)) + changed_features.addi(feature_start, + feature_end - mutation.size + mutation.new_size, + feature) else: - pass # feature removed and not replaced. - logging.debug('MFFM feature removed') + logging.debug('FMFM from {} to {}'.format(tt[feature_start], tt[mutation.start - 1])) + changed_features.addi(feature_start, mutation.start, feature) + else: # if mutation.start >= feature.start + if mutation.end < feature_end: + logging.debug('MFMF from {} to {}'.format(tt[mutation.end], tt[feature_end-1])) + changed_features.addi(mutation.end, feature_end, feature) logging.debug('applying mutation: at %s("%s") translating from %s("%s") delete %s bases and insert "%s"', translated_start, @@ -326,9 +325,12 @@ def mutate(self, mutations, strict=True): component._mutations_tt = tt component.features = features - for feature in changed_features: + for new_start, new_end, feature in changed_features: + new_location = FeatureLocation(new_start, new_end, strand=feature.location.strand) + new_location = shift_feature_location(tt, new_location) + logging.debug('changed: %s', feature) - translated_feature = Component._translate_feature(feature, component, tt) + translated_feature = transform(feature, new_location, component) features.add(translated_feature) logging.debug('translating %s: %s(%s) "%s" -> %s(%s) "%s"', diff --git a/co/feature.py b/co/feature.py index 6e958db..162ceda 100644 --- a/co/feature.py +++ b/co/feature.py @@ -165,23 +165,24 @@ def __eq__(self, other): self.ref == other.ref and \ self.ref_db == other.ref_db - def _move_to_location(self, location, component=None): + @classmethod + def transform(cls, feature, location, component): return Feature(location=location, - component=component or self.component, - type=self.type, - location_operator=self.location_operator, - id=self.id, - qualifiers=dict(self.qualifiers.items())) - + component=component or feature.component, + type=feature.type, + location_operator=feature.location_operator, + id=feature.id, + qualifiers=dict(feature.qualifiers.items())) # TODO refs? - def _move(self, start, end): - # TODO ref - new_location = FeatureLocation(start, end, strand=self.location.strand) - return self._move_to_location(new_location) - - def _shift(self, offset, component=None): - return self._move_to_location(self.location._shift(offset), component) + @classmethod + def translate(cls, feature, offset, component=None): + return Feature(location=feature.location._shift(offset), + component=component or feature.component, + type=feature.type, + location_operator=feature.location_operator, + id=feature.id, + qualifiers=dict(feature.qualifiers.items())) @property def seq(self): @@ -224,6 +225,75 @@ def __gt__(self, other): return False +class FeatureProxy(object): + + def __init__(self, feature, component): + self._component = component + + if isinstance(feature, FeatureProxy): + self._feature = feature.feature + else: + self._feature = feature + + if self._feature.component == component: + self.location = feature.location + else: + self.location = self._translate_location(feature, component) + + def __getattr__(self, item): + return getattr(self.feature, item) + + def __lt__(self, other): + if self.start < other.start: + return True + if self.start > other.start: + return False + if self.end < other.end: + return True + return False + + def __gt__(self, other): + if self.start > other.start: + return True + if self.start < other.start: + return False + if self.end > other.end: + return True + return False + + def __repr__(self): + return 'Proxy({})/{}'.format(self.location, repr(self._feature)) + + @staticmethod + def _translate_location(feature, component, tt=None): + if tt is None: + tt = component.tt(feature.component) + + offset = tt[feature.location.start] - feature.location.start + return feature.location._shift(offset) + + @property + def feature(self): + return self._feature + + @property + def origin(self): + return self._feature.component + + @property + def component(self): + return self._component + + @property + def start(self): + return self.location.start + + @property + def end(self): + return self.location.end + + + class ComponentFeatureSet(FeatureSet): """ An extended version of :class:`FeatureSet` that binds to a :class:`Component` and inherits from any @@ -250,8 +320,8 @@ def __init__(self, component, removed_features=None, feature_class=None): def __iter__(self): if self.parent_feature_set: # NOTE: this is where caching should kick in on any inherited implementation. keep_features = (f for f in self.parent_feature_set if f not in self.removed_features) - translated_features = (self.component._translate_feature(f, self.component) for f in keep_features) - return heapq.merge({f.data for f in self._features}, translated_features) + translated_features = (FeatureProxy(f, self.component) for f in keep_features) + return heapq.merge(sorted(f.data for f in self._features), sorted(translated_features)) else: return super(ComponentFeatureSet, self).__iter__() @@ -288,7 +358,7 @@ def remove(self, feature): def overlap(self, start, end, include_inherited=True): """ - Returns an iterator over all features in the collection that overlap the given range. + Returns a set of all features in the collection that overlap the given range. :param int start: overlap region start :param int end: overlap region end diff --git a/co/mutation.py b/co/mutation.py index feff8cc..63897f2 100644 --- a/co/mutation.py +++ b/co/mutation.py @@ -65,9 +65,7 @@ def end(self): # FIXME *dangerous* needs review. """ Computed end coordinate of the deletion. Use with caution. """ - if self.size in (0, 1): - return self.position - return self.position + self.size - 1 + return self.position + self.size @property def new_size(self): diff --git a/co/translation.py b/co/translation.py index 6f6b3a1..ebc642a 100644 --- a/co/translation.py +++ b/co/translation.py @@ -487,3 +487,18 @@ def substitute(self, position, size, strict=True): :raises OverlapError: in various edge cases involving overlapping mutations, particularly in `strict` mode. """ self._insert_gap(position, size, size, strict) + + +class TranslationTableChain(object): + def __init__(self, tables): + self.tables = tuple(tables) + + def __getitem__(self, position): + for table in reversed(self.tables): + position = table[position] + return position + + +def shift_feature_location(tt, location): + offset = tt[location.start] - location.start + return location._shift(offset) diff --git a/tests/test_component.py b/tests/test_component.py index f0f9f37..c192326 100644 --- a/tests/test_component.py +++ b/tests/test_component.py @@ -128,6 +128,42 @@ def test_quickstart_feature_inherit(self): Feature(new_slogan, FeatureLocation(10, 20))], list(new_slogan.features)) self.assertEqual(['Co', 'components'], [str(f.seq) for f in new_slogan.features]) + def test_mutate_inherit_feature_twice(self): + + source = Component('potatoapplecarrot', features=[ + SeqFeature(FeatureLocation(6, 11), type='fruit'), + SeqFeature(FeatureLocation(11, 17), type='vegetable')]) + + self.assertEqual(Seq('apple'), next(iter(source.features)).seq) + c2 = source.mutate([DEL(7, 2)]) + + self.assertEqual('potatoalecarrot', str(c2.seq)) + self.assertEqual({Seq('ale'), Seq('carrot')}, {f.seq for f in c2.features}) + + c3 = c2.mutate([INS(7, 'pp'), INS(6, 'pine')]) + + self.assertEqual('potatopineapplecarrot', str(c3.seq)) + self.assertEqual({Seq('apple'), Seq('carrot')}, {f.seq for f in c3.features}) + self.assertEqual({Seq('apple'), Seq('carrot')}, {c3.seq[f.start:f.end] for f in c3.features}) + + + # FIXME this test fails: + @unittest.SkipTest + def test_inherit_feature_shift(self): + source = Component('12345aaaaa67890', features=[ + SeqFeature(FeatureLocation(5, 10), type='foo')]) + + self.assertEqual(Seq('aaaaa'), next(iter(source.features)).seq) + + c2 = source.mutate([INS(1, 'XXX')]) + self.assertEqual(Seq('aaaaa'), next(iter(c2.features)).seq) + self.assertEqual({Seq('aaaaa')}, {c2.seq[f.start:f.end] for f in c2.features}) + + c3 = c2.mutate([]) + print(c2.features) + print(c3.features) # why is this empty? + self.assertEqual(Seq('aaaaa'), next(iter(c3.features)).seq) + self.assertEqual({Seq('aaaaa')}, {c3.seq[f.start:f.end] for f in c3.features}) @unittest.SkipTest def test_mutate_break_source(self):