Switch to unified view

a b/singlecellmultiomics/libraryDetection/sequencingLibraryListing.py
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
import glob
5
import os
6
import re
7
from colorama import Fore
8
from colorama import Back
9
from colorama import Style
10
11
12
def formatColor(string):
    """Expand colour placeholders in ``string`` into colorama escape codes.

    Recognised placeholders are ``[GREEN]``, ``[RED]``, ``[DIM]``,
    ``[RESET]``, ``[BRIGHT]`` and ``[NORMAL]``; each is substituted with the
    matching ``colorama`` ``Fore``/``Style`` attribute. Any other text is
    left untouched.

    Returns the string with all placeholders substituted.
    """
    # Substitutions are applied one after another, in this order.
    substitutions = (
        ("[GREEN]", Fore.GREEN),
        ("[RED]", Fore.RED),
        ("[DIM]", Style.DIM),
        ("[RESET]", Style.RESET_ALL),
        ("[BRIGHT]", Style.BRIGHT),
        ("[NORMAL]", Style.NORMAL),
    )
    for placeholder, escapeCode in substitutions:
        string = string.replace(placeholder, escapeCode)
    return string
21
22
23
def sprint(val, silent=False):
    """Print ``val`` unless ``silent`` is truthy."""
    if silent:
        return
    print(val)
26
27
class SequencingLibraryLister():
    """Group fastq files into libraries, lanes and mates using their file names.

    After :meth:`detect` has run, ``self.libraries`` holds a nested mapping::

        {library: {lane: {mate: [path, ...]}}}

    where ``mate`` is a label such as ``'R1'`` or ``'R2'`` derived from the
    file name.
    """

    def __init__(self, verbose=True):
        """Store ``verbose``; when true, :meth:`detect` reports what it finds."""
        self.verbose = verbose

    def libraryReplace(self, library, replace):
        """Apply substring replacements to a library name.

        Args:
            library: library name to rewrite.
            replace: ``None`` (no rewriting) or an iterable of
                ``'origin,replacement'`` strings. Each entry must contain
                exactly one comma, otherwise unpacking raises ``ValueError``.

        Returns:
            The library name with every rule applied in order.
        """
        if replace is None:
            return library

        for rule in replace:
            # Use distinct names: the rule list must not be rebound mid-loop.
            origin, replacement = rule.split(',')
            library = library.replace(origin, replacement)
        return library

    def _register(self, library, lane, mate, path):
        """Append ``path`` to ``self.libraries[library][lane][mate]``, creating levels as needed."""
        self.libraries.setdefault(library, {}).setdefault(
            lane, {}).setdefault(mate, []).append(path)

    def detect(self, filesToList, replace=None, slib=None, merge=None, se=False, ignore=False,  args=None, silent=False):
        """Classify fastq files into libraries, lanes and mates.

        Args:
            filesToList: iterable of fastq paths. If any path contains ``*``
                every path is expanded with ``glob.glob``.
            replace: optional iterable of ``'origin,replacement'`` rules
                applied to library names (see :meth:`libraryReplace`).
            slib: optional library name assigned to every file handled by the
                SRR and Illumina-style branches.
            merge: optional string ``'<delimiter>[<n>]'``; the library name is
                split on the delimiter and the first ``n`` parts (default 1)
                are kept.
            se: when true, lanes that do not have exactly two mates are
                accepted (single end data).
            ignore: when true, inconsistent lanes are removed from the result
                instead of terminating.
            args: optional object with attributes ``replace``, ``slib``,
                ``merge``, ``se`` and ``ignore``; when given these override
                the keyword arguments.
            silent: suppress all printed output.

        Returns:
            ``self.libraries``: ``{library: {lane: {mate: [paths]}}}``.

        Raises:
            SystemExit: when mate information is inconsistent and ``ignore``
                is false.
        """
        if args is not None:
            replace = args.replace
            slib = args.slib
            merge = args.merge
            se = args.se
            ignore = args.ignore

        fastqfiles = filesToList

        if replace and self.verbose:
            # Reporting is best effort: a malformed rule is printed here and
            # will raise later, when the rule is actually applied.
            try:
                sprint("Library name replacement:", silent)
                for rule in replace:
                    # Do NOT unpack into `replace`: that would clobber the
                    # rule list used for every file below.
                    origin, replacement = rule.split(',')
                    sprint(
                        formatColor(
                            "  -> [DIM]looking for[RESET] '%s' [DIM]replace with:[RESET]'%s'" %
                            (origin, replacement)), silent)
            except Exception as e:
                sprint(e, silent)

        self.libraries = {}
        mergeReport = False

        # Glob expansion:
        if any('*' in path for path in fastqfiles):
            fqfiles = []
            for path in fastqfiles:
                fqfiles.extend(glob.glob(path))
        else:
            fqfiles = fastqfiles

        for path in fqfiles:
            completefastqFileName = os.path.basename(path)
            fastqFileName = completefastqFileName.replace(
                '.fastq', '').replace('.gz', '').replace('.fq', '')

            # Base Clear format:
            # Organoid-VG-diff_32158_TTAGGCATTCTTTCCC_L001_R2_001_BHGFKLBCX2.filt.fastq.gz
            if fastqFileName.endswith('.filt'):
                fastqFileName = fastqFileName.rsplit('_', 1)[0]

            # Check if we are dealing with a raw illumina or SRR fastq file:
            if fastqFileName.endswith(('_R1', '_R2')):
                # <library>_R1 / <library>_R2
                library = self.libraryReplace(
                    fastqFileName.rsplit('_R', 1)[0], replace)
                r1ORr2 = fastqFileName.rsplit('_', 1)[-1]
                # 'single_file' is a "fake" lane
                self._register(library, 'single_file', r1ORr2, path)

            elif fastqFileName.endswith(('R1', 'R2')):
                # <library>R1 / <library>R2 (no underscore before the mate)
                library = self.libraryReplace(
                    fastqFileName.rsplit('R', 1)[0], replace)
                r1ORr2 = 'R' + fastqFileName[-1]
                self._register(library, 'single_file', r1ORr2, path)

            elif fastqFileName.startswith("SRR"):
                # NOTE(review): requires exactly one underscore in the name
                # (e.g. SRR123_1); other names raise ValueError here.
                library, r1ORr2 = fastqFileName.split('_')
                library = self.libraryReplace(library, replace)
                r1ORr2 = 'R%s' % r1ORr2  # The demultiplexer expects the format 'R1'
                if slib is not None:
                    lane = library
                    library = slib
                else:
                    lane = 'single_file'
                self._register(library, lane, r1ORr2, path)

            else:
                # Illumina style: <library>_L###_R[12]_###
                library = self.libraryReplace(
                    re.sub(
                        r'_L[0-9]{3}_R(1|2)_[0-9]{3}',
                        '',
                        fastqFileName),
                    replace)
                if slib is not None:
                    library = slib

                if merge:
                    delim = merge[0]
                    nThSplit = int(merge[1:]) if len(merge) > 1 else 1
                    # NOTE(review): the kept parts are joined without the
                    # delimiter ('a_b_c' with '_2' becomes 'ab').
                    newLibraryName = "".join(
                        library.split(delim)[:nThSplit])
                    if not mergeReport:
                        # Only the first merged library is reported.
                        if self.verbose:
                            sprint(
                                formatColor("Library merger: [DIM]Splitting on '[RESET]%s[DIM]', until part [RESET]%s[DIM], [RESET]%s[DIM] -> [RESET]%s") %
                                (delim, nThSplit, library, newLibraryName), silent)
                        mergeReport = True
                    library = newLibraryName

                lane = re.sub(r'_R(1|2)_[0-9]{3}', '', fastqFileName)
                # Obtaining that it is R1 or R2:
                r1ORr2 = re.sub(
                    r'_[0-9]{3}',
                    '',
                    fastqFileName.replace('%s_' % lane, ''))
                self._register(library, lane, r1ORr2, path)

        inconsistent = False
        ignoreFiles = []  # unique (library, lane) pairs to drop
        for idx, lib in enumerate(sorted(self.libraries)):
            if self.verbose:
                sprint(('%s%s%s %s' %
                        ('\n' if idx > 0 else '', lib, Style.DIM, Style.RESET_ALL)), silent)

            # Only affects the colour used when listing files below.
            inconsistentLane = False
            for lane in sorted(self.libraries[lib]):
                mates = self.libraries[lib][lane]
                if self.verbose:
                    sprint(("   %s%s%s" % (Style.DIM, lane, Style.RESET_ALL)), silent)
                if len(mates) != 2 and not se:
                    inconsistent = True
                    inconsistentLane = True
                    if ignore:
                        if (lib, lane) not in ignoreFiles:
                            ignoreFiles.append((lib, lane))
                        if self.verbose:
                            sprint(('%s    %s IGNORED FILE.. BOTH MATES NOT AVAILABLE or no mates? %s' % (
                                Fore.RED, lib, Style.RESET_ALL)), silent)
                    elif self.verbose:
                        sprint(('%s    %s BOTH MATES NOT AVAILABLE%s' %
                                (Fore.RED, lib, Style.RESET_ALL)), silent)

                prevSize = None
                for R1R2 in sorted(mates):
                    if prevSize is not None and prevSize != len(mates[R1R2]):
                        # Missing a mate file
                        inconsistent = True
                        if self.verbose:
                            sprint(("%s    %s %s%s" % (Fore.RED, R1R2, ', '.join(
                                mates[R1R2]), Style.RESET_ALL)), silent)
                        # The same lane may be flagged several times; record it once
                        # so it is deleted exactly once below.
                        if ignore and (lib, lane) not in ignoreFiles:
                            ignoreFiles.append((lib, lane))
                    else:
                        prevSize = len(mates[R1R2])
                        # Correct library
                        if self.verbose:
                            sprint(("%s    %s %s%s" % (Fore.RED if inconsistentLane else Fore.GREEN, R1R2, ', '.join(
                                mates[R1R2]), Style.RESET_ALL)), silent)

        if inconsistent:
            if ignore:
                if self.verbose:
                    sprint(
                        "Mate information missing for some files. --ignore was supplied, ignoring these files:", silent)
                for ignoredLib, ignoredLane in ignoreFiles:
                    if not silent:
                        print("%s %s" % (ignoredLib, ignoredLane))
                    del self.libraries[ignoredLib][ignoredLane]
                # Drop empty self.libraries:
                emptyLibs = [lib for lib, lanes in self.libraries.items()
                             if len(lanes) == 0]
                for lib in emptyLibs:
                    del self.libraries[lib]
            else:
                if self.verbose:
                    sprint(
                        (
                            '%sExitting, mate-information missing%s. Supply --se to allow single end reads or --ignore to ignore these files.' %
                            (Fore.RED, Style.RESET_ALL)), silent)
                # Same effect as the interactive exit() helper, without
                # depending on the site module.
                raise SystemExit()
        return self.libraries