|
a |
|
b/tests/test_molecule.py |
|
|
1 |
#!/usr/bin/env python3 |
|
|
2 |
# -*- coding: utf-8 -*- |
|
|
3 |
import unittest |
|
|
4 |
import singlecellmultiomics.molecule |
|
|
5 |
import singlecellmultiomics.fragment |
|
|
6 |
import pysam |
|
|
7 |
import pysamiterators.iterators |
|
|
8 |
import os |
|
|
9 |
|
|
|
10 |
from singlecellmultiomics.molecule import MoleculeIterator, CHICMolecule |
|
|
11 |
from singlecellmultiomics.fragment import CHICFragment |
|
|
12 |
|
|
|
13 |
""" |
|
|
14 |
These tests check if the Molecule module is working correctly |
|
|
15 |
""" |
|
|
16 |
|
|
|
17 |
class TestMolecule(unittest.TestCase): |
|
|
18 |
|
|
|
19 |
def test_chic_cigar_dedup(self): |
|
|
20 |
i = 0 |
|
|
21 |
with pysam.AlignmentFile('./data/chic_test_region.bam') as alignments: |
|
|
22 |
|
|
|
23 |
for molecule in MoleculeIterator(alignments,CHICMolecule, CHICFragment): |
|
|
24 |
i+=1 |
|
|
25 |
|
|
|
26 |
self.assertEqual(i,1) |
|
|
27 |
|
|
|
28 |
|
|
|
29 |
def test_eq(self): |
|
|
30 |
|
|
|
31 |
|
|
|
32 |
def get_chic_read(header, qname, contig ='chr1', start=100, sequence='ATCGGG', cigar=None, umi='CAT', sample='CELL_1', is_reverse=False, read1=True, paired=False,proper_pair=True ): |
|
|
33 |
read = pysam.AlignedSegment(header) |
|
|
34 |
read.set_tag('SM',sample) # The sample to which the sample belongs is extracted from the SM tag |
|
|
35 |
read.set_tag('RX',umi) # The UMI is extracted from the RX tag |
|
|
36 |
read.set_tag('MX','scCHIC') |
|
|
37 |
# By default the molecule assignment is done based on the mapping location of read 1: |
|
|
38 |
read.reference_name = contig |
|
|
39 |
read.reference_start = start |
|
|
40 |
read.query_name = qname |
|
|
41 |
read.query_sequence = sequence |
|
|
42 |
read.is_reverse = is_reverse |
|
|
43 |
read.cigarstring = f'{len(sequence)}M' if cigar is None else cigar |
|
|
44 |
if read1: |
|
|
45 |
read.is_read1 = True |
|
|
46 |
read.is_read2 = False |
|
|
47 |
else: |
|
|
48 |
read.is_read1 = False |
|
|
49 |
read.is_read2 = True |
|
|
50 |
if paired: |
|
|
51 |
read.is_paired = True |
|
|
52 |
read.is_proper_pair = proper_pair |
|
|
53 |
return read |
|
|
54 |
|
|
|
55 |
from singlecellmultiomics.molecule import Molecule |
|
|
56 |
from singlecellmultiomics.fragment import Fragment |
|
|
57 |
import pysam |
|
|
58 |
# Create sam file to write some reads to: |
|
|
59 |
with pysam.AlignmentFile('test.sam','w',reference_names=['chr1','chr2'],reference_lengths=[1000,1000]) as test_sam: |
|
|
60 |
|
|
|
61 |
read_A = get_chic_read(test_sam.header, 'read_A') |
|
|
62 |
read_B = get_chic_read(test_sam.header, 'read_B', start=102) |
|
|
63 |
read_C = get_chic_read(test_sam.header, 'read_C', start=110) |
|
|
64 |
# A reverse read |
|
|
65 |
read_D = get_chic_read(test_sam.header, 'read_D', start=110-5, is_reverse=True) |
|
|
66 |
# A different sample read |
|
|
67 |
read_E = get_chic_read(test_sam.header, 'read_E', start=110-5, is_reverse=True, sample='CELL_2') |
|
|
68 |
|
|
|
69 |
# A umi error read |
|
|
70 |
read_F = get_chic_read(test_sam.header, 'read_F', start=110-5, is_reverse=True, sample='CELL_2', umi='CAG') |
|
|
71 |
|
|
|
72 |
# A proper pair |
|
|
73 |
read_G_a = get_chic_read(test_sam.header, 'read_G', start=102, paired=True) |
|
|
74 |
read_G_b = get_chic_read(test_sam.header, 'read_G', start=130, read1=False, paired=True, is_reverse=True) |
|
|
75 |
|
|
|
76 |
# A stale R2 |
|
|
77 |
read_H = get_chic_read(test_sam.header, 'read_H', start=130, paired=True, proper_pair=False, read1=False) |
|
|
78 |
|
|
|
79 |
# Read softclipped from start |
|
|
80 |
read_I = get_chic_read(test_sam.header, 'soft_clipped_read', start=104, cigar='2S4M') |
|
|
81 |
|
|
|
82 |
frag_A = Fragment([read_A],umi_hamming_distance=0,assignment_radius=2) |
|
|
83 |
frag_B = Fragment([read_B],umi_hamming_distance=0,assignment_radius=2) |
|
|
84 |
frag_C = Fragment([read_C],umi_hamming_distance=0,assignment_radius=2) |
|
|
85 |
frag_D = Fragment([read_D],umi_hamming_distance=0,assignment_radius=2) |
|
|
86 |
frag_E = Fragment([read_E],umi_hamming_distance=0,assignment_radius=2) |
|
|
87 |
frag_F = Fragment([read_F],umi_hamming_distance=0,assignment_radius=2) |
|
|
88 |
frag_G = Fragment([read_G_a, read_G_b],umi_hamming_distance=0,assignment_radius=2) |
|
|
89 |
|
|
|
90 |
self.assertTrue(frag_A==frag_B) |
|
|
91 |
self.assertTrue(frag_A==frag_G) |
|
|
92 |
self.assertFalse(frag_A==frag_C) |
|
|
93 |
self.assertFalse(frag_A==frag_D) |
|
|
94 |
self.assertFalse(frag_C==frag_D) |
|
|
95 |
self.assertFalse(frag_E==frag_D) |
|
|
96 |
|
|
|
97 |
frag_A = CHICFragment([read_A],umi_hamming_distance=0,assignment_radius=2) |
|
|
98 |
frag_B = CHICFragment([read_B],umi_hamming_distance=0,assignment_radius=2) |
|
|
99 |
frag_C = CHICFragment([read_C],umi_hamming_distance=0,assignment_radius=2) |
|
|
100 |
frag_D = CHICFragment([read_D],umi_hamming_distance=0,assignment_radius=2) |
|
|
101 |
frag_E = CHICFragment([read_E],umi_hamming_distance=1,assignment_radius=2) |
|
|
102 |
frag_F = CHICFragment([read_F],umi_hamming_distance=1,assignment_radius=2) |
|
|
103 |
frag_G = CHICFragment([read_G_a, read_G_b],umi_hamming_distance=0,assignment_radius=2) |
|
|
104 |
frag_I = CHICFragment([read_I],umi_hamming_distance=1,assignment_radius=2) |
|
|
105 |
|
|
|
106 |
|
|
|
107 |
frag_H = CHICFragment([None, read_H],umi_hamming_distance=1,assignment_radius=2) |
|
|
108 |
self.assertFalse(frag_H.is_valid()) |
|
|
109 |
self.assertTrue(frag_A.is_valid()) |
|
|
110 |
self.assertTrue(frag_B.is_valid()) |
|
|
111 |
|
|
|
112 |
self.assertTrue(frag_A==frag_B) |
|
|
113 |
|
|
|
114 |
self.assertEqual(frag_G, frag_I) |
|
|
115 |
self.assertFalse(frag_A==frag_C) |
|
|
116 |
self.assertFalse(frag_A==frag_D) |
|
|
117 |
self.assertFalse(frag_C==frag_D) |
|
|
118 |
self.assertFalse(frag_E==frag_D) |
|
|
119 |
|
|
|
120 |
|
|
|
121 |
self.assertTrue(frag_A==frag_B) |
|
|
122 |
self.assertFalse(frag_A==frag_C) |
|
|
123 |
|
|
|
124 |
molecule_A = CHICMolecule([frag_A]) |
|
|
125 |
print(molecule_A.site_location[1]) |
|
|
126 |
self.assertEqual(molecule_A.site_location[1],98) |
|
|
127 |
self.assertTrue(molecule_A==frag_B) |
|
|
128 |
self.assertFalse(molecule_A==frag_C) |
|
|
129 |
|
|
|
130 |
# Test construction of molecule: |
|
|
131 |
self.assertTrue( molecule_A.add_fragment(frag_B) ) |
|
|
132 |
self.assertEqual(len(molecule_A),2) |
|
|
133 |
|
|
|
134 |
# Frag C cannot be added: |
|
|
135 |
self.assertFalse( molecule_A.add_fragment(frag_C) ) |
|
|
136 |
self.assertEqual(len(molecule_A),2) |
|
|
137 |
|
|
|
138 |
# Frag D cannot be added: |
|
|
139 |
self.assertFalse( molecule_A.add_fragment(frag_D) ) |
|
|
140 |
self.assertEqual(len(molecule_A),2) |
|
|
141 |
|
|
|
142 |
# Test moving of site location by read which is aligned more to left: |
|
|
143 |
molecule = CHICMolecule([frag_B]) |
|
|
144 |
self.assertEqual(molecule.site_location[1],100) |
|
|
145 |
self.assertTrue( molecule.add_fragment(frag_A) ) |
|
|
146 |
self.assertEqual(molecule.site_location[1],98) |
|
|
147 |
|
|
|
148 |
# Test moving of site location by read which is aligned more to right: (Reverse strand) |
|
|
149 |
molecule = CHICMolecule([frag_B]) |
|
|
150 |
self.assertEqual(molecule.site_location[1],100) |
|
|
151 |
self.assertTrue( molecule.add_fragment(frag_A) ) |
|
|
152 |
self.assertEqual(molecule.site_location[1],98) |
|
|
153 |
|
|
|
154 |
# Test umi distance: |
|
|
155 |
molecule = CHICMolecule([frag_E]) |
|
|
156 |
self.assertTrue( molecule.add_fragment(frag_F) ) |
|
|
157 |
read_F.set_tag('RX','GGG') |
|
|
158 |
frag_G = CHICFragment([read_F],umi_hamming_distance=1,assignment_radius=2) |
|
|
159 |
self.assertFalse( molecule.add_fragment(frag_G) ) |
|
|
160 |
|
|
|
161 |
|
|
|
162 |
# Perform iteration |
|
|
163 |
reads = [read_A, read_B, read_C, read_D, read_E, read_F, (read_G_a, read_G_b), (None,read_H), (read_I,None)] |
|
|
164 |
molecules = list(MoleculeIterator( reads, |
|
|
165 |
molecule_class = CHICMolecule, |
|
|
166 |
fragment_class = CHICFragment, |
|
|
167 |
yield_invalid=True, |
|
|
168 |
fragment_class_args={ |
|
|
169 |
'assignment_radius':2, |
|
|
170 |
'umi_hamming_distance':1 |
|
|
171 |
})) |
|
|
172 |
|
|
|
173 |
recovered_reads = [] |
|
|
174 |
for molecule in molecules: |
|
|
175 |
for read in molecule.iter_reads(): |
|
|
176 |
recovered_reads.append(read) |
|
|
177 |
print(read.query_name) |
|
|
178 |
self.assertEqual(len(recovered_reads),10) |
|
|
179 |
|
|
|
180 |
os.remove('test.sam') |
|
|
181 |
|
|
|
182 |
|
|
|
183 |
|
|
|
184 |
def test_chic_nocigar_dedup(self): |
|
|
185 |
i = 0 |
|
|
186 |
with pysam.AlignmentFile('./data/chic_test_region.bam') as alignments: |
|
|
187 |
for molecule in MoleculeIterator(alignments,CHICMolecule, CHICFragment,fragment_class_args={'no_umi_cigar_processing':True}): |
|
|
188 |
i+=1 |
|
|
189 |
self.assertEqual(i,2) |
|
|
190 |
|
|
|
191 |
def test_a_pysam_iterators(self): |
|
|
192 |
"""Test if the pysamiterators package yields the proper amount or mate pairs""" |
|
|
193 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
194 |
for i,(R1,R2) in enumerate(pysamiterators.iterators.MatePairIterator(f)): |
|
|
195 |
pass |
|
|
196 |
self.assertEqual(i,224) |
|
|
197 |
|
|
|
198 |
def test_molecule_iterator_stability(self): |
|
|
199 |
"""Test if the simplest molecule iterator works""" |
|
|
200 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
201 |
it = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
202 |
alignments=f, |
|
|
203 |
molecule_class=singlecellmultiomics.molecule.Molecule, |
|
|
204 |
fragment_class=singlecellmultiomics.fragment.Fragment |
|
|
205 |
) |
|
|
206 |
pass |
|
|
207 |
|
|
|
208 |
def test_every_fragment_as_molecule(self): |
|
|
209 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
210 |
for i,m in enumerate(singlecellmultiomics.molecule.MoleculeIterator( |
|
|
211 |
alignments=f, |
|
|
212 |
molecule_class=singlecellmultiomics.molecule.Molecule, |
|
|
213 |
fragment_class=singlecellmultiomics.fragment.Fragment, |
|
|
214 |
every_fragment_as_molecule=True |
|
|
215 |
)): |
|
|
216 |
pass |
|
|
217 |
self.assertEqual(i,340) |
|
|
218 |
|
|
|
219 |
def test_every_fragment_as_molecule_np_iterator(self): |
|
|
220 |
|
|
|
221 |
to_be_found = set() |
|
|
222 |
amount_of_r1s_to_be_found=0 |
|
|
223 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
224 |
for read in f: |
|
|
225 |
if read.is_read1 and not read.is_read2: |
|
|
226 |
to_be_found.add(read.query_name) |
|
|
227 |
amount_of_r1s_to_be_found+=1 |
|
|
228 |
found=set() |
|
|
229 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
230 |
frag_count=0 |
|
|
231 |
for i,m in enumerate(singlecellmultiomics.molecule.MoleculeIterator( |
|
|
232 |
alignments=f, |
|
|
233 |
molecule_class=singlecellmultiomics.molecule.Molecule, |
|
|
234 |
fragment_class=singlecellmultiomics.fragment.Fragment, |
|
|
235 |
every_fragment_as_molecule=True, |
|
|
236 |
yield_invalid=True, |
|
|
237 |
iterator_class=pysamiterators.MatePairIteratorIncludingNonProper |
|
|
238 |
)): |
|
|
239 |
if m[0].has_R1(): |
|
|
240 |
frag_count+=1 |
|
|
241 |
found.add( m[0][0].query_name ) |
|
|
242 |
|
|
|
243 |
|
|
|
244 |
diff = to_be_found.difference( found ) |
|
|
245 |
tm = found.difference(to_be_found) |
|
|
246 |
print('missed:') |
|
|
247 |
print(diff) |
|
|
248 |
print('too much:') |
|
|
249 |
print(tm) |
|
|
250 |
self.assertEqual( len(diff), 0) |
|
|
251 |
self.assertEqual(frag_count,amount_of_r1s_to_be_found) |
|
|
252 |
self.assertEqual(frag_count,293) |
|
|
253 |
|
|
|
254 |
|
|
|
255 |
|
|
|
256 |
|
|
|
257 |
|
|
|
258 |
def test_Molecule_repr_stability(self): |
|
|
259 |
"""Test if the molecule representation function is stable""" |
|
|
260 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
261 |
it = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
262 |
alignments=f, |
|
|
263 |
molecule_class=singlecellmultiomics.molecule.Molecule, |
|
|
264 |
fragment_class=singlecellmultiomics.fragment.Fragment |
|
|
265 |
) |
|
|
266 |
for molecule in it: |
|
|
267 |
str(molecule) |
|
|
268 |
def test_iterable_molecule_iter(self): |
|
|
269 |
|
|
|
270 |
from singlecellmultiomics.molecule import MoleculeIterator |
|
|
271 |
from singlecellmultiomics.fragment import Fragment |
|
|
272 |
|
|
|
273 |
with pysam.AlignmentFile('test.sam','w',reference_names=['chr1','chr2'],reference_lengths=[1000,1000]) as test_sam: |
|
|
274 |
read_A = pysam.AlignedSegment(test_sam.header) |
|
|
275 |
read_A.set_tag('SM','CELL_1') |
|
|
276 |
read_A.set_tag('RX','CAT') |
|
|
277 |
read_A.reference_name = 'chr1' |
|
|
278 |
read_A.reference_start = 100 |
|
|
279 |
read_A.query_sequence = 'ATCGGG' |
|
|
280 |
read_A.cigarstring = '6M' |
|
|
281 |
read_A.mapping_quality = 60 |
|
|
282 |
|
|
|
283 |
read_B = pysam.AlignedSegment(test_sam.header) |
|
|
284 |
read_B.set_tag('SM','CELL_1') |
|
|
285 |
read_B.set_tag('RX','CAT') |
|
|
286 |
read_B.reference_name = 'chr1' |
|
|
287 |
read_B.reference_start = 100 |
|
|
288 |
read_B.query_sequence = 'ATCGG' |
|
|
289 |
read_B.cigarstring = '5M' |
|
|
290 |
read_B.mapping_quality = 60 |
|
|
291 |
|
|
|
292 |
read_C = pysam.AlignedSegment(test_sam.header) |
|
|
293 |
read_C.set_tag('SM','CELL_2') |
|
|
294 |
read_C.set_tag('RX','CAT') |
|
|
295 |
read_C.reference_name = 'chr1' |
|
|
296 |
read_C.reference_start = 100 |
|
|
297 |
read_C.query_sequence = 'ATCGG' |
|
|
298 |
read_C.cigarstring = '5M' |
|
|
299 |
read_C.mapping_quality = 60 |
|
|
300 |
|
|
|
301 |
reads = [ read_A,read_B,read_C ] |
|
|
302 |
mi = MoleculeIterator( reads , yield_invalid=True) |
|
|
303 |
molecules=[] |
|
|
304 |
for molecule in mi: |
|
|
305 |
molecules.append(molecule) |
|
|
306 |
|
|
|
307 |
self.assertEqual(len(molecules),2) |
|
|
308 |
self.assertEqual(max( (len(m) for m in molecules) ),2) |
|
|
309 |
self.assertEqual(min( (len(m) for m in molecules) ),1) |
|
|
310 |
|
|
|
311 |
# Test tags: |
|
|
312 |
a = molecules[0] |
|
|
313 |
a.write_tags() |
|
|
314 |
self.assertEqual(a[0][0].get_tag('TF') , 2) |
|
|
315 |
|
|
|
316 |
os.remove('test.sam') |
|
|
317 |
def test_NLA_Molecule_repr_stability(self): |
|
|
318 |
"""Test if the molecule representation function is stable""" |
|
|
319 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
320 |
it = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
321 |
alignments=f, |
|
|
322 |
molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule, |
|
|
323 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment |
|
|
324 |
) |
|
|
325 |
for molecule in it: |
|
|
326 |
str(molecule) |
|
|
327 |
|
|
|
328 |
def test_NLA_Molecule_ref_id(self): |
|
|
329 |
"""Test if the molecule representation function is stable""" |
|
|
330 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
331 |
it = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
332 |
alignments=f, |
|
|
333 |
molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule, |
|
|
334 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment |
|
|
335 |
) |
|
|
336 |
for molecule in it: |
|
|
337 |
self.assertEqual( molecule.get_a_reference_id(),0) |
|
|
338 |
|
|
|
339 |
|
|
|
340 |
|
|
|
341 |
def test_NLA_molecule_iterator_stability(self): |
|
|
342 |
"""Test if the simplest molecule iterator works""" |
|
|
343 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
344 |
it = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
345 |
alignments=f, |
|
|
346 |
molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule, |
|
|
347 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment |
|
|
348 |
) |
|
|
349 |
pass |
|
|
350 |
|
|
|
351 |
def test_consensus(self): |
|
|
352 |
"""Test if a right consensus sequence can be produced from a noisy molecule""" |
|
|
353 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
354 |
it = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
355 |
alignments=f, |
|
|
356 |
molecule_class=singlecellmultiomics.molecule.Molecule, |
|
|
357 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment, |
|
|
358 |
fragment_class_args={ |
|
|
359 |
'R1_primer_length':0, |
|
|
360 |
'R2_primer_length':6, |
|
|
361 |
} |
|
|
362 |
) |
|
|
363 |
for molecule in iter(it): |
|
|
364 |
#print(molecule.get_sample()) |
|
|
365 |
if molecule.get_sample()=='APKS3-P19-1-1_91': |
|
|
366 |
break |
|
|
367 |
|
|
|
368 |
self.assertEqual(''.join( list(molecule.get_consensus().values()) ), 'CATGAGTTAGATATGGACTCTTCTTCAGACACTTTGTTTAAATTTTAAATTTTTTTCTGATTGCATATTACTAAAAATGTGTTATGAATATTTTCCATATCATTAAACATTCTTCTCAAGCATAACTTTAAATAACTGCATTATAGAAAATTTACGCTACTTTTGTTTTTGTTTTTTTTTTTTTTTTTTTACTATTATTAATAACACGGTGG') |
|
|
369 |
|
|
|
370 |
def test_fragment_sizes(self): |
|
|
371 |
|
|
|
372 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
373 |
for molecule in singlecellmultiomics.molecule.MoleculeIterator( |
|
|
374 |
alignments=f, |
|
|
375 |
molecule_class=singlecellmultiomics.molecule.Molecule, |
|
|
376 |
fragment_class_args={ |
|
|
377 |
'R1_primer_length':4, |
|
|
378 |
'R2_primer_length':6, |
|
|
379 |
}, |
|
|
380 |
|
|
|
381 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment): |
|
|
382 |
|
|
|
383 |
# This is a dovetailed molecule, both R1 and R2 overshoot into the adapter |
|
|
384 |
if molecule.sample=='APKS2-P18-2-1_66' and molecule.umi=='CGC': |
|
|
385 |
# The non-dovetailed region of this molecule is |
|
|
386 |
# 164834866-164834975 |
|
|
387 |
# This means a fragment size of 109, |
|
|
388 |
# the 4 bases of the CATG are not counted. |
|
|
389 |
# the 6 bases of the random primer are also not counted |
|
|
390 |
# Resulting in a fragment size of 109 - 10 = 101 |
|
|
391 |
|
|
|
392 |
start, end = molecule[0].get_safe_span() |
|
|
393 |
self.assertEqual( |
|
|
394 |
abs(end-start) |
|
|
395 |
, 107) |
|
|
396 |
self.assertEqual( |
|
|
397 |
molecule.get_safely_aligned_length() |
|
|
398 |
, 107) |
|
|
399 |
|
|
|
400 |
|
|
|
401 |
def test_get_match_mismatch_frequency(self): |
|
|
402 |
"""Test if the matches and mismatches of reads in a molecule are counted properly""" |
|
|
403 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
404 |
it = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
405 |
alignments=f, |
|
|
406 |
molecule_class=singlecellmultiomics.molecule.Molecule, |
|
|
407 |
fragment_class_args={ |
|
|
408 |
'R1_primer_length':0, |
|
|
409 |
'R2_primer_length':0, |
|
|
410 |
}, |
|
|
411 |
|
|
|
412 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment) |
|
|
413 |
for molecule in iter(it): |
|
|
414 |
#print(molecule.get_sample()) |
|
|
415 |
if molecule.get_sample()=='APKS3-P19-1-1_91': |
|
|
416 |
break |
|
|
417 |
#print(molecule.get_match_mismatch_frequency()) |
|
|
418 |
self.assertEqual( (628, 13), molecule.get_match_mismatch_frequency() ) |
|
|
419 |
|
|
|
420 |
def test_get_consensus_gc_ratio(self): |
|
|
421 |
"""Test if the gc ratio of a molecule is properly determined""" |
|
|
422 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
423 |
it = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
424 |
alignments=f, |
|
|
425 |
molecule_class=singlecellmultiomics.molecule.Molecule, |
|
|
426 |
fragment_class=singlecellmultiomics.fragment.Fragment, |
|
|
427 |
fragment_class_args={ |
|
|
428 |
'R1_primer_length':0, |
|
|
429 |
'R2_primer_length':0, |
|
|
430 |
} |
|
|
431 |
) |
|
|
432 |
for molecule in iter(it): |
|
|
433 |
#print(molecule.get_sample()) |
|
|
434 |
if molecule.get_sample()=='APKS3-P19-1-1_91': |
|
|
435 |
break |
|
|
436 |
|
|
|
437 |
self.assertAlmostEqual(0.23113207547169812, molecule.get_consensus_gc_ratio() ) |
|
|
438 |
|
|
|
439 |
|
|
|
440 |
def test_deduplication(self): |
|
|
441 |
f= pysam.AlignmentFile('./data/mini_nla_test.bam') |
|
|
442 |
total_frag_count = 0 |
|
|
443 |
for molecule in singlecellmultiomics.molecule.MoleculeIterator( |
|
|
444 |
alignments=f, |
|
|
445 |
molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule, |
|
|
446 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment, |
|
|
447 |
fragment_class_args={'umi_hamming_distance':0}, |
|
|
448 |
pooling_method=0, |
|
|
449 |
yield_invalid=True |
|
|
450 |
): |
|
|
451 |
if 'APKS2-P14-1-1' in molecule.sample: |
|
|
452 |
for fragment in molecule: |
|
|
453 |
|
|
|
454 |
is_dup = False |
|
|
455 |
for read in fragment.reads: |
|
|
456 |
if read is not None and read.is_duplicate: |
|
|
457 |
is_dup = True |
|
|
458 |
if not is_dup: |
|
|
459 |
total_frag_count+=1 |
|
|
460 |
self.assertEqual(total_frag_count,13) |
|
|
461 |
|
|
|
462 |
def _pool_test(self, pooling_method=0,hd=0): |
|
|
463 |
for sample in ['AP1-P22-1-1_318','APKS2-P8-2-2_52']: |
|
|
464 |
|
|
|
465 |
f= pysam.AlignmentFile('./data/mini_nla_test.bam') |
|
|
466 |
it = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
467 |
alignments=f, |
|
|
468 |
molecule_class=singlecellmultiomics.molecule.Molecule, |
|
|
469 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment, |
|
|
470 |
fragment_class_args={'umi_hamming_distance':hd}, |
|
|
471 |
pooling_method=pooling_method |
|
|
472 |
) |
|
|
473 |
|
|
|
474 |
molecule_count = 0 |
|
|
475 |
for molecule in iter(it): |
|
|
476 |
if molecule.get_sample()==sample: |
|
|
477 |
molecule_count+=1 |
|
|
478 |
if hd==0: |
|
|
479 |
# it has one fragment with a sequencing error in the UMI |
|
|
480 |
self.assertEqual(molecule_count,2) |
|
|
481 |
else: |
|
|
482 |
self.assertEqual(molecule_count,1) |
|
|
483 |
|
|
|
484 |
def test_molecule_pooling_vanilla_exact_umi(self): |
|
|
485 |
self._pool_test(0,0) |
|
|
486 |
|
|
|
487 |
def test_molecule_pooling_nlaIIIoptim_exact_umi(self): |
|
|
488 |
self._pool_test(1,0) |
|
|
489 |
|
|
|
490 |
def test_molecule_pooling_vanilla_umi_mismatch(self): |
|
|
491 |
self._pool_test(0,1) |
|
|
492 |
|
|
|
493 |
def test_molecule_pooling_nlaIIIoptim_umi_mismatch(self): |
|
|
494 |
self._pool_test(1,1) |
|
|
495 |
|
|
|
496 |
def test_max_associated_fragments(self): |
|
|
497 |
|
|
|
498 |
for i in range(1,3): |
|
|
499 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f: |
|
|
500 |
for molecule in singlecellmultiomics.molecule.MoleculeIterator( |
|
|
501 |
alignments=f, |
|
|
502 |
molecule_class=singlecellmultiomics.molecule.Molecule, |
|
|
503 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment, |
|
|
504 |
molecule_class_args={'max_associated_fragments':i}, |
|
|
505 |
fragment_class_args={'umi_hamming_distance':1} |
|
|
506 |
|
|
|
507 |
): |
|
|
508 |
self.assertTrue(len(molecule)<= i , f'{i}, {len(molecule)}') |
|
|
509 |
|
|
|
510 |
|
|
|
511 |
def test_rt_reaction_counting_HAMMING_1(self): |
|
|
512 |
# This is a dictionary containing molecules and the amount of rt reactions for location chr1:164834865 |
|
|
513 |
|
|
|
514 |
f= pysam.AlignmentFile('./data/mini_nla_test.bam') |
|
|
515 |
it = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
516 |
alignments=f, |
|
|
517 |
molecule_class=singlecellmultiomics.molecule.Molecule, |
|
|
518 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment, |
|
|
519 |
fragment_class_args={'umi_hamming_distance':1} |
|
|
520 |
|
|
|
521 |
) |
|
|
522 |
|
|
|
523 |
hand_curated_truth = { |
|
|
524 |
# hd: 1: |
|
|
525 |
# hard example with N in random primer and N in UMI: |
|
|
526 |
'APKS2-P8-2-2_52':{'rt_count':3}, |
|
|
527 |
|
|
|
528 |
# Simple examples: |
|
|
529 |
'APKS2-P18-1-1_318':{'rt_count':1}, |
|
|
530 |
'APKS2-P18-1-1_369':{'rt_count':2}, |
|
|
531 |
'APKS2-P18-2-1_66':{'rt_count':1}, |
|
|
532 |
'APKS2-P18-2-1_76':{'rt_count':1}, |
|
|
533 |
'APKS2-P18-2-1_76':{'rt_count':1}, |
|
|
534 |
} |
|
|
535 |
|
|
|
536 |
|
|
|
537 |
obtained_rt_count = {} |
|
|
538 |
for molecule in it: |
|
|
539 |
site = molecule.get_cut_site() |
|
|
540 |
if site is not None and site[1]==164834865: |
|
|
541 |
obtained_rt_count[molecule.get_sample()] = len( molecule.get_rt_reaction_fragment_sizes(max_N_distance=1) ) |
|
|
542 |
|
|
|
543 |
# Validate: |
|
|
544 |
for sample, truth in hand_curated_truth.items(): |
|
|
545 |
self.assertEqual( obtained_rt_count.get(sample,0),truth.get('rt_count',-1) ) |
|
|
546 |
|
|
|
547 |
|
|
|
548 |
def test_rt_reaction_counting_HAMMING_0(self): |
|
|
549 |
# This is a dictionary containing molecules and the amount of rt reactions for location chr1:164834865 |
|
|
550 |
|
|
|
551 |
f= pysam.AlignmentFile('./data/mini_nla_test.bam') |
|
|
552 |
it = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
553 |
alignments=f, |
|
|
554 |
molecule_class=singlecellmultiomics.molecule.Molecule, |
|
|
555 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment, |
|
|
556 |
fragment_class_args={'umi_hamming_distance':1} |
|
|
557 |
|
|
|
558 |
) |
|
|
559 |
|
|
|
560 |
hand_curated_truth = { |
|
|
561 |
# hd: 0: |
|
|
562 |
# hard example with N in random primer and N in UMI: |
|
|
563 |
'APKS2-P8-2-2_52':{'rt_count':3}, |
|
|
564 |
|
|
|
565 |
# Simple examples: |
|
|
566 |
'APKS2-P18-1-1_318':{'rt_count':1}, |
|
|
567 |
'APKS2-P18-1-1_369':{'rt_count':2}, |
|
|
568 |
'APKS2-P18-2-1_66':{'rt_count':1}, |
|
|
569 |
'APKS2-P18-2-1_76':{'rt_count':1}, |
|
|
570 |
'APKS2-P18-2-1_76':{'rt_count':1}, |
|
|
571 |
} |
|
|
572 |
|
|
|
573 |
|
|
|
574 |
obtained_rt_count = {} |
|
|
575 |
for molecule in it: |
|
|
576 |
site = molecule.get_cut_site() |
|
|
577 |
if site is not None and site[1]==164834865: |
|
|
578 |
obtained_rt_count[molecule.get_sample()] = len( molecule.get_rt_reaction_fragment_sizes(max_N_distance=0) ) |
|
|
579 |
|
|
|
580 |
# Validate: |
|
|
581 |
for sample, truth in hand_curated_truth.items(): |
|
|
582 |
self.assertEqual( obtained_rt_count.get(sample,0),truth.get('rt_count',-1) ) |
|
|
583 |
|
|
|
584 |
|
|
|
585 |
def test_feature_molecule(self): |
|
|
586 |
import singlecellmultiomics.features |
|
|
587 |
features = singlecellmultiomics.features.FeatureContainer() |
|
|
588 |
features.addFeature(chromosome='chr1',start=164835366, end=164835535, data=('test4'),name='test4') |
|
|
589 |
features.sort() |
|
|
590 |
|
|
|
591 |
f= pysam.AlignmentFile('./data/mini_nla_test.bam') |
|
|
592 |
it = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
593 |
alignments=f, |
|
|
594 |
molecule_class=singlecellmultiomics.molecule.AnnotatedNLAIIIMolecule, |
|
|
595 |
molecule_class_args={'features':features}, |
|
|
596 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment, |
|
|
597 |
fragment_class_args={'umi_hamming_distance':1} |
|
|
598 |
) |
|
|
599 |
|
|
|
600 |
hit_count = 0 |
|
|
601 |
for molecule in it: |
|
|
602 |
molecule.annotate() |
|
|
603 |
if len(molecule.hits): |
|
|
604 |
hit_count+=1 |
|
|
605 |
self.assertEqual(hit_count,2) |
|
|
606 |
|
|
|
607 |
""" |
|
|
608 |
def test_classification_consensus(self): |
|
|
609 |
with pysam.AlignmentFile('./data/mini_nla_test.bam') as f, pysam.AlignmentFile('./data/consensus_write_test.bam','wb',header=f.header) as target_bam: |
|
|
610 |
molecule_iterator = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
611 |
alignments=f, |
|
|
612 |
molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule, |
|
|
613 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment |
|
|
614 |
) |
|
|
615 |
|
|
|
616 |
classifier = singlecellmultiomics.molecule.train_consensus_model( |
|
|
617 |
molecule_iterator, |
|
|
618 |
mask_variants=None, |
|
|
619 |
skip_already_covered_bases=False, |
|
|
620 |
n_train=100) |
|
|
621 |
|
|
|
622 |
molecule_iterator = singlecellmultiomics.molecule.MoleculeIterator( |
|
|
623 |
alignments=f, |
|
|
624 |
molecule_class=singlecellmultiomics.molecule.NlaIIIMolecule, |
|
|
625 |
fragment_class=singlecellmultiomics.fragment.NlaIIIFragment |
|
|
626 |
) |
|
|
627 |
|
|
|
628 |
for i,molecule in enumerate(molecule_iterator): |
|
|
629 |
read_name=f'consensus_{i}' |
|
|
630 |
reads = molecule.deduplicate_to_single_CIGAR_spaced(target_bam, |
|
|
631 |
read_name, classifier,reference=None) |
|
|
632 |
|
|
|
633 |
read = molecule.deduplicate_to_single(target_bam) |
|
|
634 |
|
|
|
635 |
consensus = molecule.get_consensus(classifier) |
|
|
636 |
""" |
|
|
637 |
if __name__ == '__main__': |
|
|
638 |
unittest.main() |