Switch to unified view

a b/singlecellmultiomics/molecule/iterator.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
from singlecellmultiomics.molecule import Molecule
4
from singlecellmultiomics.fragment import Fragment
5
from singlecellmultiomics.utils.prefetch import initialise_dict, initialise
6
from singlecellmultiomics.universalBamTagger import QueryNameFlagger
7
import pysamiterators.iterators
8
import collections
9
import pysam
10
11
12
class ReadIterator(pysamiterators.iterators.MatePairIterator):
13
    def __next__(self):
14
15
        try:
16
            rec = next(self.iterator)
17
            return tuple((rec, None))
18
        except StopIteration:
19
            raise
20
21
class MoleculeIterator():
22
    """Iterate over molecules in pysam.AlignmentFile or reads from a generator or list
23
24
    Example:
25
        >>> !wget https://github.com/BuysDB/SingleCellMultiOmics/blob/master/data/mini_nla_test.bam?raw=true -O mini_nla_test.bam
26
        >>> !wget https://github.com/BuysDB/SingleCellMultiOmics/blob/master/data/mini_nla_test.bam.bai?raw=true -O mini_nla_test.bam.bai
27
        >>> import pysam
28
        >>> from singlecellmultiomics.molecule import NlaIIIMolecule, MoleculeIterator
29
        >>> from singlecellmultiomics.fragment import NlaIIIFragment
30
        >>> import pysamiterators
31
        >>> alignments = pysam.AlignmentFile('mini_nla_test.bam')
32
        >>> for molecule in MoleculeIterator(
33
        >>>             alignments=alignments,
34
        >>>             molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule,
35
        >>>             fragment_class=singlecellmultiomics.fragment.NlaIIIFragment,
36
        >>>         ):
37
        >>>     break
38
        >>> molecule
39
        NlaIIIMolecule
40
        with 1 assinged fragments
41
        Allele :No allele assigned
42
            Fragment:
43
            sample:APKS1P25-NLAP2L2_57
44
            umi:CCG
45
            span:chr1 164834728-164834868
46
            strand:+
47
            has R1: yes
48
            has R2: no
49
            randomer trimmed: no
50
            DS:164834865
51
        RS:0
52
        RZ:CAT
53
        Restriction site:('chr1', 164834865)
54
55
56
    It is also possible to supply and iterator instead of a SAM/BAM file handle
57
    Example:
58
59
        >>> from singlecellmultiomics.molecule import MoleculeIterator
60
        >>> from singlecellmultiomics.fragment import Fragment
61
        >>> import pysam
62
        >>> # Create SAM file to write some example reads to:
63
        >>> test_sam = pysam.AlignmentFile('test.sam','w',reference_names=['chr1','chr2'],reference_lengths=[1000,1000])
64
        >>> read_A = pysam.AlignedSegment(test_sam.header)
65
        >>> read_A.set_tag('SM','CELL_1')
66
        >>> read_A.set_tag('RX','CAT')
67
        >>> read_A.reference_name = 'chr1'
68
        >>> read_A.reference_start = 100
69
        >>> read_A.query_sequence = 'ATCGGG'
70
        >>> read_A.cigarstring = '6M'
71
        >>> read_A.mapping_quality = 60
72
        >>> # Create a second read which is a duplicate of the previous
73
        >>> read_B = pysam.AlignedSegment(test_sam.header)
74
        >>> read_B.set_tag('SM','CELL_1')
75
        >>> read_B.set_tag('RX','CAT')
76
        >>> read_B.reference_name = 'chr1'
77
        >>> read_B.reference_start = 100
78
        >>> read_B.query_sequence = 'ATCGG'
79
        >>> read_B.cigarstring = '5M'
80
        >>> read_B.mapping_quality = 60
81
        >>> # Create a thids read which is belonging to another cell
82
        >>> read_C = pysam.AlignedSegment(test_sam.header)
83
        >>> read_C.set_tag('SM','CELL_2')
84
        >>> read_C.set_tag('RX','CAT')
85
        >>> read_C.reference_name = 'chr1'
86
        >>> read_C.reference_start = 100
87
        >>> read_C.query_sequence = 'ATCGG'
88
        >>> read_C.cigarstring = '5M'
89
        >>> read_C.mapping_quality = 60
90
        >>> # Set up an iterable containing the reads:
91
        >>> reads = [  read_A,read_B,read_C ]
92
        >>> molecules = []
93
        >>> for molecule in MoleculeIterator( reads ):
94
        >>>     print(molecule)
95
        Molecule
96
            with 2 assinged fragments
97
            Allele :No allele assigned
98
                Fragment:
99
                sample:CELL_1
100
                umi:CAT
101
                span:chr1 100-106
102
                strand:+
103
                has R1: yes
104
                has R2: no
105
                randomer trimmed: no
106
            Fragment:
107
                sample:CELL_1
108
                umi:CAT
109
                span:chr1 100-105
110
                strand:+
111
                has R1: yes
112
                has R2: no
113
                randomer trimmed: no
114
        Molecule
115
                with 1 assinged fragments
116
                Allele :No allele assigned
117
                    Fragment:
118
                    sample:CELL_2
119
                    umi:CAT
120
                    span:chr1 100-105
121
                    strand:+
122
                    has R1: yes
123
                    has R2: no
124
                    randomer trimmed: no
125
126
127
    In the next example the molecules overlapping with a single location on chromosome `'1'` position `420000` are extracted
128
    Don't forget to supply `check_eject_every = None`, this allows non-sorted data to be passed to the MoleculeIterator.
129
130
    Example:
131
132
        >>> from singlecellmultiomics.bamProcessing import mate_pileup
133
        >>> from singlecellmultiomics.molecule import MoleculeIterator
134
        >>> with pysam.AlignmentFile('example.bam') as alignments:
135
        >>>     for molecule in MoleculeIterator(
136
        >>>         mate_pileup(alignments, contig='1', position=420000, check_eject_every=None)
137
        >>>     ):
138
        >>>         pass
139
140
141
    Warning:
142
        Make sure the reads being supplied to the MoleculeIterator sorted by genomic coordinate! If the reads are not sorted set `check_eject_every=None`
143
    """
144
145
    def __init__(self, alignments, molecule_class=Molecule,
146
                 fragment_class=Fragment,
147
                 check_eject_every=10_000,  # bigger sizes are very speed benificial
148
                 molecule_class_args={},  # because the relative amount of molecules
149
                 # which can be ejected will be much higher
150
                 fragment_class_args={},
151
                 perform_qflag=True,
152
                 pooling_method=1,
153
                 yield_invalid=False,
154
                 yield_overflow=True,
155
                 query_name_flagger=None,
156
                 ignore_collisions=True, # Ignore read-dupe collisions
157
                 every_fragment_as_molecule=False,
158
                 yield_secondary =  False,
159
                 yield_supplementary= False,
160
                 max_buffer_size=None,  #Limit the amount of stored reads, when this value is exceeded, a MemoryError is thrown
161
                 iterator_class = pysamiterators.iterators.MatePairIterator,
162
                 skip_contigs=None,
163
                 progress_callback_function=None,
164
                 min_mapping_qual = None,
165
                 perform_allele_clustering = False,
166
167
                 **pysamArgs):
168
        """Iterate over molecules in pysam.AlignmentFile
169
170
        Args:
171
            alignments (pysam.AlignmentFile) or iterable yielding tuples: Alignments to extract molecules from
172
173
            molecule_class (pysam.FastaFile): Class to use for molecules.
174
175
            fragment_class (pysam.FastaFile): Class to use for fragments.
176
177
            check_eject_every (int): Check for yielding every N reads. When None is supplied, all reads are kept into memory making coordinate sorted data not required.
178
179
            molecule_class_args (dict): arguments to pass to molecule_class.
180
181
            fragment_class_args (dict): arguments to pass to fragment_class.
182
183
            perform_qflag (bool):  Make sure the sample/umi etc tags are copied
184
                from the read name into bam tags
185
186
            pooling_method(int) : 0: no  pooling, 1: only compare molecules with the same sample id and hash
187
188
            ignore_collisions   (bool) : parameter passed to pysamIterators MatePairIterator, this setting will throw a fatal error when a duplicated read is found
189
190
            yield_invalid (bool) : When true all fragments which are invalid will be yielded as a molecule
191
192
            yield_overflow(bool) : When true overflow fragments are yielded as separate molecules
193
194
            query_name_flagger(class) : class which contains the method digest(self, reads) which accepts pysam.AlignedSegments and adds at least the SM and RX tags
195
196
            every_fragment_as_molecule(bool): When set to true all valid fragments are emitted as molecule with one associated fragment, this is a way to disable deduplication.
197
198
            yield_secondary (bool):  When true all secondary alignments will be yielded as a molecule
199
200
            iterator_class : Class name of class which generates mate-pairs out of a pysam.AlignmentFile either (pysamIterators.MatePairIterator or pysamIterators.MatePairIteratorIncludingNonProper)
201
202
            skip_contigs (set) : Contigs to skip
203
204
            min_mapping_qual(int) : Dont process reads with a mapping quality lower than this value. These reads are not yielded as molecules!
205
206
            **kwargs: arguments to pass to the pysam.AlignmentFile.fetch function
207
208
        Yields:
209
            molecule (Molecule): Molecule
210
        """
211
        if query_name_flagger is None:
212
            query_name_flagger = QueryNameFlagger()
213
        self.query_name_flagger = query_name_flagger
214
        self.skip_contigs = skip_contigs if skip_contigs is not None else set()
215
        self.alignments = alignments
216
        self.molecule_class = molecule_class
217
        self.fragment_class = fragment_class
218
        self.check_eject_every = check_eject_every
219
        self.molecule_class_args = initialise_dict(molecule_class_args)
220
        self.fragment_class_args = initialise_dict(fragment_class_args)
221
        self.perform_qflag = perform_qflag
222
        self.pysamArgs = pysamArgs
223
        self.matePairIterator = None
224
        self.ignore_collisions = ignore_collisions
225
        self.pooling_method = pooling_method
226
        self.yield_invalid = yield_invalid
227
        self.yield_overflow = yield_overflow
228
        self.every_fragment_as_molecule = every_fragment_as_molecule
229
        self.progress_callback_function = progress_callback_function
230
        self.iterator_class = iterator_class
231
        self.max_buffer_size=max_buffer_size
232
        self.min_mapping_qual = min_mapping_qual
233
        self.perform_allele_clustering = perform_allele_clustering
234
235
        self._clear_cache()
236
237
    def _clear_cache(self):
238
        """Clear cache containing non yielded molecules"""
239
        self.waiting_fragments = 0
240
        self.yielded_fragments = 0
241
        self.deleted_fragments = 0
242
        self.check_ejection_iter = 0
243
        if self.pooling_method == 0:
244
            self.molecules = []
245
        elif self.pooling_method == 1:
246
            self.molecules_per_cell = collections.defaultdict(
247
                list)  # {hash:[], :}
248
        else:
249
            raise NotImplementedError()
250
251
    def __repr__(self):
252
        return f"""Molecule Iterator, generates fragments from {self.fragment_class} into molecules based on {self.molecule_class}.
253
        Yielded {self.yielded_fragments} fragments, {self.waiting_fragments} fragments are waiting to be ejected. {self.deleted_fragments} fragments rejected.
254
        {self.get_molecule_cache_size()} molecules cached.
255
        Mate pair iterator: {str(self.matePairIterator)}"""
256
257
    def get_molecule_cache_size(self):
258
        if self.pooling_method == 0:
259
            return len(self.molecules)
260
        elif self.pooling_method == 1:
261
            return sum(len(cell_molecules) for cell,
262
                       cell_molecules in self.molecules_per_cell.items())
263
264
        else:
265
            raise NotImplementedError()
266
267
268
    def yield_func(self, molecule_to_be_emitted):
269
        if self.perform_allele_clustering:
270
            if molecule_to_be_emitted.can_be_split_into_allele_molecules:
271
                new_molecules = molecule_to_be_emitted.split_into_allele_molecules()
272
                if len(new_molecules)>1:
273
                    yield from new_molecules
274
                else:
275
                    yield molecule_to_be_emitted
276
            else:
277
                yield molecule_to_be_emitted
278
        else:
279
            yield molecule_to_be_emitted
280
281
282
    def __iter__(self):
283
        if self.perform_qflag:
284
            qf = self.query_name_flagger
285
286
        self._clear_cache()
287
        self.waiting_fragments = 0
288
        # prepare the source iterator which generates the read pairs:
289
        if isinstance(self.alignments, pysam.libcalignmentfile.AlignmentFile):
290
291
            # Don't pass the ignore_collisions to other classes than the matepair iterator
292
            if self.iterator_class == pysamiterators.iterators.MatePairIterator:
293
                self.matePairIterator = self.iterator_class(
294
                    self.alignments,
295
                    performProperPairCheck=False,
296
                    ignore_collisions=self.ignore_collisions,
297
                    **self.pysamArgs)
298
            else:
299
                self.matePairIterator = self.iterator_class(
300
                    self.alignments,
301
                    performProperPairCheck=False,
302
                    **self.pysamArgs)
303
304
        else:
305
            # If an iterable is provided use this as read source:
306
            self.matePairIterator = self.alignments
307
308
        for iteration,reads in enumerate(self.matePairIterator):
309
310
            if self.progress_callback_function is not None and iteration%500==0:
311
                self.progress_callback_function(iteration, self, reads)
312
313
            if isinstance(reads, pysam.AlignedSegment):
314
                R1 = reads
315
                R2 = None
316
            elif len(reads) == 2:
317
                R1, R2 = reads
318
            elif (isinstance(reads, list) or isinstance(reads, tuple)) and len(reads) == 1:
319
                R1 = reads[0]
320
                R2 = None
321
            else:
322
                raise ValueError(
323
                    'Iterable not understood, supply either  pysam.AlignedSegment or lists of  pysam.AlignedSegment')
324
325
            # skip_contigs
326
            if len(self.skip_contigs)>0:
327
                keep = False
328
                for read in reads:
329
                    if read is not None and read.reference_name not in self.skip_contigs:
330
                        keep = True
331
                if not keep:
332
                    continue
333
334
            if self.min_mapping_qual is not None:
335
                keep = True
336
                for read in reads:
337
                    if read is not None and read.mapping_quality<self.min_mapping_qual:
338
                        self.deleted_fragments+=1
339
                        keep=False
340
                if not keep:
341
                    continue
342
343
            # Make sure the sample/umi etc tags are placed:
344
            if self.perform_qflag:
345
                qf.digest([R1, R2])
346
347
            fragment = self.fragment_class([R1, R2], **self.fragment_class_args)
348
349
            if not fragment.is_valid():
350
                if self.yield_invalid:
351
                    m = self.molecule_class(
352
                        fragment, **self.molecule_class_args)
353
                    m.__finalise__()
354
                    yield m
355
                else:
356
                    self.deleted_fragments+=1
357
                continue
358
359
            if self.every_fragment_as_molecule:
360
                m = self.molecule_class(fragment, **self.molecule_class_args)
361
                m.__finalise__()
362
                yield m
363
                continue
364
365
            added = False
366
            try:
367
                if self.pooling_method == 0:
368
                    for molecule in self.molecules:
369
                        if molecule.add_fragment(fragment, use_hash=False):
370
                            added = True
371
                            break
372
                elif self.pooling_method == 1:
373
                    for molecule in self.molecules_per_cell[fragment.match_hash]:
374
                        if molecule.add_fragment(fragment, use_hash=True):
375
                            added = True
376
                            break
377
            except OverflowError:
378
                # This means the fragment does belong to a molecule, but the molecule does not accept any more fragments.
379
                if self.yield_overflow:
380
                    m = self.molecule_class(fragment, **self.molecule_class_args)
381
                    m.set_rejection_reason('overflow')
382
                    m.__finalise__()
383
                    yield from self.yield_func(m)
384
                else:
385
                    self.deleted_fragments+=1
386
                continue
387
388
            if not added:
389
                if self.pooling_method == 0:
390
                    self.molecules.append(self.molecule_class(
391
                        fragment, **self.molecule_class_args))
392
                else:
393
                    self.molecules_per_cell[fragment.match_hash].append(
394
                        self.molecule_class(fragment, **self.molecule_class_args)
395
                    )
396
397
            self.waiting_fragments += 1
398
            self.check_ejection_iter += 1
399
400
            if self.max_buffer_size is not None and self.waiting_fragments>self.max_buffer_size:
401
                raise MemoryError(f'max_buffer_size exceeded with {self.waiting_fragments} waiting fragments')
402
403
            if self.check_eject_every is not None and self.check_ejection_iter > self.check_eject_every:
404
                current_chrom, _, current_position = fragment.get_span()
405
                if current_chrom is None:
406
                    continue
407
408
                self.check_ejection_iter = 0
409
                if self.pooling_method == 0:
410
                    to_pop = []
411
                    for i, m in enumerate(self.molecules):
412
                        if m.can_be_yielded(current_chrom, current_position):
413
                            to_pop.append(i)
414
                            self.waiting_fragments -= len(m)
415
                            self.yielded_fragments += len(m)
416
417
                    for i, j in enumerate(to_pop):
418
                        m = self.molecules.pop(i - j)
419
                        m.__finalise__()
420
                        yield from self.yield_func(m)
421
                else:
422
                    for hash_group, molecules in self.molecules_per_cell.items():
423
424
                        to_pop = []
425
                        for i, m in enumerate(molecules):
426
                            if m.can_be_yielded(
427
                                    current_chrom, current_position):
428
                                to_pop.append(i)
429
                                self.waiting_fragments -= len(m)
430
                                self.yielded_fragments += len(m)
431
432
                        for i, j in enumerate(to_pop):
433
                            m = self.molecules_per_cell[hash_group].pop(i - j)
434
                            m.__finalise__()
435
                            yield from self.yield_func(m)
436
437
        # Yield remains
438
        if self.pooling_method == 0:
439
            for m in self.molecules:
440
                m.__finalise__()
441
                yield from self.yield_func(m)
442
            #yield from iter(self.molecules)
443
        else:
444
445
            for hash_group, molecules in self.molecules_per_cell.items():
446
                for i, m in enumerate(molecules):
447
                    m.__finalise__()
448
                    yield from self.yield_func(m)
449
        self._clear_cache()