Switch to unified view

a b/singlecellmultiomics/molecule/nlaIII.py
1
from singlecellmultiomics.molecule.molecule import Molecule
2
from singlecellmultiomics.molecule.featureannotatedmolecule import FeatureAnnotatedMolecule
3
4
5
class NlaIIIMolecule(Molecule):
6
    """Nla III based Molecule class
7
8
    Args:
9
        fragments (singlecellmultiomics.fragment.Fragment): Fragments to associate to the molecule
10
11
        **kwargs: extra args
12
13
    """
14
15
    def __init__(self, fragment,
16
                 site_has_to_be_mapped=False,
17
                 # molecule is invalid when NLA site does not map  (requires
18
                 # reference)
19
                 **kwargs):
20
21
        self.site_location = None
22
        self.site_has_to_be_mapped = site_has_to_be_mapped
23
        Molecule.__init__(self, fragment, **kwargs)
24
25
26
    def _add_fragment(self,fragment):
27
        # Update the cut coordinate tho the (left most extreme value)
28
        self.assignment_radius = fragment.assignment_radius
29
30
        if fragment.site_location is not None:
31
32
            if self.site_location is None:
33
                self.site_location = [fragment.site_location[0],fragment.site_location[1]]
34
35
            elif fragment.strand: # Reverse:
36
                self.site_location[1] = max(fragment.site_location[1], self.site_location[1]) # this is the coordinate
37
            else:
38
                self.site_location[1] = min(fragment.site_location[1], self.site_location[1]) # this is the coordinate
39
40
        # else : writing a fragment which has no cut location associated
41
        Molecule._add_fragment(self, fragment)
42
43
44
    def write_tags(self):
45
        Molecule.write_tags(self)
46
        if self.reference is not None:  # Only calculate this statistic when a reference is available
47
            if self.get_cut_site() is not None:
48
                self.set_meta('Us', self.get_undigested_site_count())
49
50
    def is_valid(self, set_rejection_reasons=False, reference=None):
51
52
        try:
53
            chrom, start, strand = self.get_cut_site()
54
        except Exception as e:
55
            if set_rejection_reasons:
56
                self.set_rejection_reason('no_cut_site_found')
57
            return False
58
59
        if self.site_has_to_be_mapped:
60
61
            if reference is None:
62
                if self.reference is None:
63
                    raise ValueError(
64
                        'Please supply a reference (PySAM.FastaFile)')
65
            reference = self.reference
66
67
            if reference.fetch(chrom, start, start + 4).upper() != 'CATG':
68
                if set_rejection_reasons:
69
                    self.set_rejection_reason('no_CATG_in_ref')
70
                return False
71
72
        return True
73
74
    def get_fragment_span_sequence(self, reference=None):
75
        """Obtain the sequence between the start and end of the molecule
76
        Args:
77
            reference(pysam.FastaFile) : reference  to use.
78
                If not specified `self.reference` is used
79
        Returns:
80
            sequence (str)
81
        """
82
        if self.chromosome is None:
83
            return ''
84
85
        if reference is None:
86
            if self.reference is None:
87
                raise ValueError('Please supply a reference (PySAM.FastaFile)')
88
        reference = self.reference
89
        return reference.fetch(
90
            self.chromosome,
91
            self.spanStart,
92
            self.spanEnd).upper()
93
94
    def get_undigested_site_count(self, reference=None):
95
        """
96
        Obtain the amount of undigested sites in the span of the molecule
97
98
        Args:
99
            reference(pysam.FastaFile) : reference handle
100
101
        Returns:
102
            undigested_site_count : int
103
            amount of undigested cut sites in the mapping span of the molecule
104
105
        Raises:
106
            ValueError : when the span of the molecule is not properly defined
107
        """
108
        if reference is None:
109
            if self.reference is None:
110
                raise ValueError('Please supply a reference (PySAM.FastaFile)')
111
        reference = self.reference
112
113
        seq = self.get_fragment_span_sequence(reference)
114
        total = seq.count('CATG')
115
        if self.strand == 0 and seq.endswith('CATG'):
116
            total -= 1
117
        elif self.strand == 1 and seq.startswith('CATG'):
118
            total -= 1
119
        return total
120
121
    def write_tags_to_psuedoreads(self, reads, call_super=True):
122
        if call_super:
123
            Molecule.write_tags_to_psuedoreads(self, reads)
124
        if self.reference is not None:  # Only calculate this statistic when a reference is available
125
            if self.get_cut_site() is not None:
126
                us = self.get_undigested_site_count()
127
                for read in reads:
128
                    read.set_tag('Us', us)
129
130
131
class AnnotatedNLAIIIMolecule(FeatureAnnotatedMolecule, NlaIIIMolecule):
132
    """Nla III based Molecule which is annotated with features (genes/exons/introns, .. )
133
134
    Args:
135
        fragments (singlecellmultiomics.fragment.Fragment): Fragments to associate to the molecule
136
        features (singlecellmultiomics.features.FeatureContainer) : container to use to obtain features from
137
        **kwargs: extra args
138
139
    """
140
141
    def write_tags(self):
142
        NlaIIIMolecule.write_tags(self)
143
        FeatureAnnotatedMolecule.write_tags(self)
144
145
    def __init__(self, fragment, features, **kwargs):
146
        self.site_location = None
147
        FeatureAnnotatedMolecule.__init__(self, fragment, features, **kwargs)
148
        NlaIIIMolecule.__init__(self, fragment, **kwargs)
149
150
    def write_tags_to_psuedoreads(self,reads):
151
        NlaIIIMolecule.write_tags_to_psuedoreads(self, reads)
152
        FeatureAnnotatedMolecule.write_tags_to_psuedoreads(self,reads,call_super=False)