Switch to unified view

a b/tests/test_region_count.py
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
import unittest
4
from singlecellmultiomics.bamProcessing.bamCountRegions import count_regions
5
import tempfile
6
import pysam
7
from collections import Counter,defaultdict
8
import pandas as pd
9
import os
10
11
class TestRegionCount(unittest.TestCase):
12
13
14
    def test_region_counter(self):
15
        regions = {
16
            'regA':('chr1', 100,200),
17
            'regB':('chr1', 400,500),
18
            'regC':('chr1', 450,500),
19
        }
20
21
        reads = {
22
            # read name, cell index, library, mate, contig, cut location, strand : should be assigned  to:
23
            ('readA' , 1, 'LIBA', 1, 'chr1', 100, '+'): 'regA',
24
            ('readB' , 1, 'LIBA', 1, 'chr1', 150, '+'): 'regA',
25
            ('readC' , 1, 'LIBA', 1, 'chr1', 199, '+'): 'regA',
26
            ('readD' , 1, 'LIBA', 1, 'chr1', 200, '+'): None,
27
            ('readE' , 1, 'LIBA', 1, 'chr1', 210, '+'): None,
28
            ('readE' , 1, 'LIBA', 1, 'chr1', 399, '+'): None,
29
            ('readF' , 1, 'LIBA', 1, 'chr1', 400, '+'): 'regB',
30
            ('readG' , 1, 'LIBA', 1, 'chr1', 450, '+'): ['regB','regC'],
31
            ('readH' , 1, 'LIBB', 1, 'chr1', 450, '+'): ['regB','regC']
32
33
        }
34
35
        # Create bed file
36
        temp_bed_file = tempfile.NamedTemporaryFile(mode='w')
37
        for reg, (contig,start,end) in regions.items():
38
            temp_bed_file.write(f'{contig}\t{start}\t{end}\t{reg}\n')
39
        temp_bed_file.flush()
40
41
        # Create BAM file
42
        refseq = 'TTAATCATGAAACCGTGGAGGCAAATCGGAGTGTAAGGCTTGACTTTTAAGGGTTGAGATTTTCGAGAGGGATTCCTACGTTGCGTAGGTTCATGGGGGG'*10
43
44
        temp_bam_path = './temp_counting.bam'
45
        with pysam.AlignmentFile(
46
            temp_bam_path,
47
            'wb',
48
            reference_names=['chr1'],
49
            reference_lengths=[500]) as bam:
50
51
            for (read_name, cell_index, library, mate, contig, cut_location, strand) in reads:
52
                rlen = 20
53
                read_A = pysam.AlignedSegment(bam.header)
54
                read_A.reference_name = contig
55
                read_A.reference_start = cut_location
56
                # Before last A is a bogus G>A conversion to test strandness:
57
                read_A.query_sequence = refseq[cut_location:cut_location+rlen]
58
                read_A.cigarstring = f'{len(read_A.query_sequence)}M'
59
                read_A.qual = 'A'*len(read_A.query_sequence)
60
                read_A.mapping_quality = 60
61
                read_A.query_name = read_name
62
                read_A.set_tag('SM', f'{library}_{cell_index}')
63
                read_A.set_tag('LY', library)
64
                read_A.set_tag('DS', cut_location)
65
                read_A.set_tag('bi', cell_index)
66
                read_A.is_read1 = (mate==1)
67
                read_A.is_read2 = (mate!=1)
68
69
                read_A.is_paired = False
70
                read_A.is_proper_pair = True
71
72
                bam.write(read_A)
73
        pysam.index(temp_bam_path)
74
75
        ## Call the function to test:
76
        df = count_regions(temp_bed_file.name, bam_paths=[temp_bam_path,])
77
        temp_bed_file.close()
78
79
80
        targets = defaultdict(Counter)
81
        for (read_name, cell_index, library, mate, contig, cut_location, strand), goals in reads.items():
82
            if goals is None:
83
                continue
84
            if type(goals)!=list:
85
                goals = [goals]
86
            for goal in goals:
87
                targets[f'{library}_{cell_index}'][goal] += 1
88
        dfob = pd.DataFrame(targets).fillna(0)
89
90
        for feature, row in dfob.iterrows():
91
            for sample,value in row.items():
92
                self.assertTrue( df.loc[sample][feature] == value )
93
94
        os.remove(temp_bam_path)
95
        os.remove(temp_bam_path+'.bai')
96
97
if __name__ == '__main__':
98
    unittest.main()