Switch to unified view

a b/singlecellmultiomics/barcodeFileParser/barcodeFileParser.py
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
import glob
4
import logging
5
from colorama import Fore
6
from colorama import Back
7
from colorama import Style
8
import os
9
import collections
10
import itertools
11
import gzip
12
13
#logging.basicConfig(level=logging.DEBUG)
14
# http://codereview.stackexchange.com/questions/88912/create-a-list-of-all-strings-within-hamming-distance-of-a-reference-string-with
15
def hamming_circle(s, n, alphabet):
16
    for positions in itertools.combinations(range(len(s)), n):
17
        for replacements in itertools.product(
18
                range(len(alphabet) - 1), repeat=n):
19
            cousin = list(s)
20
            for p, r in zip(positions, replacements):
21
                if cousin[p] == alphabet[r]:
22
                    cousin[p] = alphabet[-1]
23
                else:
24
                    cousin[p] = alphabet[r]
25
            yield ''.join(cousin)
26
27
28
class BarcodeParser():
29
30
    def path_to_barcode_alias(self, path):
31
        barcodeFileAlias = os.path.splitext(
32
            os.path.basename(path))[0].replace('.gz','').replace('.bc','')
33
        return barcodeFileAlias
34
35
    def parse_pending_barcode_file_of_alias(self, alias):
36
        if not alias in self.pending_files:
37
            raise ValueError(f"trying to load {alias}, but barcode file not found")
38
39
        logging.info(f"Loading promised barcode file {alias}")
40
        self.parse_barcode_file( self.pending_files[alias] )
41
        logging.info(f"Performing hamming extension for {alias}")
42
        self.expand(self.hammingDistanceExpansion, alias=alias)
43
        del self.pending_files[alias]
44
45
    def parse_barcode_file(self, barcodeFile):
46
47
        barcodeFileAlias = self.path_to_barcode_alias(barcodeFile)
48
        # Decide the file type (index first or name first)
49
        indexNotFirst = False
50
        with gzip.open(barcodeFile,'rt') if barcodeFile.endswith('.gz') else open(barcodeFile) as f :
51
            for i, line in enumerate(f):
52
                parts = line.strip().split()
53
                if len(parts) == 1 and ' ' in line:
54
                    parts = line.strip().split(' ')
55
                if len(parts) == 1:
56
                    pass
57
                elif len(parts) == 2:
58
                    indexFirst = not all((c in 'ATCGNX') for c in parts[0])
59
                    if not indexFirst:
60
                        indexNotFirst = True
61
                    # print(parts[1],indexFirst)
62
63
        nospec=False
64
        i=0
65
        with gzip.open(barcodeFile,'rt') if barcodeFile.endswith('.gz') else open(barcodeFile) as f :
66
67
            for i, line in enumerate(f):
68
                parts = line.strip().split()
69
                if len(parts) == 1 and ' ' in line:
70
                    parts = line.strip().split(' ')
71
                if len(parts) == 1:
72
                    self.addBarcode(
73
                        barcodeFileAlias, barcode=parts[0], index=i+1)
74
                    #self.barcodes[barcodeFileAlias][parts[0]] = i
75
                    if not nospec: # only show this once:
76
                        logging.info(
77
                            f"\t{parts[0]}:{i} (No index specified in file)")
78
                        nospec=True
79
                elif len(parts) == 2:
80
                    if indexNotFirst:
81
                        barcode, index = parts
82
                    else:
83
                        index, barcode = parts
84
                    #self.barcodes[barcodeFileAlias][barcode] = index
85
86
                    # When the index is only digits, convert to integer
87
                    try:
88
                        if int(index)==int(str(int(index))):
89
                            index = int(index)
90
                        else:
91
                            pass
92
                    except Exception as e:
93
                        pass
94
                    self.addBarcode(
95
                        barcodeFileAlias, barcode=barcode, index=index)
96
                    if not nospec: # only show this once:
97
                        logging.info(
98
                            f"\t{barcode}:{index} (index was specified in file, {'index' if indexFirst else 'barcode'} on first column)")
99
                        nospec=True
100
                else:
101
                    e = f'The barcode file {barcodeFile} contains more than two columns. Failed to parse!'
102
                    logging.error(e)
103
                    raise ValueError(e)
104
        logging.info(f'done loading {i} barcodes')
105
106
107
    def __init__(
108
            self,
109
            barcodeDirectory='barcodes',
110
            hammingDistanceExpansion=0,
111
            spaceFill=False,
112
            lazyLoad=None # these aliases wiill not be loaded until requested, '*' matches all files
113
            ):
114
115
        self.hammingDistanceExpansion = hammingDistanceExpansion
116
117
        barcodeDirectory = os.path.join(
118
            os.path.dirname(
119
                os.path.realpath(__file__)),
120
            barcodeDirectory)
121
        barcode_files = list(glob.glob(f'{barcodeDirectory}/*'))
122
123
        self.spaceFill = spaceFill
124
        self.hammingDistanceExpansion = hammingDistanceExpansion
125
        self.barcodes = collections.defaultdict(
126
            dict)  # alias -> barcode -> index
127
        # alias -> barcode -> (index, hammingDistance)
128
        self.extendedBarcodes = collections.defaultdict(dict)
129
        self.pending_files = dict()
130
        for barcodeFile in barcode_files:
131
            barcodeFileAlias = self.path_to_barcode_alias(barcodeFile)
132
            if lazyLoad is not None and (barcodeFileAlias in lazyLoad or lazyLoad =='*'):
133
                logging.info(f"Lazy loading {barcodeFile}, alias {barcodeFileAlias}")
134
                self.pending_files[barcodeFileAlias] = barcodeFile
135
                continue
136
            logging.info(f"Parsing {barcodeFile}, alias {barcodeFileAlias}")
137
            self.parse_barcode_file(barcodeFile)
138
139
            if hammingDistanceExpansion > 0:
140
                self.expand(hammingDistanceExpansion, alias=barcodeFileAlias)
141
142
    def getTargetCount(self, barcodeFileAlias):
143
        return(len(self.barcodes[barcodeFileAlias]), len(self.extendedBarcodes[barcodeFileAlias]))
144
145
    def expand(
146
            self,
147
            hammingDistanceExpansion,
148
            alias,
149
            reportCollisions=True,
150
            spaceFill=None):
151
152
        barcodes = self.barcodes[alias]
153
        # hammingBarcode -> ( ( distance,origin) )
154
        hammingSpace = collections.defaultdict(list)
155
        for barcode in barcodes:
156
157
            for hammingDistance in range(0, hammingDistanceExpansion + 1):
158
                for hammingInstance in hamming_circle(
159
                        barcode, hammingDistance, 'ACTGN'):
160
                    hammingSpace[hammingInstance].append(
161
                        (hammingDistance, barcode))
162
        # Resolve all
163
        for hammingBarcode in hammingSpace:
164
165
            # Check if there is a closest origin:
166
            sortedDistances = sorted(hammingSpace[hammingBarcode])
167
            if len(sortedDistances) > 1 and (
168
                    sortedDistances[0][0] == sortedDistances[1][0]):
169
                # We cannot resolve this, two or more origins are at the same distance:
170
                #print('Cannot resolve %s' % hammingBarcode)
171
                continue
172
173
            hammingDistance, origin = sortedDistances[0]
174
175
            self.addBarcode(
176
                alias,
177
                barcode=hammingBarcode,
178
                index=self.barcodes[alias][origin],
179
                hammingDistance=hammingDistance,
180
                originBarcode=origin)
181
182
    def addBarcode(
183
            self,
184
            barcodeFileAlias,
185
            barcode,
186
            index,
187
            hammingDistance=0,
188
            originBarcode=None):
189
        if hammingDistance == 0:
190
            self.barcodes[barcodeFileAlias][barcode] = index
191
        else:
192
            if originBarcode is None:
193
                raise ValueError()
194
            self.extendedBarcodes[barcodeFileAlias][barcode] = (
195
                index, originBarcode, hammingDistance)
196
197
    # get index and hamming distance to barcode  returns none if not Available
198
    def getIndexCorrectedBarcodeAndHammingDistance(self, barcode, alias, try_lazy_load_pending=True):
199
200
        # Check if the alias still needs to be loaded:
201
202
203
        if barcode in self.barcodes[alias]:
204
            return (self.barcodes[alias][barcode], barcode, 0)
205
        if barcode in self.extendedBarcodes[alias]:
206
            return self.extendedBarcodes[alias][barcode]
207
208
        if alias in self.pending_files:
209
            if not try_lazy_load_pending:
210
                raise RecursionError()
211
            self.parse_pending_barcode_file_of_alias(alias)
212
            return self.getIndexCorrectedBarcodeAndHammingDistance(barcode, alias, try_lazy_load_pending=False)
213
214
        return (None, None, None)
215
216
    def list(self, showBarcodes=5):  # showBarcodes=None show all
217
        for barcodeAlias, mapping in self.barcodes.items():
218
            print(
219
                f'{len(mapping)} barcodes{Style.DIM} obtained from {Style.RESET_ALL}{barcodeAlias}')
220
            if len(mapping):
221
                for bcId in list(mapping.keys())[:showBarcodes]:
222
                    try:
223
                        print(('%s%s%s%s%s%s' % (Fore.GREEN, bcId,
224
                                                 Fore.WHITE, '→', mapping[bcId], Style.RESET_ALL)))
225
                    except Exception as e:
226
                        print(('%s%s%s%s%s%s' % (Fore.GREEN, bcId,
227
                                                 Fore.WHITE, '->', mapping[bcId], Style.RESET_ALL)))
228
                if showBarcodes is not None and len(mapping) > showBarcodes:
229
                    print(
230
                        f'{Style.DIM} %s more ...\n{Style.RESET_ALL}' %
231
                        (len(mapping) - showBarcodes))
232
233
    def getBarcodeMapping(self):
234
        return self.barcodes
235
236
    def __getitem__(self, alias):
237
        if alias in self.pending_files:
238
            self.parse_pending_barcode_file_of_alias(alias)
239
        return self.barcodes.get(alias)