|
a |
|
b/tests/test_region_count.py |
|
|
1 |
#!/usr/bin/env python3 |
|
|
2 |
# -*- coding: utf-8 -*- |
|
|
3 |
import unittest |
|
|
4 |
from singlecellmultiomics.bamProcessing.bamCountRegions import count_regions |
|
|
5 |
import tempfile |
|
|
6 |
import pysam |
|
|
7 |
from collections import Counter,defaultdict |
|
|
8 |
import pandas as pd |
|
|
9 |
import os |
|
|
10 |
|
|
|
11 |
class TestRegionCount(unittest.TestCase): |
|
|
12 |
|
|
|
13 |
|
|
|
14 |
def test_region_counter(self): |
|
|
15 |
regions = { |
|
|
16 |
'regA':('chr1', 100,200), |
|
|
17 |
'regB':('chr1', 400,500), |
|
|
18 |
'regC':('chr1', 450,500), |
|
|
19 |
} |
|
|
20 |
|
|
|
21 |
reads = { |
|
|
22 |
# read name, cell index, library, mate, contig, cut location, strand : should be assigned to: |
|
|
23 |
('readA' , 1, 'LIBA', 1, 'chr1', 100, '+'): 'regA', |
|
|
24 |
('readB' , 1, 'LIBA', 1, 'chr1', 150, '+'): 'regA', |
|
|
25 |
('readC' , 1, 'LIBA', 1, 'chr1', 199, '+'): 'regA', |
|
|
26 |
('readD' , 1, 'LIBA', 1, 'chr1', 200, '+'): None, |
|
|
27 |
('readE' , 1, 'LIBA', 1, 'chr1', 210, '+'): None, |
|
|
28 |
('readE' , 1, 'LIBA', 1, 'chr1', 399, '+'): None, |
|
|
29 |
('readF' , 1, 'LIBA', 1, 'chr1', 400, '+'): 'regB', |
|
|
30 |
('readG' , 1, 'LIBA', 1, 'chr1', 450, '+'): ['regB','regC'], |
|
|
31 |
('readH' , 1, 'LIBB', 1, 'chr1', 450, '+'): ['regB','regC'] |
|
|
32 |
|
|
|
33 |
} |
|
|
34 |
|
|
|
35 |
# Create bed file |
|
|
36 |
temp_bed_file = tempfile.NamedTemporaryFile(mode='w') |
|
|
37 |
for reg, (contig,start,end) in regions.items(): |
|
|
38 |
temp_bed_file.write(f'{contig}\t{start}\t{end}\t{reg}\n') |
|
|
39 |
temp_bed_file.flush() |
|
|
40 |
|
|
|
41 |
# Create BAM file |
|
|
42 |
refseq = 'TTAATCATGAAACCGTGGAGGCAAATCGGAGTGTAAGGCTTGACTTTTAAGGGTTGAGATTTTCGAGAGGGATTCCTACGTTGCGTAGGTTCATGGGGGG'*10 |
|
|
43 |
|
|
|
44 |
temp_bam_path = './temp_counting.bam' |
|
|
45 |
with pysam.AlignmentFile( |
|
|
46 |
temp_bam_path, |
|
|
47 |
'wb', |
|
|
48 |
reference_names=['chr1'], |
|
|
49 |
reference_lengths=[500]) as bam: |
|
|
50 |
|
|
|
51 |
for (read_name, cell_index, library, mate, contig, cut_location, strand) in reads: |
|
|
52 |
rlen = 20 |
|
|
53 |
read_A = pysam.AlignedSegment(bam.header) |
|
|
54 |
read_A.reference_name = contig |
|
|
55 |
read_A.reference_start = cut_location |
|
|
56 |
# Before last A is a bogus G>A conversion to test strandness: |
|
|
57 |
read_A.query_sequence = refseq[cut_location:cut_location+rlen] |
|
|
58 |
read_A.cigarstring = f'{len(read_A.query_sequence)}M' |
|
|
59 |
read_A.qual = 'A'*len(read_A.query_sequence) |
|
|
60 |
read_A.mapping_quality = 60 |
|
|
61 |
read_A.query_name = read_name |
|
|
62 |
read_A.set_tag('SM', f'{library}_{cell_index}') |
|
|
63 |
read_A.set_tag('LY', library) |
|
|
64 |
read_A.set_tag('DS', cut_location) |
|
|
65 |
read_A.set_tag('bi', cell_index) |
|
|
66 |
read_A.is_read1 = (mate==1) |
|
|
67 |
read_A.is_read2 = (mate!=1) |
|
|
68 |
|
|
|
69 |
read_A.is_paired = False |
|
|
70 |
read_A.is_proper_pair = True |
|
|
71 |
|
|
|
72 |
bam.write(read_A) |
|
|
73 |
pysam.index(temp_bam_path) |
|
|
74 |
|
|
|
75 |
## Call the function to test: |
|
|
76 |
df = count_regions(temp_bed_file.name, bam_paths=[temp_bam_path,]) |
|
|
77 |
temp_bed_file.close() |
|
|
78 |
|
|
|
79 |
|
|
|
80 |
targets = defaultdict(Counter) |
|
|
81 |
for (read_name, cell_index, library, mate, contig, cut_location, strand), goals in reads.items(): |
|
|
82 |
if goals is None: |
|
|
83 |
continue |
|
|
84 |
if type(goals)!=list: |
|
|
85 |
goals = [goals] |
|
|
86 |
for goal in goals: |
|
|
87 |
targets[f'{library}_{cell_index}'][goal] += 1 |
|
|
88 |
dfob = pd.DataFrame(targets).fillna(0) |
|
|
89 |
|
|
|
90 |
for feature, row in dfob.iterrows(): |
|
|
91 |
for sample,value in row.items(): |
|
|
92 |
self.assertTrue( df.loc[sample][feature] == value ) |
|
|
93 |
|
|
|
94 |
os.remove(temp_bam_path) |
|
|
95 |
os.remove(temp_bam_path+'.bai') |
|
|
96 |
|
|
|
97 |
if __name__ == '__main__': |
|
|
98 |
unittest.main() |