Diff of /tests/test_bam_util.py [000000] .. [2c420a]

Switch to unified view

a b/tests/test_bam_util.py
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
import unittest
4
import singlecellmultiomics.molecule
5
import singlecellmultiomics.fragment
6
import pysam
7
import pysamiterators.iterators
8
from singlecellmultiomics.bamProcessing import sorted_bam_file,write_program_tag,verify_and_fix_bam
9
from singlecellmultiomics.bamProcessing.bamExtractSamples import extract_samples
10
import os
11
import sys
12
from shutil import copyfile,rmtree
13
from singlecellmultiomics.bamProcessing import get_contigs_with_reads
14
from singlecellmultiomics.utils.sequtils import pick_best_base_call
15
16
class TestFunctions(unittest.TestCase):
17
18
    def test_get_contigs_with_reads_1(self):
19
        cwr = list(get_contigs_with_reads('./data/mini_nla_test.bam'))
20
        self.assertEqual(len(cwr), 1)
21
        self.assertIn('chr1', cwr)
22
23
    def test_get_contigs_with_reads_2(self):
24
        cwr = list(get_contigs_with_reads('./data/chic_test_region.bam'))
25
        self.assertEqual(len(cwr), 1)
26
        self.assertIn('8', cwr)
27
28
class TestSorted(unittest.TestCase):
29
30
    def test_verify_and_fix_bam_autoindex(self):
31
        file_path_without_index = './data/temp_without_index.bam'
32
        copyfile('./data/mini_nla_test.bam',file_path_without_index)
33
34
        try:
35
            os.remove(file_path_without_index+'.bai')
36
        except Exception as e:
37
            pass
38
39
        verify_and_fix_bam(file_path_without_index)
40
41
        self.assertTrue( os.path.exists(file_path_without_index+'.bai') )
42
43
        with pysam.AlignmentFile(file_path_without_index) as f:
44
            i =0
45
            # Test if the file has reads.
46
            for read in f.fetch(contig='chr1'):
47
                if read.is_read1:
48
                    i+=1
49
            self.assertEqual(i, 293)
50
51
        try:
52
            os.remove(file_path_without_index)
53
        except Exception as e:
54
            pass
55
56
        try:
57
            os.remove(file_path_without_index+'.bai')
58
        except Exception as e:
59
            pass
60
61
62
    def test_write_to_sorted(self):
63
        write_path = './data/write_test.bam'
64
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
65
            with sorted_bam_file(write_path, origin_bam=f) as out:
66
                for molecule in singlecellmultiomics.molecule.MoleculeIterator(
67
                    alignments=f,
68
                    molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule,
69
                    fragment_class=singlecellmultiomics.fragment.NlaIIIFragment,
70
                    fragment_class_args={'umi_hamming_distance':0},
71
                    pooling_method=0,
72
                    yield_invalid=True
73
                ):
74
                    molecule.write_pysam(out)
75
76
        self.assertTrue(os.path.exists(write_path))
77
        try:
78
            os.remove(write_path)
79
            os.remove(write_path+'.bai')
80
        except Exception as e:
81
            pass
82
83
    def test_write_to_sorted_custom_compression(self):
84
        write_path = './data/write_test.bam'
85
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
86
            with sorted_bam_file(write_path, origin_bam=f,fast_compression=True) as out:
87
                for molecule in singlecellmultiomics.molecule.MoleculeIterator(
88
                    alignments=f,
89
                    molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule,
90
                    fragment_class=singlecellmultiomics.fragment.NlaIIIFragment,
91
                    fragment_class_args={'umi_hamming_distance':0},
92
                    pooling_method=0,
93
                    yield_invalid=True
94
                ):
95
                    molecule.write_pysam(out)
96
97
        self.assertTrue(os.path.exists(write_path))
98
        try:
99
            os.remove(write_path)
100
            os.remove(write_path+'.bai')
101
        except Exception as e:
102
            pass
103
104
    def test_write_to_sorted_non_existing_folder(self):
105
        write_folder =  './data/non_yet_existing_folder/'
106
        write_path = write_folder + 'write_test.bam'
107
        if os.path.exists(write_path):
108
            os.remove(write_path)
109
110
        rmtree(write_folder, ignore_errors=True)
111
112
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
113
            with sorted_bam_file(write_path, origin_bam=f) as out:
114
                for molecule in singlecellmultiomics.molecule.MoleculeIterator(
115
                    alignments=f,
116
                    molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule,
117
                    fragment_class=singlecellmultiomics.fragment.NlaIIIFragment,
118
                    fragment_class_args={'umi_hamming_distance':0},
119
                    pooling_method=0,
120
                    yield_invalid=True
121
                ):
122
                    molecule.write_pysam(out)
123
124
        self.assertTrue(os.path.exists(write_path))
125
126
        with pysam.AlignmentFile(write_path) as f:
127
            i =0
128
            # Test if the file has reads.
129
            for read in f:
130
                if read.is_read1:
131
                    i+=1
132
            self.assertEqual(i, 293)
133
134
        try:
135
            os.remove(write_path)
136
            os.remove(write_path+'.bai')
137
        except Exception as e:
138
            pass
139
140
        rmtree(write_folder, ignore_errors=True)
141
142
    def test_write_to_read_grouped_sorted(self):
143
        write_path = './data/write_test_rg.bam'
144
        read_groups = set()
145
        with pysam.AlignmentFile('./data/mini_nla_test.bam') as f:
146
147
            input_header = f.header.as_dict()
148
            write_program_tag(input_header,
149
                program_name='test_bam_util_test1',
150
                command_line = " ".join(sys.argv),
151
                version = singlecellmultiomics.__version__,
152
                description = f'a description'
153
                )
154
155
            write_program_tag(input_header,
156
                program_name='test_bam_util_test2',
157
                command_line = " ".join(sys.argv),
158
                version = singlecellmultiomics.__version__,
159
                description = f'a description'
160
                )
161
            #print([x for x in input_header['PG'] if not 'bwa mem' in x.get('CL','')])
162
            with sorted_bam_file(write_path, header=input_header,read_groups=read_groups) as out:
163
                for molecule in singlecellmultiomics.molecule.MoleculeIterator(
164
                    alignments=f,
165
                    molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule,
166
                    fragment_class=singlecellmultiomics.fragment.NlaIIIFragment,
167
                    fragment_class_args={'umi_hamming_distance':0},
168
                    pooling_method=0,
169
                    yield_invalid=True
170
                ):
171
                    molecule.write_pysam(out)
172
                    for frag in molecule:
173
                        read_groups.add( frag.get_read_group() )
174
175
        self.assertTrue(os.path.exists(write_path))
176
177
        # Now test if the program tag is there...
178
        with pysam.AlignmentFile(write_path) as f:
179
            self.assertTrue( 1==len([x for x in f.header['PG'] if 'test_bam_util_test1' in x.get('PN','')]) )
180
            self.assertTrue( 1==len([x for x in f.header['PG'] if 'test_bam_util_test2' in x.get('PN','')]) )
181
            i =0
182
183
            # Test if the file has reads.
184
            for read in f:
185
                if read.is_read1:
186
                    i+=1
187
            self.assertEqual(i, 293)
188
189
        try:
190
            os.remove(write_path)
191
        except Exception as e:
192
            pass
193
194
        try:
195
            os.remove(write_path+'.bai')
196
        except Exception as e:
197
            pass
198
199
200
    def  test_sample_extraction(self):
201
202
        output_path= './data/write_test_extract.bam'
203
        final_output_paths= ['./data/write_test_extract0.bam', './data/write_test_extract1.bam']
204
205
        capture_samples = {'0':['APKS1P25-NLAP2L1_30','APKS1P25-NLAP2L1_31'],
206
        # 4 reads expected
207
         '1':['APKS1P25-NLAP2L2_86']}
208
         #samtools view ./data/mini_nla_test.bam | grep 'APKS1P25-NLAP2L2_86' | wc -l -> 8
209
        extract_samples( './data/mini_nla_test.bam', output_path, capture_samples )
210
211
        with pysam.AlignmentFile(final_output_paths[0]) as f:
212
            for i,_ in enumerate(f):
213
                pass
214
            self.assertEqual(i, 4-1)
215
216
        with pysam.AlignmentFile(final_output_paths[1]) as f:
217
            for i,_ in enumerate(f):
218
                pass
219
            self.assertEqual(i, 8-1)
220
221
222
        for p in final_output_paths:
223
            try:
224
                os.remove(p)
225
                os.remove(f'{p}.bai')
226
            except Exception as e:
227
                pass
228
229
class TestBaseCalling(unittest.TestCase):
230
231
    def test_pick_best(self):
232
233
        self.assertTrue( pick_best_base_call( ('A',32), ('C',22) ) == ('A', 32) )
234
        self.assertTrue( pick_best_base_call( ('C',22), ('A',32) ) == ('A', 32))
235
        self.assertTrue( pick_best_base_call( ('C',32), ('A',32) ) == ('N',0) )
236
237
238
239
240
241
if __name__ == '__main__':
242
    unittest.main()