Switch to unified view

a b/singlecellmultiomics/modularDemultiplexer/demux.py
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
import importlib.resources as resources
4
import logging
5
from singlecellmultiomics.modularDemultiplexer.baseDemultiplexMethods import NonMultiplexable
6
from colorama import init
7
from singlecellmultiomics.modularDemultiplexer.demultiplexingStrategyLoader import DemultiplexingStrategyLoader
8
import singlecellmultiomics.libraryDetection.sequencingLibraryListing as sequencingLibraryListing
9
import singlecellmultiomics.barcodeFileParser.barcodeFileParser as barcodeFileParser
10
from singlecellmultiomics.fastqProcessing.fastqHandle import FastqHandle
11
import argparse
12
from colorama import Style
13
from colorama import Fore
14
import sys
15
import os
16
from singlecellmultiomics.utils.submission import submit_job
17
18
19
if __name__ == '__main__':
20
21
    # Location of this script
22
    demuxer_location = os.path.dirname(os.path.realpath(__file__))
23
24
    init()
25
26
    argparser = argparse.ArgumentParser(
27
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
28
        description="""%sMulti-library Single cell-demultiplexing%s, written by Buys de Barbanson, 2016-2024
29
                                        %sExample usage:%s
30
31
                                        List how the demultiplexer will interpret the data: (No demultiplexing is executed)
32
                                        %sdemultiplex.py ./*.fastq.gz%s
33
                                        Demultiplex a set of fastq files into separate files per cell:
34
                                        %sdemultiplex.py ./*.fastq.gz --y%s
35
                                        Demultiplex on the cluster:
36
                                        %sdemultiplex.py ./*.fastq.gz -submit demux.sh; sh demux.sh %s
37
        """ %
38
        (Style.BRIGHT,
39
         Style.RESET_ALL,
40
         Style.BRIGHT,
41
         Style.RESET_ALL,
42
         Style.DIM,
43
         Style.RESET_ALL,
44
         Style.DIM,
45
         Style.RESET_ALL,
46
         Style.DIM,
47
         Style.RESET_ALL))
48
49
    argparser.add_argument(
50
        'fastqfiles',
51
        metavar='fastqfile',
52
        type=str,
53
        nargs='*',
54
        help='Input fastq files. Read names should be in illumina format or LIBRARY_R1.fastq.gz, LIBRARY_R2.fastq.gz, if one file is supplied and the name does not end on .fastq. or fastq.gz the file is interpreted as a list of files')
55
    argparser.add_argument(
56
        '--y',
57
        help="Perform the demultiplexing (Accept current parameters)",
58
        action='store_true')
59
    argparser.add_argument(
60
        '-experimentName',
61
        '-e',
62
        help="Name of the experiment, set to uf to use the folder name",
63
        type=str,
64
        default='uf')
65
66
    inputArgs = argparser.add_argument_group('Input', '')
67
    inputArgs.add_argument(
68
        '-n',
69
        help="Only get the first n properly demultiplexable reads from a library",
70
        type=int)
71
    inputArgs.add_argument(
72
        '-r',
73
        help="Only get the first n reads from a library, then demultiplex",
74
        type=int)
75
    inputArgs.add_argument(
76
        '-slib',
77
        help="Assume all files belong to the same library, this flag supplies the name",
78
        type=str)
79
    inputArgs.add_argument(
80
        '-replace',
81
        action='append',
82
        default=None,
83
        help="""Replace part of the library name by another string, [SEARCH,REPLACEMENT]. For example if you want to remove FOO from the library name use "-replace FOO," if you want to replace FOO by BAR use "-replace FOO,BAR" """)
84
85
    inputArgs.add_argument(
86
        '-merge',
87
        default="_",
88
        help="""Merge libraries through multiple runs by selecting a merging method.
89
    Options:None, delimiter[position], examples, given the samplename 'library_a_b': -merge _ yields 'library', -merge _2 yields 'library_a'as identifier.""")
90
    inputArgs.add_argument(
91
        '--ignore',
92
        action='store_true',
93
        help="Ignore non-demultiplexable read files")
94
    #inputArgs.add_argument('-bfsp', help="Barcode file searchpaths", type=str, default='/media/sf_data/references,/hpc/hub_oudenaarden/bdebarbanson/ref')
95
    outputArgs = argparser.add_argument_group('Output', '')
96
    argparser.add_argument(
97
        '--norejects',
98
        help="Do not store rejected reads",
99
        action='store_true')
100
101
    outputArgs.add_argument(
102
        '-o',
103
        help="Output (cell) file directory, when not supplied the current directory/raw_demultiplexed is used",
104
        type=str,
105
        default='./raw_demultiplexed')
106
    outputArgs.add_argument(
107
        '--scsepf',
108
        help="Every cell gets a separate FQ file",
109
        action='store_true')
110
111
112
    bcArgs = argparser.add_argument_group('Barcode', '')
113
    bcArgs.add_argument(
114
        '-hd',
115
        help="Hamming distance barcode expansion; accept cells with barcodes N distances away from the provided barcodes. Collisions are dealt with automatically. ",
116
        type=int,
117
        default=0)
118
    bcArgs.add_argument(
119
        '--lbi',
120
        help="List barcodes being used for cell demultiplexing",
121
        action='store_true')
122
    bcArgs.add_argument(
123
        '-barcodeDir',
124
        default=str( resources.files('singlecellmultiomics') / 'modularDemultiplexer/barcodes/' ),
125
        help="Directory from which to obtain the barcodes, when nothing is supplied the package resources are used")
126
    
127
    indexArgs = argparser.add_argument_group('Sequencing indices', '')
128
    indexArgs.add_argument(
129
        '--li',
130
        help="List sequencing indices.",
131
        action='store_true')
132
    indexArgs.add_argument(
133
        '-indexDir',
134
        default=str( resources.files('singlecellmultiomics') / 'modularDemultiplexer/indices/' ),
135
        help="Directory from which to obtain the sequencing indices,  when nothing is supplied the package resources are used")
136
    indexArgs.add_argument(
137
        '-si', help="Select only these sequencing indices -si CGATGT,TTAGGC")
138
    indexArgs.add_argument(
139
        '-ifa',
140
        help="Index file alias, select from the list supplied when running --li",
141
        default='illumina_merged_ThruPlex48S_RP',
142
        type=str)
143
    indexArgs.add_argument(
144
        '-hdi',
145
        help="Hamming distance INDEX sequence expansion, the hamming distance used for resolving the sequencing INDEX. For cell barcode hamming distance see -hd",
146
        type=int,
147
        default=1)
148
149
    fragArgs = argparser.add_argument_group('Fragment configuration', '')
150
    #fragArgs.add_argument('--rc1', help="First mate is reverse complemented", action='store_true')
151
    #fragArgs.add_argument('--rc2', help="Second mate is reverse complemented", action='store_true')
152
    fragArgs.add_argument(
153
        '--se',
154
        help="Allow single end reads",
155
        action='store_true')
156
157
    techArgs = argparser.add_argument_group('Technical', '')
158
    techArgs.add_argument(
159
        '-g',
160
        help="group_id, don't use this yourself",
161
        type=int,
162
        default=None)
163
    techArgs.add_argument(
164
        '-fh',
165
        help="When demultiplexing to multiple cell files in multiple threads, the amount of opened files can exceed the limit imposed by your operating system. The amount of open handles per thread is kept below this parameter to prevent this from happening.",
166
        default=500,
167
        type=int)
168
    techArgs.add_argument(
169
        '-dsize',
170
        help="Amount of reads used to determine barcode type",
171
        type=int,
172
        default=2000)
173
    techArgs.add_argument(
174
        '--nochunk',
175
        help="Do not run lanes in separate jobs",
176
        action='store_true')
177
178
    argparser.add_argument(
179
        '-use',
180
        default=None,
181
        help='use these demultiplexing strategies, comma separate to select multiple. For example for cellseq 2 data with 6 basepair umi: -use CS2C8U6 , for combined mspji and Celseq2: MSPJIC8U3,CS2C8U6 if nothing is specified, the best scoring method is selected')
182
183
    #argparser.add_argument(
184
#        '-ignoreMethods',
185
        #help='Do not try to load these methods',
186
        #default="scarsMiSeq")
187
188
    argparser.add_argument(
189
        '-only_detect_methods',
190
        help='Only auto detect these methods, use a comma as a separator',
191
        default=None)
192
193
    argparser.add_argument(
194
        '-maxAutoDetectMethods',
195
        '-mxa',
196
        help='When --use is not specified, how many methods can the demultiplexer choose at the same time? This loosely corresponds to the amount of measurements you made in a single cell',
197
        default=1,
198
        type=int)
199
    argparser.add_argument(
200
        '-minAutoDetectPct',
201
        '-mia',
202
        help='When --use is not specified, what is the lowest percentage yield required to select a demultplexing strategy',
203
        default=2,
204
        type=float)
205
206
207
    cluster = argparser.add_argument_group('cluster execution')
208
    cluster.add_argument(
209
        '--cluster',
210
        action='store_true',
211
        help='split by chromosomes and submit the job on cluster')
212
213
    cluster.add_argument(
214
        '-mem',
215
        default=40,
216
        type=int,
217
        help='Memory requested per job')
218
    cluster.add_argument(
219
        '-time',
220
        default=52,
221
        type=int,
222
        help='Time requested per job')
223
224
    cluster.add_argument(
225
        '-sched',
226
        default=None,
227
        type=str,
228
        help='Scheduler to use: sge, slurm')
229
230
    args = argparser.parse_args()
231
    verbosity = 1
232
233
    if args.y and args.sched is not None:
234
        raise ValueError('Use --y or -sched [scheduler], never both')
235
236
    #ignoreMethods = args.ignoreMethods.split(',')
237
    only_detect_methods = args.only_detect_methods.split(',') if args.only_detect_methods is not None else None
238
    if any(('*' in fq_file for fq_file in args.fastqfiles)):
239
        raise ValueError(
240
            "One or more of the fastq file paths contain a '*', these files cannot be interpreted, review your input files")
241
242
    if len(set(args.fastqfiles)) != len(args.fastqfiles):
243
        print(f'{Fore.RED}{Style.BRIGHT}Some fastq files are supplied multiple times! Pruning those!{Style.RESET_ALL}')
244
        args.fastqfiles = set(args.fastqfiles)
245
246
    if len(args.fastqfiles) == 1:
247
        if not args.fastqfiles[0].endswith('.gz') and not args.fastqfiles[0].endswith(
248
                '.fastq') and not args.fastqfiles[0].endswith('.fq'):
249
            # File list:
250
            print('Input is interpreted as a list of files..')
251
            with open(args.fastqfiles[0]) as f:
252
                fqFiles = []
253
                for line in f:
254
                    fqFiles.append(line.strip())
255
            args.fastqfiles = fqFiles
256
257
    # Sort the fastq files..
258
    args.fastqfiles = sorted(args.fastqfiles)
259
260
    # Load barcodes
261
    barcodeParser = barcodeFileParser.BarcodeParser(
262
        hammingDistanceExpansion=args.hd,
263
        barcodeDirectory=args.barcodeDir,
264
        lazyLoad=("10x_3M-february-2018",)
265
        )
266
267
    # Setup the index parser
268
    indexFileAlias = args.ifa  # let the multiplex methods decide which index file to use
269
270
    if args.si:  # the user defines the sequencing indices
271
        useSequencingIndices = args.si.split(',')
272
        print(
273
            f"{Style.BRIGHT} Only these sequencing indices will be kept: {Style.RESET_ALL}")
274
        for sequencingIndex in useSequencingIndices:
275
            print(f"{Fore.GREEN}{sequencingIndex}{Style.RESET_ALL}")
276
277
        indexParser = barcodeFileParser.BarcodeParser()
278
        indexFileAlias = 'user'
279
        for index, sequencingIndex in enumerate(useSequencingIndices):
280
            indexParser.addBarcode(
281
                index=str(index),
282
                barcodeFileAlias='user',
283
                barcode=sequencingIndex,
284
                hammingDistance=0,
285
                originBarcode=None)
286
        # Perform expansion:
287
        indexParser.expand(args.hdi, alias='user')
288
289
    else:  # the sequencing indices are automatically detected
290
        indexParser = barcodeFileParser.BarcodeParser(
291
            hammingDistanceExpansion=args.hdi, barcodeDirectory=args.indexDir)
292
293
    if args.lbi:
294
        barcodeParser.list(showBarcodes=None)
295
    if args.li:
296
        indexParser.list(showBarcodes=None)
297
298
    # Load the demultiplexing strategies
299
    dmx = DemultiplexingStrategyLoader(
300
        barcodeParser=barcodeParser,
301
        indexParser=indexParser,
302
        #ignoreMethods=ignoreMethods,
303
        only_detect_methods = only_detect_methods,
304
        indexFileAlias=indexFileAlias)
305
    dmx.list()
306
307
    if len(args.fastqfiles) == 0:
308
        print(f'{Fore.RED}No files supplied, exitting.{Style.RESET_ALL}')
309
        exit()
310
311
    print(f"\n{Style.BRIGHT}Detected libraries:{Style.RESET_ALL}")
312
    libraries = sequencingLibraryListing.SequencingLibraryLister().detect(
313
        args.fastqfiles, args=args)
314
315
    # Detect the libraries:
316
    if args.use is None:
317
        if len(libraries) == 0:
318
            raise ValueError('No libraries found')
319
320
        print(
321
            f"\n{Style.BRIGHT}Demultiplexing method Autodetect results{Style.RESET_ALL}")
322
        # Run autodetect
323
        processedReadPairs, strategyYieldsForAllLibraries = dmx.detectLibYields(
324
            libraries, testReads=args.dsize, maxAutoDetectMethods=args.maxAutoDetectMethods, minAutoDetectPct=args.minAutoDetectPct, verbose=True)
325
326
    print(f"\n{Style.BRIGHT}Demultiplexing:{Style.RESET_ALL}")
327
328
    final_jobs = []
329
    for library in libraries:
330
        if args.use is None:
331
            processedReadPairs = strategyYieldsForAllLibraries[library]['processedReadPairs']
332
            strategyYieldForLibrary = strategyYieldsForAllLibraries[library]['strategyYields']
333
            selectedStrategies = dmx.selectedStrategiesBasedOnYield(
334
                processedReadPairs,
335
                strategyYieldForLibrary,
336
                maxAutoDetectMethods=args.maxAutoDetectMethods,
337
                minAutoDetectPct=args.minAutoDetectPct)
338
            selectedStrategies = dmx.getSelectedStrategiesFromStringList(
339
                selectedStrategies, verbose = False)
340
        else:
341
            selectedStrategies = dmx.getSelectedStrategiesFromStringList(
342
                args.use.split(','), verbose = False)
343
344
        print(f'Library {library} will be demultiplexed using:')
345
        for stra in selectedStrategies:
346
            print(f'\t{Fore.GREEN}{str(stra)}{Style.RESET_ALL}')
347
        if len(selectedStrategies) == 0:
348
            print(
349
                f'{Fore.RED}NONE! The library will not be demultiplexed! The used barcodes could not be detected automatically. Please supply the desired method using the -use flag or increase the -dsize parameter to use more reads for detecting the library type.{Style.RESET_ALL}')
350
351
        if not args.y and args.sched is None:
352
            print(f"\n{Style.BRIGHT}--y not supplied, supply --y or add -sched slurm to the command to run demultiplexing on the cluster{Style.RESET_ALL}")
353
354
        if args.sched is not None:
355
            cluster_file_folder = os.path.abspath(os.path.dirname(
356
                    args.o)) + '/cluster'
357
358
            arguments = " ".join(
359
                [x for x in sys.argv if x != '--dry' and  '-sched' !=x and 'slurm'!=x and 'sge'!=x and '--y' not in x and '-submit' not in x and '.fastq' not in x and '.fq' not in x]) + " --y"
360
361
            submit_in_chunks = (not args.scsepf and not args.nochunk)
362
            submitted_jobs = []
363
            filesForLib = []
364
            group_id = 0
365
366
367
            print("Submitting jobs ...")
368
            for lane in libraries[library]:
369
                files_to_submit = []
370
                for R1R2 in libraries[library][lane]:
371
372
                    for p in libraries[library][lane][R1R2]:
373
                        filesForLib.append(p)
374
                        files_to_submit.append(p)
375
376
                if submit_in_chunks:
377
                    job_name = f'DMX_{library}_{group_id}'
378
379
                    job_id = submit_job(f'{arguments} -g {group_id} -use {",".join([x.shortName for x in selectedStrategies])} {" ".join(files_to_submit)};',
380
                                        job_name=job_name,
381
                                        target_directory=cluster_file_folder,
382
                                        working_directory=None,
383
                                        threads_n=1,
384
                                        memory_gb=args.mem,
385
                                        time_h=args.time,
386
                                        scheduler=args.sched,
387
                                        copy_env=True,
388
                                        email=None,
389
                                        mail_when_finished=False,
390
                                        hold=None,
391
                                        silent=True,
392
                                        submit=True)
393
                    submitted_jobs.append(job_id)
394
                group_id += 1
395
396
397
            if not submit_in_chunks:
398
                job_name = f'DMX_{library}'
399
400
                job_id = submit_job(f'{arguments} -g {group_id} -use {",".join([x.shortName for x in selectedStrategies])} {" ".join(filesForLib)};', job_name=job_name, target_directory=cluster_file_folder,  working_directory=None,
401
                           threads_n=1, memory_gb=args.mem, time_h=args.time, scheduler=args.sched, copy_env=True,
402
                           email=None, mail_when_finished=False, hold=None,submit=True, silent=True)
403
404
                final_jobs.append(job_id)
405
406
            else:
407
                # we need a job which glues everything back together
408
409
                cmds = [
410
                    f'cat {args.o}/{library}/*_TEMP_demultiplexedR1.fastq.gz  > {args.o}/{library}/demultiplexedR1.fastq.gz && rm {args.o}/{library}/*_TEMP_demultiplexedR1.fastq.gz',
411
                    f'cat {args.o}/{library}/*_TEMP_demultiplexedR2.fastq.gz  > {args.o}/{library}/demultiplexedR2.fastq.gz && rm {args.o}/{library}/*_TEMP_demultiplexedR2.fastq.gz',
412
                    f'cat {args.o}/{library}/*_TEMP_demultiplexing.log  > {args.o}/{library}/demultiplexing.log && rm {args.o}/{library}/*_TEMP_demultiplexing.log']
413
                if not args.norejects:
414
                    cmds += [
415
                        f'cat {args.o}/{library}/*_TEMP_rejectsR1.fastq.gz  > {args.o}/{library}/rejectsR1.fastq.gz && rm {args.o}/{library}/*_TEMP_rejectsR1.fastq.gz',
416
                        f'cat {args.o}/{library}/*_TEMP_rejectsR2.fastq.gz  > {args.o}/{library}/rejectsR2.fastq.gz && rm {args.o}/{library}/*_TEMP_rejectsR2.fastq.gz'
417
                    ]
418
419
                for i, cmd in enumerate(cmds):
420
                    job_name = f'glue_{library}_{i}'
421
422
423
                    job_id = submit_job(cmd, job_name=job_name, target_directory=cluster_file_folder,  working_directory=None,
424
                               threads_n=1, memory_gb=2, time_h=4, scheduler=args.sched, copy_env=True,
425
                               email=None, mail_when_finished=False, hold=submitted_jobs,submit=True)
426
427
428
                    final_jobs.append(job_id)
429
430
        if args.y:
431
432
            targetDir = f'{args.o}/{library}'
433
            if not os.path.exists(targetDir):
434
                try:
435
                    os.makedirs(targetDir)
436
                except FileExistsError:
437
                    continue
438
439
            prefix = '' if args.g is None else f'{args.g}_TEMP_'
440
441
            # Check whether the output contains paired or single end reads
442
            paired_end = False
443
            for lane, readPairs in libraries[library].items():
444
                
445
                if len(readPairs)==2:
446
                    paired_end=True
447
                    print(f'\t- {Style.DIM}{lane}{Style.NORMAL} paired-end' )    
448
                else:
449
                    assert not paired_end, "Cannot demultiplex a mixture of single and paired end reads"
450
                    print(f'\t- {Style.DIM}{lane}{Style.NORMAL} single-end' )    
451
            if paired_end:
452
                print(f'\tAll data is {Style.BRIGHT}paired{Style.NORMAL} end')
453
            else:
454
                print(f'\tAll data is {Style.BRIGHT}single{Style.NORMAL} end')
455
            handle = FastqHandle(
456
                f'{args.o}/{library}/{prefix}demultiplexed',
457
                paired_end,
458
                single_cell=args.scsepf,
459
                maxHandles=args.fh)
460
            if args.norejects:
461
                rejectHandle = None
462
            else:
463
                rejectHandle = FastqHandle(
464
                    f'{args.o}/{library}/{prefix}rejects', paired_end)
465
            """Set up statistic file"""
466
467
            log_location = os.path.abspath(
468
                f'{args.o}/{library}/{prefix}demultiplexing.log')
469
            log_handle = open(log_location, 'w')
470
            log_handle.write(" ".join(sys.argv) + '\n')
471
            log_handle.write(
472
                f'Demultiplexing operation started, writing to {args.o}/{library}\n')
473
474
            processedReadPairsForThisLib = 0
475
            for lane, readPairs in libraries[library].items():
476
                if args.n and processedReadPairsForThisLib >= args.n:
477
                    break
478
                for readPair in readPairs:
479
                    pass
480
481
                for readPairIdx, _ in enumerate(readPairs[readPair]):
482
                    files = [readPairs[readPair][readPairIdx]
483
                             for readPair in readPairs]
484
                    log_handle.write(
485
                        f"processing input files:\t{','.join(files)}\n")
486
                    processedReadPairs, strategyYields = dmx.demultiplex(files,
487
                                                                         strategies=selectedStrategies,
488
                                                                         targetFile=handle,
489
                                                                         rejectHandle=rejectHandle,
490
                                                                         log_handle=log_handle,
491
                                                                         library=library,
492
                                                                         maxReadPairs=None if args.n is None else (args.n - processedReadPairsForThisLib))
493
                    processedReadPairsForThisLib += processedReadPairs
494
                    log_handle.write(
495
                        f"done, processed:\t{processedReadPairsForThisLib} reads\n")
496
497
                    if args.n and processedReadPairsForThisLib >= args.n:
498
                        break
499
            handle.close()
500
            if not args.norejects:
501
                rejectHandle.close()
502
            log_handle.write(f'Demultiplexing finished\n')
503
            log_handle.close()
504
505
    if args.sched is not None:
506
        print('Final job ids:', ','.join(final_jobs))