a b/tests/test_molecule.py
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
import unittest
4
import singlecellmultiomics.molecule
5
import singlecellmultiomics.fragment
6
import pysam
7
import pysamiterators.iterators
8
import os
9
10
from singlecellmultiomics.molecule import MoleculeIterator, CHICMolecule
11
from singlecellmultiomics.fragment import CHICFragment
12
13
"""
14
These tests check if the Molecule module is working correctly
15
"""
16
17
class TestMolecule(unittest.TestCase):
18
19
    def test_chic_cigar_dedup(self):
20
        i = 0
21
        with pysam.AlignmentFile('./data/chic_test_region.bam') as alignments:
22
23
            for molecule in MoleculeIterator(alignments,CHICMolecule, CHICFragment):
24
                i+=1
25
26
        self.assertEqual(i,1)
27
28
29
    def test_eq(self):
30
31
32
        def get_chic_read(header,  qname, contig ='chr1', start=100, sequence='ATCGGG', cigar=None, umi='CAT', sample='CELL_1', is_reverse=False, read1=True, paired=False,proper_pair=True ):
33
                read = pysam.AlignedSegment(header)
34
                read.set_tag('SM',sample) # The sample to which the sample belongs is extracted from the SM tag
35
                read.set_tag('RX',umi) # The UMI is extracted from the RX tag
36
                read.set_tag('MX','scCHIC')
37
                # By default the molecule assignment is done based on the mapping location of read 1:
38
                read.reference_name = contig
39
                read.reference_start = start
40
                read.query_name = qname
41
                read.query_sequence = sequence
42
                read.is_reverse = is_reverse
43
                read.cigarstring = f'{len(sequence)}M' if cigar is None else cigar
44
                if read1:
45
                    read.is_read1 = True
46
                    read.is_read2 = False
47
                else:
48
                    read.is_read1 = False
49
                    read.is_read2 = True
50
                if paired:
51
                    read.is_paired = True
52
                    read.is_proper_pair = proper_pair
53
                return read
54
55
        from singlecellmultiomics.molecule import Molecule
56
        from singlecellmultiomics.fragment import Fragment
57
        import pysam
58
        # Create sam file to write some reads to:
59
        with pysam.AlignmentFile('test.sam','w',reference_names=['chr1','chr2'],reference_lengths=[1000,1000]) as test_sam:
60
61
            read_A = get_chic_read(test_sam.header, 'read_A')
62
            read_B = get_chic_read(test_sam.header, 'read_B', start=102)
63
            read_C = get_chic_read(test_sam.header, 'read_C', start=110)
64
            # A reverse read
65
            read_D = get_chic_read(test_sam.header, 'read_D', start=110-5, is_reverse=True)
66
            # A different sample read
67
            read_E = get_chic_read(test_sam.header, 'read_E', start=110-5, is_reverse=True, sample='CELL_2')
68
69
            # A umi error read
70
            read_F = get_chic_read(test_sam.header, 'read_F', start=110-5, is_reverse=True, sample='CELL_2', umi='CAG')
71
72
            # A proper pair
73
            read_G_a = get_chic_read(test_sam.header, 'read_G', start=102,  paired=True)
74
            read_G_b = get_chic_read(test_sam.header, 'read_G', start=130, read1=False, paired=True, is_reverse=True)
75
76
            # A stale R2
77
            read_H = get_chic_read(test_sam.header, 'read_H', start=130,  paired=True, proper_pair=False, read1=False)
78
79
            # Read softclipped from start
80
            read_I = get_chic_read(test_sam.header, 'soft_clipped_read', start=104, cigar='2S4M')
81
82
            frag_A = Fragment([read_A],umi_hamming_distance=0,assignment_radius=2)
83
            frag_B = Fragment([read_B],umi_hamming_distance=0,assignment_radius=2)
84
            frag_C = Fragment([read_C],umi_hamming_distance=0,assignment_radius=2)
85
            frag_D = Fragment([read_D],umi_hamming_distance=0,assignment_radius=2)
86
            frag_E = Fragment([read_E],umi_hamming_distance=0,assignment_radius=2)
87
            frag_F = Fragment([read_F],umi_hamming_distance=0,assignment_radius=2)
88
            frag_G = Fragment([read_G_a, read_G_b],umi_hamming_distance=0,assignment_radius=2)
89
90
            self.assertTrue(frag_A==frag_B)
91
            self.assertTrue(frag_A==frag_G)
92
            self.assertFalse(frag_A==frag_C)
93
            self.assertFalse(frag_A==frag_D)
94
            self.assertFalse(frag_C==frag_D)
95
            self.assertFalse(frag_E==frag_D)
96
97
            frag_A = CHICFragment([read_A],umi_hamming_distance=0,assignment_radius=2)
98
            frag_B = CHICFragment([read_B],umi_hamming_distance=0,assignment_radius=2)
99
            frag_C = CHICFragment([read_C],umi_hamming_distance=0,assignment_radius=2)
100
            frag_D = CHICFragment([read_D],umi_hamming_distance=0,assignment_radius=2)
101
            frag_E = CHICFragment([read_E],umi_hamming_distance=1,assignment_radius=2)
102
            frag_F = CHICFragment([read_F],umi_hamming_distance=1,assignment_radius=2)
103
            frag_G = CHICFragment([read_G_a, read_G_b],umi_hamming_distance=0,assignment_radius=2)
104
            frag_I = CHICFragment([read_I],umi_hamming_distance=1,assignment_radius=2)
105
106
107
            frag_H = CHICFragment([None, read_H],umi_hamming_distance=1,assignment_radius=2)
108
            self.assertFalse(frag_H.is_valid())
109
            self.assertTrue(frag_A.is_valid())
110
            self.assertTrue(frag_B.is_valid())
111
112
            self.assertTrue(frag_A==frag_B)
113
114
            self.assertEqual(frag_G, frag_I)
115
            self.assertFalse(frag_A==frag_C)
116
            self.assertFalse(frag_A==frag_D)
117
            self.assertFalse(frag_C==frag_D)
118
            self.assertFalse(frag_E==frag_D)
119
120
121
            self.assertTrue(frag_A==frag_B)
122
            self.assertFalse(frag_A==frag_C)
123
124
            molecule_A = CHICMolecule([frag_A])
125
            print(molecule_A.site_location[1])
126
            self.assertEqual(molecule_A.site_location[1],98)
127
            self.assertTrue(molecule_A==frag_B)
128
            self.assertFalse(molecule_A==frag_C)
129
130
            # Test construction of molecule:
131
            self.assertTrue( molecule_A.add_fragment(frag_B) )
132
            self.assertEqual(len(molecule_A),2)
133
134
            # Frag C cannot be added:
135
            self.assertFalse( molecule_A.add_fragment(frag_C) )
136
            self.assertEqual(len(molecule_A),2)
137
138
            # Frag D cannot be added:
139
            self.assertFalse( molecule_A.add_fragment(frag_D) )
140
            self.assertEqual(len(molecule_A),2)
141
142
            # Test moving of site location by read which is aligned more to left:
143
            molecule = CHICMolecule([frag_B])
144
            self.assertEqual(molecule.site_location[1],100)
145
            self.assertTrue( molecule.add_fragment(frag_A) )
146
            self.assertEqual(molecule.site_location[1],98)
147
148
            # Test moving of site location by read which is aligned more to right: (Reverse strand)
149
            molecule = CHICMolecule([frag_B])
150
            self.assertEqual(molecule.site_location[1],100)
151
            self.assertTrue( molecule.add_fragment(frag_A) )
152
            self.assertEqual(molecule.site_location[1],98)
153
154
            # Test umi distance:
155
            molecule = CHICMolecule([frag_E])
156
            self.assertTrue( molecule.add_fragment(frag_F) )
157
            read_F.set_tag('RX','GGG')
158
            frag_G = CHICFragment([read_F],umi_hamming_distance=1,assignment_radius=2)
159
            self.assertFalse( molecule.add_fragment(frag_G) )
160
161
162
            # Perform iteration
163
            reads = [read_A, read_B, read_C, read_D, read_E, read_F, (read_G_a, read_G_b), (None,read_H), (read_I,None)]
164
            molecules = list(MoleculeIterator( reads,
165
                            molecule_class = CHICMolecule,
166
                            fragment_class = CHICFragment,
167
                            yield_invalid=True,
168
                            fragment_class_args={
169
                                'assignment_radius':2,
170
                                'umi_hamming_distance':1
171
                            }))
172
173
            recovered_reads = []
174
            for molecule in molecules:
175
                for read in molecule.iter_reads():
176
                    recovered_reads.append(read)
177
                    print(read.query_name)
178
            self.assertEqual(len(recovered_reads),10)
179
180
        os.remove('test.sam')
181
182
183
184
    def test_chic_nocigar_dedup(self):
185
        i = 0
186
        with pysam.AlignmentFile('./data/chic_test_region.bam') as alignments:
187
            for molecule in MoleculeIterator(alignments,CHICMolecule, CHICFragment,fragment_class_args={'no_umi_cigar_processing':True}):
188
                i+=1
189
        self.assertEqual(i,2)
190
191
    def test_a_pysam_iterators(self):
192
        """Test if the pysamiterators package yields the proper amount or mate pairs"""
193
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
194
            for i,(R1,R2) in enumerate(pysamiterators.iterators.MatePairIterator(f)):
195
                pass
196
            self.assertEqual(i,224)
197
198
    def test_molecule_iterator_stability(self):
199
        """Test if the simplest molecule iterator works"""
200
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
201
            it = singlecellmultiomics.molecule.MoleculeIterator(
202
            alignments=f,
203
            molecule_class=singlecellmultiomics.molecule.Molecule,
204
            fragment_class=singlecellmultiomics.fragment.Fragment
205
            )
206
            pass
207
208
    def test_every_fragment_as_molecule(self):
209
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
210
            for i,m in enumerate(singlecellmultiomics.molecule.MoleculeIterator(
211
            alignments=f,
212
            molecule_class=singlecellmultiomics.molecule.Molecule,
213
            fragment_class=singlecellmultiomics.fragment.Fragment,
214
            every_fragment_as_molecule=True
215
            )):
216
                pass
217
            self.assertEqual(i,340)
218
219
    def test_every_fragment_as_molecule_np_iterator(self):
220
221
        to_be_found = set()
222
        amount_of_r1s_to_be_found=0
223
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
224
            for read in f:
225
                if read.is_read1 and not read.is_read2:
226
                    to_be_found.add(read.query_name)
227
                    amount_of_r1s_to_be_found+=1
228
        found=set()
229
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
230
            frag_count=0
231
            for i,m in enumerate(singlecellmultiomics.molecule.MoleculeIterator(
232
            alignments=f,
233
            molecule_class=singlecellmultiomics.molecule.Molecule,
234
            fragment_class=singlecellmultiomics.fragment.Fragment,
235
            every_fragment_as_molecule=True,
236
            yield_invalid=True,
237
            iterator_class=pysamiterators.MatePairIteratorIncludingNonProper
238
            )):
239
                if m[0].has_R1():
240
                    frag_count+=1
241
                    found.add( m[0][0].query_name )
242
243
244
            diff = to_be_found.difference( found )
245
            tm = found.difference(to_be_found)
246
            print('missed:')
247
            print(diff)
248
            print('too much:')
249
            print(tm)
250
            self.assertEqual( len(diff), 0)
251
            self.assertEqual(frag_count,amount_of_r1s_to_be_found)
252
            self.assertEqual(frag_count,293)
253
254
255
256
257
258
    def test_Molecule_repr_stability(self):
259
        """Test if the molecule representation function is stable"""
260
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
261
            it = singlecellmultiomics.molecule.MoleculeIterator(
262
            alignments=f,
263
            molecule_class=singlecellmultiomics.molecule.Molecule,
264
            fragment_class=singlecellmultiomics.fragment.Fragment
265
            )
266
            for molecule in it:
267
                str(molecule)
268
    def test_iterable_molecule_iter(self):
269
270
        from singlecellmultiomics.molecule import MoleculeIterator
271
        from singlecellmultiomics.fragment import Fragment
272
273
        with pysam.AlignmentFile('test.sam','w',reference_names=['chr1','chr2'],reference_lengths=[1000,1000]) as test_sam:
274
            read_A = pysam.AlignedSegment(test_sam.header)
275
            read_A.set_tag('SM','CELL_1')
276
            read_A.set_tag('RX','CAT')
277
            read_A.reference_name = 'chr1'
278
            read_A.reference_start = 100
279
            read_A.query_sequence = 'ATCGGG'
280
            read_A.cigarstring = '6M'
281
            read_A.mapping_quality = 60
282
283
            read_B = pysam.AlignedSegment(test_sam.header)
284
            read_B.set_tag('SM','CELL_1')
285
            read_B.set_tag('RX','CAT')
286
            read_B.reference_name = 'chr1'
287
            read_B.reference_start = 100
288
            read_B.query_sequence = 'ATCGG'
289
            read_B.cigarstring = '5M'
290
            read_B.mapping_quality = 60
291
292
            read_C = pysam.AlignedSegment(test_sam.header)
293
            read_C.set_tag('SM','CELL_2')
294
            read_C.set_tag('RX','CAT')
295
            read_C.reference_name = 'chr1'
296
            read_C.reference_start = 100
297
            read_C.query_sequence = 'ATCGG'
298
            read_C.cigarstring = '5M'
299
            read_C.mapping_quality = 60
300
301
            reads = [  read_A,read_B,read_C ]
302
            mi = MoleculeIterator( reads , yield_invalid=True)
303
            molecules=[]
304
            for molecule in mi:
305
                molecules.append(molecule)
306
307
            self.assertEqual(len(molecules),2)
308
            self.assertEqual(max( (len(m) for m in molecules) ),2)
309
            self.assertEqual(min( (len(m) for m in molecules) ),1)
310
311
            # Test tags:
312
            a = molecules[0]
313
            a.write_tags()
314
            self.assertEqual(a[0][0].get_tag('TF') , 2)
315
316
        os.remove('test.sam')
317
    def test_NLA_Molecule_repr_stability(self):
318
        """Test if the molecule representation function is stable"""
319
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
320
            it = singlecellmultiomics.molecule.MoleculeIterator(
321
            alignments=f,
322
            molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule,
323
            fragment_class=singlecellmultiomics.fragment.NlaIIIFragment
324
            )
325
            for molecule in it:
326
                str(molecule)
327
328
    def test_NLA_Molecule_ref_id(self):
329
        """Test if the molecule representation function is stable"""
330
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
331
            it = singlecellmultiomics.molecule.MoleculeIterator(
332
            alignments=f,
333
            molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule,
334
            fragment_class=singlecellmultiomics.fragment.NlaIIIFragment
335
            )
336
            for molecule in it:
337
                self.assertEqual( molecule.get_a_reference_id(),0)
338
339
340
341
    def test_NLA_molecule_iterator_stability(self):
342
        """Test if the simplest molecule iterator works"""
343
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
344
            it = singlecellmultiomics.molecule.MoleculeIterator(
345
            alignments=f,
346
            molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule,
347
            fragment_class=singlecellmultiomics.fragment.NlaIIIFragment
348
            )
349
            pass
350
351
    def test_consensus(self):
352
        """Test if a right consensus sequence can be produced from a noisy molecule"""
353
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
354
            it = singlecellmultiomics.molecule.MoleculeIterator(
355
            alignments=f,
356
            molecule_class=singlecellmultiomics.molecule.Molecule,
357
            fragment_class=singlecellmultiomics.fragment.NlaIIIFragment,
358
            fragment_class_args={
359
                'R1_primer_length':0,
360
                'R2_primer_length':6,
361
            }
362
            )
363
            for molecule in iter(it):
364
                #print(molecule.get_sample())
365
                if  molecule.get_sample()=='APKS3-P19-1-1_91':
366
                    break
367
368
            self.assertEqual(''.join( list(molecule.get_consensus().values()) ), 'CATGAGTTAGATATGGACTCTTCTTCAGACACTTTGTTTAAATTTTAAATTTTTTTCTGATTGCATATTACTAAAAATGTGTTATGAATATTTTCCATATCATTAAACATTCTTCTCAAGCATAACTTTAAATAACTGCATTATAGAAAATTTACGCTACTTTTGTTTTTGTTTTTTTTTTTTTTTTTTTACTATTATTAATAACACGGTGG')
369
370
    def test_fragment_sizes(self):
371
372
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
373
            for molecule in singlecellmultiomics.molecule.MoleculeIterator(
374
                alignments=f,
375
                molecule_class=singlecellmultiomics.molecule.Molecule,
376
                fragment_class_args={
377
                    'R1_primer_length':4,
378
                    'R2_primer_length':6,
379
                },
380
381
                fragment_class=singlecellmultiomics.fragment.NlaIIIFragment):
382
383
                # This is a dovetailed molecule, both R1 and R2 overshoot into the adapter
384
                if molecule.sample=='APKS2-P18-2-1_66' and molecule.umi=='CGC':
385
                    # The non-dovetailed region of this molecule is
386
                    # 164834866-164834975
387
                    # This means a fragment size of 109,
388
                    # the 4 bases of the CATG are not counted.
389
                    # the 6 bases of the random primer are also not counted
390
                    # Resulting in a fragment size of 109 - 10 = 101
391
392
                    start, end = molecule[0].get_safe_span()
393
                    self.assertEqual(
394
                        abs(end-start)
395
                        , 107)
396
                    self.assertEqual(
397
                        molecule.get_safely_aligned_length()
398
                        , 107)
399
400
401
    def test_get_match_mismatch_frequency(self):
402
        """Test if the matches and mismatches of reads in a molecule are counted properly"""
403
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
404
            it = singlecellmultiomics.molecule.MoleculeIterator(
405
            alignments=f,
406
            molecule_class=singlecellmultiomics.molecule.Molecule,
407
            fragment_class_args={
408
                'R1_primer_length':0,
409
                'R2_primer_length':0,
410
            },
411
412
            fragment_class=singlecellmultiomics.fragment.NlaIIIFragment)
413
            for molecule in iter(it):
414
                #print(molecule.get_sample())
415
                if  molecule.get_sample()=='APKS3-P19-1-1_91':
416
                    break
417
        #print(molecule.get_match_mismatch_frequency())
418
        self.assertEqual( (628, 13), molecule.get_match_mismatch_frequency() )
419
420
    def test_get_consensus_gc_ratio(self):
421
        """Test if the gc ratio of a molecule is properly determined"""
422
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
423
            it = singlecellmultiomics.molecule.MoleculeIterator(
424
            alignments=f,
425
            molecule_class=singlecellmultiomics.molecule.Molecule,
426
            fragment_class=singlecellmultiomics.fragment.Fragment,
427
            fragment_class_args={
428
                'R1_primer_length':0,
429
                'R2_primer_length':0,
430
            }
431
            )
432
            for molecule in iter(it):
433
                #print(molecule.get_sample())
434
                if  molecule.get_sample()=='APKS3-P19-1-1_91':
435
                    break
436
437
        self.assertAlmostEqual(0.23113207547169812, molecule.get_consensus_gc_ratio() )
438
439
440
    def test_deduplication(self):
441
        f= pysam.AlignmentFile('./data/mini_nla_test.bam')
442
        total_frag_count = 0
443
        for molecule in singlecellmultiomics.molecule.MoleculeIterator(
444
            alignments=f,
445
            molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule,
446
            fragment_class=singlecellmultiomics.fragment.NlaIIIFragment,
447
            fragment_class_args={'umi_hamming_distance':0},
448
            pooling_method=0,
449
            yield_invalid=True
450
        ):
451
            if 'APKS2-P14-1-1' in molecule.sample:
452
                for fragment in molecule:
453
454
                    is_dup = False
455
                    for read in fragment.reads:
456
                        if read is not None and read.is_duplicate:
457
                            is_dup = True
458
                    if not is_dup:
459
                        total_frag_count+=1
460
        self.assertEqual(total_frag_count,13)
461
462
    def _pool_test(self, pooling_method=0,hd=0):
463
        for sample in ['AP1-P22-1-1_318','APKS2-P8-2-2_52']:
464
465
            f= pysam.AlignmentFile('./data/mini_nla_test.bam')
466
            it = singlecellmultiomics.molecule.MoleculeIterator(
467
                alignments=f,
468
                molecule_class=singlecellmultiomics.molecule.Molecule,
469
                fragment_class=singlecellmultiomics.fragment.NlaIIIFragment,
470
                fragment_class_args={'umi_hamming_distance':hd},
471
                pooling_method=pooling_method
472
            )
473
474
            molecule_count = 0
475
            for molecule in iter(it):
476
                if molecule.get_sample()==sample:
477
                    molecule_count+=1
478
            if hd==0:
479
                # it has one fragment with a sequencing error in the UMI
480
                self.assertEqual(molecule_count,2)
481
            else:
482
                self.assertEqual(molecule_count,1)
483
484
    def test_molecule_pooling_vanilla_exact_umi(self):
485
        self._pool_test(0,0)
486
487
    def test_molecule_pooling_nlaIIIoptim_exact_umi(self):
488
        self._pool_test(1,0)
489
490
    def test_molecule_pooling_vanilla_umi_mismatch(self):
491
        self._pool_test(0,1)
492
493
    def test_molecule_pooling_nlaIIIoptim_umi_mismatch(self):
494
        self._pool_test(1,1)
495
496
    def test_max_associated_fragments(self):
497
498
        for i in range(1,3):
499
            with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
500
                for molecule in singlecellmultiomics.molecule.MoleculeIterator(
501
                    alignments=f,
502
                    molecule_class=singlecellmultiomics.molecule.Molecule,
503
                    fragment_class=singlecellmultiomics.fragment.NlaIIIFragment,
504
                    molecule_class_args={'max_associated_fragments':i},
505
                    fragment_class_args={'umi_hamming_distance':1}
506
507
                ):
508
                    self.assertTrue(len(molecule)<= i , f'{i}, {len(molecule)}')
509
510
511
    def test_rt_reaction_counting_HAMMING_1(self):
512
        # This is a dictionary containing molecules and the amount of rt reactions for location chr1:164834865
513
514
        f= pysam.AlignmentFile('./data/mini_nla_test.bam')
515
        it = singlecellmultiomics.molecule.MoleculeIterator(
516
            alignments=f,
517
            molecule_class=singlecellmultiomics.molecule.Molecule,
518
            fragment_class=singlecellmultiomics.fragment.NlaIIIFragment,
519
            fragment_class_args={'umi_hamming_distance':1}
520
521
        )
522
523
        hand_curated_truth = {
524
            # hd: 1:
525
            # hard example with N in random primer and N in UMI:
526
            'APKS2-P8-2-2_52':{'rt_count':3},
527
528
            # Simple examples:
529
            'APKS2-P18-1-1_318':{'rt_count':1},
530
            'APKS2-P18-1-1_369':{'rt_count':2},
531
            'APKS2-P18-2-1_66':{'rt_count':1},
532
            'APKS2-P18-2-1_76':{'rt_count':1},
533
            'APKS2-P18-2-1_76':{'rt_count':1},
534
        }
535
536
537
        obtained_rt_count = {}
538
        for molecule in it:
539
            site = molecule.get_cut_site()
540
            if site is not None and site[1]==164834865:
541
                obtained_rt_count[molecule.get_sample()]  = len( molecule.get_rt_reaction_fragment_sizes(max_N_distance=1) )
542
543
        # Validate:
544
        for sample, truth in hand_curated_truth.items():
545
            self.assertEqual( obtained_rt_count.get(sample,0),truth.get('rt_count',-1) )
546
547
548
    def test_rt_reaction_counting_HAMMING_0(self):
549
        # This is a dictionary containing molecules and the amount of rt reactions for location chr1:164834865
550
551
        f= pysam.AlignmentFile('./data/mini_nla_test.bam')
552
        it = singlecellmultiomics.molecule.MoleculeIterator(
553
            alignments=f,
554
            molecule_class=singlecellmultiomics.molecule.Molecule,
555
            fragment_class=singlecellmultiomics.fragment.NlaIIIFragment,
556
            fragment_class_args={'umi_hamming_distance':1}
557
558
        )
559
560
        hand_curated_truth = {
561
            # hd: 0:
562
            # hard example with N in random primer and N in UMI:
563
            'APKS2-P8-2-2_52':{'rt_count':3},
564
565
            # Simple examples:
566
            'APKS2-P18-1-1_318':{'rt_count':1},
567
            'APKS2-P18-1-1_369':{'rt_count':2},
568
            'APKS2-P18-2-1_66':{'rt_count':1},
569
            'APKS2-P18-2-1_76':{'rt_count':1},
570
            'APKS2-P18-2-1_76':{'rt_count':1},
571
        }
572
573
574
        obtained_rt_count = {}
575
        for molecule in it:
576
            site = molecule.get_cut_site()
577
            if site is not None and site[1]==164834865:
578
                obtained_rt_count[molecule.get_sample()]  = len( molecule.get_rt_reaction_fragment_sizes(max_N_distance=0) )
579
580
        # Validate:
581
        for sample, truth in hand_curated_truth.items():
582
            self.assertEqual( obtained_rt_count.get(sample,0),truth.get('rt_count',-1) )
583
584
585
    def test_feature_molecule(self):
586
        import singlecellmultiomics.features
587
        features = singlecellmultiomics.features.FeatureContainer()
588
        features.addFeature(chromosome='chr1',start=164835366, end=164835535, data=('test4'),name='test4')
589
        features.sort()
590
591
        f= pysam.AlignmentFile('./data/mini_nla_test.bam')
592
        it = singlecellmultiomics.molecule.MoleculeIterator(
593
            alignments=f,
594
            molecule_class=singlecellmultiomics.molecule.AnnotatedNLAIIIMolecule,
595
            molecule_class_args={'features':features},
596
            fragment_class=singlecellmultiomics.fragment.NlaIIIFragment,
597
            fragment_class_args={'umi_hamming_distance':1}
598
        )
599
600
        hit_count = 0
601
        for molecule in it:
602
            molecule.annotate()
603
            if len(molecule.hits):
604
                hit_count+=1
605
        self.assertEqual(hit_count,2)
606
607
    """
608
    def test_classification_consensus(self):
609
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f, pysam.AlignmentFile('./data/consensus_write_test.bam','wb',header=f.header) as target_bam:
610
            molecule_iterator =  singlecellmultiomics.molecule.MoleculeIterator(
611
                alignments=f,
612
                molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule,
613
                fragment_class=singlecellmultiomics.fragment.NlaIIIFragment
614
            )
615
616
            classifier = singlecellmultiomics.molecule.train_consensus_model(
617
                        molecule_iterator,
618
                        mask_variants=None,
619
                        skip_already_covered_bases=False,
620
                        n_train=100)
621
622
            molecule_iterator =  singlecellmultiomics.molecule.MoleculeIterator(
623
                alignments=f,
624
                molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule,
625
                fragment_class=singlecellmultiomics.fragment.NlaIIIFragment
626
            )
627
628
            for i,molecule in enumerate(molecule_iterator):
629
                read_name=f'consensus_{i}'
630
                reads = molecule.deduplicate_to_single_CIGAR_spaced(target_bam,
631
                read_name, classifier,reference=None)
632
633
                read = molecule.deduplicate_to_single(target_bam)
634
635
                consensus = molecule.get_consensus(classifier)
636
                """
637
if __name__ == '__main__':
638
    unittest.main()