Diff of /qiita_ware/ebi.py [000000] .. [879b32]

Switch to unified view

a b/qiita_ware/ebi.py
1
# -----------------------------------------------------------------------------
2
# Copyright (c) 2014--, The Qiita Development Team.
3
#
4
# Distributed under the terms of the BSD 3-clause License.
5
#
6
# The full license is in the file LICENSE, distributed with this software.
7
# -----------------------------------------------------------------------------
8
9
import hashlib
10
from os.path import basename, join, isdir, isfile, exists
11
from shutil import copyfile, rmtree
12
from os import remove, listdir, makedirs
13
from datetime import date, timedelta
14
from urllib.parse import quote
15
from itertools import zip_longest
16
from xml.etree import ElementTree as ET
17
from xml.etree.ElementTree import ParseError
18
from xml.sax.saxutils import escape
19
from gzip import GzipFile
20
from functools import partial
21
from h5py import File
22
from qiita_files.demux import to_per_sample_ascii
23
24
from qiita_core.qiita_settings import qiita_config
25
from qiita_ware.exceptions import EBISubmissionError
26
from qiita_db.util import create_nested_path
27
from qiita_db.logger import LogEntry
28
from qiita_db.ontology import Ontology
29
from qiita_db.util import convert_to_id, get_mountpoint, open_file
30
from qiita_db.artifact import Artifact
31
from qiita_db.metadata_template.constants import (
32
    TARGET_GENE_DATA_TYPES, PREP_TEMPLATE_COLUMNS_TARGET_GENE)
33
from qiita_db.processing_job import _system_call as system_call
34
35
36
def clean_whitespace(text):
37
    """Standardizes whitespaces so there is only one space separating tokens
38
39
    Parameters
40
    ----------
41
    text : str
42
        The fixed text
43
44
    Returns
45
    -------
46
    str
47
        fixed text
48
    """
49
    return ' '.join(str(text).split())
50
51
52
class EBISubmission(object):
53
    """Define an EBI submission, generate submission files and submit
54
55
    Submit an artifact to EBI
56
57
    The steps for EBI submission are:
58
    1. Validate that we have all required info to submit
59
    2. Generate per sample demultiplexed files
60
    3. Generate XML files for submission
61
    4. Submit sequences files
62
    5. Submit XML files. The answer has the EBI submission numbers.
63
64
    Parameters
65
    ----------
66
    artifact_id : int
67
        The artifact id to submit
68
    action : str
69
        The action to perform. Valid options see
70
        EBISubmission.valid_ebi_actions
71
72
    Raises
73
    ------
74
    EBISubmissionError
75
        - If the action is not in EBISubmission.valid_ebi_actions
76
        - If the artifact cannot be submitted to EBI
77
        - If the artifact has been already submitted to EBI and the action
78
        is different from 'MODIFY'
79
        - If the status of the study attached to the artifact is `submitting`
80
        - If the prep template investigation type is not in the
81
        ena_ontology.terms or not in the ena_ontology.user_defined_terms
82
        - If the submission is missing required EBI fields either in the sample
83
        or prep template
84
        - If the sample preparation metadata doesn't have a platform field or
85
        it isn't a EBISubmission.valid_platforms
86
    """
87
    FWD_READ_SUFFIX = '.R1.fastq.gz'
88
    REV_READ_SUFFIX = '.R2.fastq.gz'
89
90
    valid_ebi_actions = ('ADD', 'VALIDATE', 'MODIFY')
91
    valid_ebi_submission_states = ('submitting')
92
    # valid_platforms dict of 'platform': ['valid_instrument_models']
93
    valid_platforms = {'LS454': ['454 GS', '454 GS 20', '454 GS FLX',
94
                                 '454 GS FLX+', '454 GS FLX TITANIUM',
95
                                 '454 GS JUNIOR', 'UNSPECIFIED'],
96
                       'ION_TORRENT': ['ION TORRENT PGM', 'ION TORRENT PROTON',
97
                                       'ION TORRENT S5', 'ION TORRENT S5 XL'],
98
                       'ILLUMINA': ['HISEQ X FIVE',
99
                                    'HISEQ X TEN',
100
                                    'ILLUMINA GENOME ANALYZER',
101
                                    'ILLUMINA GENOME ANALYZER II',
102
                                    'ILLUMINA GENOME ANALYZER IIX',
103
                                    'ILLUMINA HISCANSQ',
104
                                    'ILLUMINA HISEQ 1000',
105
                                    'ILLUMINA HISEQ 1500',
106
                                    'ILLUMINA HISEQ 2000',
107
                                    'ILLUMINA HISEQ 2500',
108
                                    'ILLUMINA HISEQ 3000',
109
                                    'ILLUMINA HISEQ 4000',
110
                                    'ILLUMINA MISEQ',
111
                                    'ILLUMINA MINISEQ',
112
                                    'ILLUMINA NOVASEQ 6000',
113
                                    'ILLUMINA NOVASEQ X',
114
                                    'NEXTSEQ 500',
115
                                    'NEXTSEQ 550',
116
                                    'UNSPECIFIED'],
117
                       'OXFORD_NANOPORE': ['GRIDION'],
118
                       'PACBIO_SMRT': ['PACBIO RS',
119
                                       'PACBIO RS II',
120
                                       'SEQUEL',
121
                                       'ONSO',
122
                                       'REVIO',
123
                                       'SEQUEL IIE',
124
                                       'SEQUEL II']}
125
126
    xmlns_xsi = "http://www.w3.org/2001/XMLSchema-instance"
127
    xsi_noNSL = "ftp://ftp.sra.ebi.ac.uk/meta/xsd/sra_1_3/SRA.%s.xsd"
128
129
    def __init__(self, artifact_id, action):
130
        error_msgs = []
131
132
        if action not in self.valid_ebi_actions:
133
            error_msg = ("%s is not a valid EBI submission action, valid "
134
                         "actions are: %s" %
135
                         (action, ', '.join(self.valid_ebi_actions)))
136
            LogEntry.create('Runtime', error_msg)
137
            raise EBISubmissionError(error_msg)
138
139
        ena_ontology = Ontology(convert_to_id('ENA', 'ontology'))
140
        self.action = action
141
        self.artifact = Artifact(artifact_id)
142
        if not self.artifact.can_be_submitted_to_ebi:
143
            error_msg = ("Artifact %d cannot be submitted to EBI"
144
                         % self.artifact.id)
145
            LogEntry.create('Runtime', error_msg)
146
            raise EBISubmissionError(error_msg)
147
148
        self.study = self.artifact.study
149
        self.sample_template = self.study.sample_template
150
        # If we reach this point, there should be only one prep template
151
        # attached to the artifact. By design, each artifact has at least one
152
        # prep template. Artifacts with more than one prep template cannot be
153
        # submitted to EBI, so the attribute 'can_be_submitted_to_ebi' should
154
        # be set to false, which is checked in the previous if statement
155
        self.prep_template = self.artifact.prep_templates[0]
156
157
        if self.artifact.is_submitted_to_ebi and action != 'MODIFY':
158
            error_msg = ("Cannot resubmit! Artifact %d has already "
159
                         "been submitted to EBI." % artifact_id)
160
            LogEntry.create('Runtime', error_msg)
161
            raise EBISubmissionError(error_msg)
162
163
        self.artifact_id = artifact_id
164
        self.study_title = self.study.title
165
        self.study_abstract = self.study.info['study_abstract']
166
167
        it = self.prep_template.investigation_type
168
        if it in ena_ontology.terms:
169
            self.investigation_type = it
170
            self.new_investigation_type = None
171
        elif it in ena_ontology.user_defined_terms:
172
            self.investigation_type = 'Other'
173
            self.new_investigation_type = it
174
        else:
175
            # This should never happen
176
            error_msgs.append("Unrecognized investigation type: '%s'. This "
177
                              "term is neither one of the official terms nor "
178
                              "one of the user-defined terms in the ENA "
179
                              "ontology." % it)
180
        _, base_fp = get_mountpoint("preprocessed_data")[0]
181
        self.ebi_dir = '%d_ebi_submission' % artifact_id
182
        self.full_ebi_dir = join(base_fp, self.ebi_dir)
183
        self.ascp_reply = join(self.full_ebi_dir, 'ascp_reply.txt')
184
        self.curl_reply = join(self.full_ebi_dir, 'curl_reply.xml')
185
        self.xml_dir = join(self.full_ebi_dir, 'xml_dir')
186
        self.study_xml_fp = None
187
        self.sample_xml_fp = None
188
        self.experiment_xml_fp = None
189
        self.run_xml_fp = None
190
        self.submission_xml_fp = None
191
        self.per_sample_FASTQ_reverse = False
192
        self.publications = self.study.publications
193
194
        # getting the restrictions
195
        st_restrictions = [self.sample_template.columns_restrictions['EBI']]
196
        pt_restrictions = [self.prep_template.columns_restrictions['EBI']]
197
        if self.artifact.data_type in TARGET_GENE_DATA_TYPES:
198
            # adding restrictions on primer and barcode as these are
199
            # conditionally requiered for target gene
200
            pt_restrictions.append(
201
                PREP_TEMPLATE_COLUMNS_TARGET_GENE['demultiplex'])
202
        st_missing = self.sample_template.check_restrictions(st_restrictions)
203
        pt_missing = self.prep_template.check_restrictions(pt_restrictions)
204
        # testing if there are any missing columns
205
        if st_missing:
206
            error_msgs.append("Missing column in the sample template: %s" %
207
                              ', '.join(list(st_missing)))
208
        if pt_missing:
209
            error_msgs.append("Missing column in the prep template: %s" %
210
                              ', '.join(list(pt_missing)))
211
212
        # generating all samples from sample template
213
        self.samples = {}
214
        self.samples_prep = {}
215
        self.sample_demux_fps = {}
216
        get_output_fp = partial(join, self.full_ebi_dir)
217
        nvp = []
218
        nvim = []
219
        for k, sample_prep in self.prep_template.items():
220
            # validating required fields
221
            if ('platform' not in sample_prep or
222
                    sample_prep['platform'] is None):
223
                nvp.append(k)
224
            else:
225
                platform = sample_prep['platform'].upper()
226
                if platform not in self.valid_platforms:
227
                    nvp.append(k)
228
                else:
229
                    if ('instrument_model' not in sample_prep or
230
                            sample_prep['instrument_model'] is None):
231
                        nvim.append(k)
232
                    else:
233
                        im = sample_prep['instrument_model'].upper()
234
                        if im not in self.valid_platforms[platform]:
235
                            nvim.append(k)
236
237
            # IMPORTANT: note that we are generating the samples we are going
238
            # to be using during submission and they come from the sample info
239
            # file, however, we are only retrieving the samples that exist in
240
            # the prep AKA not all samples
241
            self.samples[k] = self.sample_template.get(sample_prep.id)
242
            self.samples_prep[k] = sample_prep
243
            self.sample_demux_fps[k] = get_output_fp(k)
244
245
        if nvp:
246
            error_msgs.append("These samples do not have a valid platform "
247
                              "(instrumet model wasn't checked): %s" % (
248
                                  ', '.join(nvp)))
249
        if nvim:
250
            error_msgs.append("These samples do not have a valid instrument "
251
                              "model: %s" % (', '.join(nvim)))
252
        if error_msgs:
253
            error_msgs = ("Errors found during EBI submission for study #%d, "
254
                          "artifact #%d and prep template #%d:\n%s"
255
                          % (self.study.id, artifact_id,
256
                             self.prep_template.id, '\n'.join(error_msgs)))
257
            LogEntry.create('Runtime', error_msgs)
258
            raise EBISubmissionError(error_msgs)
259
260
        self._sample_aliases = {}
261
        self._experiment_aliases = {}
262
        self._run_aliases = {}
263
264
        self._ebi_sample_accessions = \
265
            self.sample_template.ebi_sample_accessions
266
        self._ebi_experiment_accessions = \
267
            self.prep_template.ebi_experiment_accessions
268
269
    def _get_study_alias(self):
270
        """Format alias using ``self.study_id``"""
271
        study_alias_format = '%s_sid_%s'
272
        return study_alias_format % (
273
            qiita_config.ebi_organization_prefix,
274
            escape(clean_whitespace(str(self.study.id))))
275
276
    def _get_sample_alias(self, sample_name):
277
        """Format alias using ``self.study_id``, `sample_name`"""
278
        alias = "%s:%s" % (self._get_study_alias(),
279
                           escape(clean_whitespace(str(sample_name))))
280
        self._sample_aliases[alias] = sample_name
281
        return alias
282
283
    def _get_experiment_alias(self, sample_name):
284
        """Format alias using ``self.prep_template.id``, and `sample_name`
285
286
        Currently, this is identical to _get_sample_alias above, since we are
287
        only going to allow submission of one prep for each sample
288
        """
289
        exp_alias_format = '%s_ptid_%s:%s'
290
        alias = exp_alias_format % (
291
            qiita_config.ebi_organization_prefix,
292
            escape(clean_whitespace(str(self.prep_template.id))),
293
            escape(clean_whitespace(str(sample_name))))
294
        self._experiment_aliases[alias] = sample_name
295
        return alias
296
297
    def _get_submission_alias(self):
298
        """Format alias using ``self.artifact_id``"""
299
        safe_artifact_id = escape(
300
            clean_whitespace(str(self.artifact_id)))
301
        submission_alias_format = '%s_submission_%s'
302
        return submission_alias_format % (qiita_config.ebi_organization_prefix,
303
                                          safe_artifact_id)
304
305
    def _get_run_alias(self, sample_name):
306
        """Format alias using `sample_name`
307
        """
308
        alias = '%s_ppdid_%s:%s' % (
309
            qiita_config.ebi_organization_prefix,
310
            escape(clean_whitespace(str(self.artifact_id))),
311
            sample_name)
312
        self._run_aliases[alias] = sample_name
313
        return alias
314
315
    def _get_library_name(self, sample_name):
316
        """Format alias using `sample_name`
317
        """
318
        return escape(clean_whitespace(sample_name))
319
320
    def _add_dict_as_tags_and_values(self, parent_node, attribute_element_name,
321
                                     data_dict):
322
        """Format key/value data using a common EBI XML motif"""
323
        for attr, val in sorted(data_dict.items()):
324
            if val is None:
325
                val = "Unknown"
326
            attribute_element = ET.SubElement(parent_node,
327
                                              attribute_element_name)
328
            tag = ET.SubElement(attribute_element, 'TAG')
329
            tag.text = clean_whitespace(attr)
330
            value = ET.SubElement(attribute_element, 'VALUE')
331
            value.text = clean_whitespace(val)
332
333
    def _get_publication_element(self, study_links, pmid, db_name):
334
        study_link = ET.SubElement(study_links, 'STUDY_LINK')
335
        xref_link = ET.SubElement(study_link,  'XREF_LINK')
336
337
        db = ET.SubElement(xref_link, 'DB')
338
        db.text = db_name
339
340
        _id = ET.SubElement(xref_link, 'ID')
341
        _id.text = str(pmid)
342
343
    def generate_study_xml(self):
344
        """Generates the string for study XML file
345
346
        Returns
347
        -------
348
        ET.Element
349
            Object with study XML values
350
        """
351
        study_set = ET.Element('STUDY_SET', {
352
            'xmlns:xsi': self.xmlns_xsi,
353
            'xsi:noNamespaceSchemaLocation': self.xsi_noNSL % "study"})
354
355
        study = ET.SubElement(study_set, 'STUDY', {
356
            'alias': self._get_study_alias(),
357
            'center_name': qiita_config.ebi_center_name}
358
        )
359
360
        descriptor = ET.SubElement(study, 'DESCRIPTOR')
361
        study_title = ET.SubElement(descriptor, 'STUDY_TITLE')
362
        study_title.text = escape(clean_whitespace(self.study_title))
363
364
        # study type is deprecated and not displayed anywhere on EBI-ENA;
365
        # however it's required for submission so just injecting with Other
366
        ET.SubElement(
367
            descriptor, 'STUDY_TYPE', {'existing_study_type': 'Other'})
368
369
        study_abstract = ET.SubElement(descriptor, 'STUDY_ABSTRACT')
370
        study_abstract.text = clean_whitespace(escape(self.study_abstract))
371
372
        # Add pubmed IDs
373
        if self.publications:
374
            study_links = ET.SubElement(study, 'STUDY_LINKS')
375
            for pub, is_doi in self.publications:
376
                if is_doi:
377
                    self._get_publication_element(study_links, pub, 'DOI')
378
                else:
379
                    self._get_publication_element(study_links, pub, 'PUBMED')
380
381
        return study_set
382
383
    def generate_sample_xml(self, samples=None, ignore_columns=None):
384
        """Generates the sample XML file
385
386
        Parameters
387
        ----------
388
        samples : list of str, optional
389
            The list of samples to be included in the sample xml. If not
390
            provided or an empty list is provided, all the samples are used
391
        ignore_columns : list of str, optional
392
            The list of columns to ignore during submission; helpful for when
393
            the submissions are too large
394
395
        Returns
396
        -------
397
        ET.Element
398
            Object with sample XML values
399
        """
400
        sample_set = ET.Element('SAMPLE_SET', {
401
            'xmlns:xsi': self.xmlns_xsi,
402
            "xsi:noNamespaceSchemaLocation": self.xsi_noNSL % "sample"})
403
404
        if not samples:
405
            samples = self.samples.keys()
406
407
        for sample_name in sorted(samples):
408
            sample_info = dict(self.samples[sample_name])
409
            if 'country' in sample_info.keys():
410
                nname = 'geographic location (country and/or sea)'
411
                sample_info[nname] = sample_info['country']
412
413
            sample_accession = self._ebi_sample_accessions[sample_name]
414
            if self.action in ('ADD', 'VALIDATE'):
415
                if sample_accession is not None:
416
                    continue
417
                else:
418
                    sample = ET.SubElement(sample_set, 'SAMPLE', {
419
                        'alias': self._get_sample_alias(sample_name),
420
                        'center_name': qiita_config.ebi_center_name}
421
                    )
422
            else:
423
                sample = ET.SubElement(sample_set, 'SAMPLE', {
424
                    'accession': sample_accession,
425
                    'center_name': qiita_config.ebi_center_name}
426
                )
427
428
            sample_title = ET.SubElement(sample, 'TITLE')
429
            sample_title.text = escape(clean_whitespace(sample_name))
430
431
            sample_sample_name = ET.SubElement(sample, 'SAMPLE_NAME')
432
            taxon_id = ET.SubElement(sample_sample_name, 'TAXON_ID')
433
            text = sample_info.pop('taxon_id')
434
            taxon_id.text = escape(clean_whitespace(text))
435
436
            scientific_name = ET.SubElement(
437
                sample_sample_name, 'SCIENTIFIC_NAME')
438
            text = sample_info.pop('scientific_name')
439
            scientific_name.text = escape(clean_whitespace(text))
440
441
            description = ET.SubElement(sample, 'DESCRIPTION')
442
            text = sample_info.pop('description')
443
            description.text = escape(clean_whitespace(text))
444
445
            if sample_info:
446
                if ignore_columns is not None:
447
                    for key in ignore_columns:
448
                        del sample_info[key]
449
                sample_attributes = ET.SubElement(sample, 'SAMPLE_ATTRIBUTES')
450
                self._add_dict_as_tags_and_values(sample_attributes,
451
                                                  'SAMPLE_ATTRIBUTE',
452
                                                  sample_info)
453
454
        return sample_set
455
456
    def _generate_spot_descriptor(self, design, platform):
457
        """This XML element (and its subelements) must be written for every
458
        sample, but its generation depends on only study-level information.
459
        Therefore, we can break it out into its own method.
460
        """
461
        # This section applies only to the LS454 platform
462
        if platform != 'LS454':
463
            return
464
465
        # There is some hard-coded information in here, but this is what we
466
        # have always done in the past...
467
        spot_descriptor = ET.SubElement(design, 'SPOT_DESCRIPTOR')
468
        ET.SubElement(spot_descriptor, 'SPOT_DECODE_SPEC')
469
        read_spec = ET.SubElement(spot_descriptor, 'READ_SPEC')
470
471
        read_index = ET.SubElement(read_spec, 'READ_INDEX')
472
        read_index.text = '0'
473
        read_class = ET.SubElement(read_spec, 'READ_CLASS')
474
        read_class.text = 'Application Read'
475
        read_type = ET.SubElement(read_spec, 'READ_TYPE')
476
        read_type.text = 'Forward'
477
        base_coord = ET.SubElement(read_spec, 'BASE_COORD')
478
        base_coord.text = '1'
479
480
    def generate_experiment_xml(self, samples=None):
481
        """Generates the experiment XML file
482
483
        Parameters
484
        ----------
485
        samples : list of str, optional
486
            The list of samples to be included in the experiment xml
487
488
        Returns
489
        -------
490
        ET.Element
491
            Object with experiment XML values
492
        """
493
        study_accession = self.study.ebi_study_accession
494
        if study_accession:
495
            study_ref_dict = {'accession': study_accession}
496
        else:
497
            study_ref_dict = {'refname': self._get_study_alias()}
498
499
        experiment_set = ET.Element('EXPERIMENT_SET', {
500
            'xmlns:xsi': self.xmlns_xsi,
501
            "xsi:noNamespaceSchemaLocation": self.xsi_noNSL % "experiment"})
502
503
        samples = samples if samples is not None else self.samples.keys()
504
505
        if self.investigation_type == 'Other':
506
            library_strategy = self.new_investigation_type
507
        else:
508
            library_strategy = self.investigation_type
509
510
        for sample_name in sorted(samples):
511
            experiment_alias = self._get_experiment_alias(sample_name)
512
            sample_prep = dict(self.samples_prep[sample_name])
513
            if self._ebi_sample_accessions[sample_name]:
514
                sample_descriptor_dict = {
515
                    'accession': self._ebi_sample_accessions[sample_name]}
516
            else:
517
                sample_descriptor_dict = {
518
                    'refname': self._get_sample_alias(sample_name)}
519
520
            platform = sample_prep.pop('platform')
521
            experiment = ET.SubElement(experiment_set, 'EXPERIMENT', {
522
                'alias': experiment_alias,
523
                'center_name': qiita_config.ebi_center_name}
524
            )
525
            title = ET.SubElement(experiment, 'TITLE')
526
            title.text = experiment_alias
527
            ET.SubElement(experiment, 'STUDY_REF', study_ref_dict)
528
529
            design = ET.SubElement(experiment, 'DESIGN')
530
            design_description = ET.SubElement(design,
531
                                               'DESIGN_DESCRIPTION')
532
            edd = sample_prep.pop('experiment_design_description')
533
            design_description.text = escape(clean_whitespace(edd))
534
            ET.SubElement(design, 'SAMPLE_DESCRIPTOR', sample_descriptor_dict)
535
536
            # this is the library contruction section. The only required fields
537
            # is library_construction_protocol, the other are optional
538
            library_descriptor = ET.SubElement(design, 'LIBRARY_DESCRIPTOR')
539
            library_name = ET.SubElement(library_descriptor, 'LIBRARY_NAME')
540
            library_name.text = self._get_library_name(sample_name)
541
542
            lg = ET.SubElement(library_descriptor, 'LIBRARY_STRATEGY')
543
            lg.text = escape(clean_whitespace(library_strategy))
544
545
            # hardcoding some values,
546
            # see https://github.com/biocore/qiita/issues/1485
547
            library_source = ET.SubElement(library_descriptor,
548
                                           "LIBRARY_SOURCE")
549
            library_source.text = "METAGENOMIC"
550
            library_selection = ET.SubElement(library_descriptor,
551
                                              "LIBRARY_SELECTION")
552
            library_selection.text = "PCR"
553
            library_layout = ET.SubElement(library_descriptor,
554
                                           "LIBRARY_LAYOUT")
555
            if self.per_sample_FASTQ_reverse:
556
                ET.SubElement(library_layout, "PAIRED")
557
            else:
558
                ET.SubElement(library_layout, "SINGLE")
559
560
            lcp = ET.SubElement(library_descriptor,
561
                                "LIBRARY_CONSTRUCTION_PROTOCOL")
562
            lcp.text = escape(clean_whitespace(
563
                sample_prep.pop('library_construction_protocol')))
564
565
            self._generate_spot_descriptor(design, platform)
566
567
            platform_element = ET.SubElement(experiment, 'PLATFORM')
568
            platform_info = ET.SubElement(platform_element,
569
                                          platform.upper())
570
            instrument_model = ET.SubElement(platform_info, 'INSTRUMENT_MODEL')
571
            instrument_model.text = sample_prep.pop('instrument_model')
572
573
            if sample_prep:
574
                experiment_attributes = ET.SubElement(
575
                    experiment, 'EXPERIMENT_ATTRIBUTES')
576
                self._add_dict_as_tags_and_values(experiment_attributes,
577
                                                  'EXPERIMENT_ATTRIBUTE',
578
                                                  sample_prep)
579
580
        return experiment_set
581
582
    def _add_file_subelement(self, add_file, file_type, sample_name,
583
                             is_forward):
584
        """generate_run_xml helper to avoid duplication of code
585
        """
586
587
        if is_forward:
588
            suffix = self.FWD_READ_SUFFIX
589
        else:
590
            suffix = self.REV_READ_SUFFIX
591
592
        file_path = self.sample_demux_fps[sample_name] + suffix
593
        with open(file_path, 'rb') as fp:
594
            md5 = hashlib.md5(fp.read()).hexdigest()
595
596
        file_details = {'filetype': file_type,
597
                        'quality_scoring_system': 'phred',
598
                        'checksum_method': 'MD5',
599
                        'checksum': md5,
600
                        'filename': join(self.ebi_dir, basename(file_path))}
601
602
        add_file(file_details)
603
604
    def generate_run_xml(self):
605
        """Generates the run XML file
606
607
        Returns
608
        -------
609
        ET.Element
610
            Object with run XML values
611
        """
612
        run_set = ET.Element('RUN_SET', {
613
            'xmlns:xsi': self.xmlns_xsi,
614
            "xsi:noNamespaceSchemaLocation": self.xsi_noNSL % "run"})
615
        for sample_name, sample_prep in sorted(self.samples_prep.items()):
616
            sample_prep = dict(sample_prep)
617
618
            if self._ebi_experiment_accessions[sample_name]:
619
                experiment_ref_dict = {
620
                    'accession': self._ebi_experiment_accessions[sample_name]}
621
            else:
622
                experiment_alias = self._get_experiment_alias(sample_name)
623
                experiment_ref_dict = {'refname': experiment_alias}
624
625
            # We only submit fastq
626
            file_type = 'fastq'
627
            run = ET.SubElement(run_set, 'RUN', {
628
                'alias': self._get_run_alias(sample_name),
629
                'center_name': qiita_config.ebi_center_name}
630
            )
631
            ET.SubElement(run, 'EXPERIMENT_REF', experiment_ref_dict)
632
            data_block = ET.SubElement(run, 'DATA_BLOCK')
633
            files = ET.SubElement(data_block, 'FILES')
634
635
            add_file = partial(ET.SubElement, files, 'FILE')
636
            add_file_subelement = partial(self._add_file_subelement, add_file,
637
                                          file_type, sample_name)
638
            add_file_subelement(is_forward=True)
639
            if self.per_sample_FASTQ_reverse:
640
                add_file_subelement(is_forward=False)
641
642
        return run_set
643
644
    def generate_submission_xml(self, submission_date=None):
645
        """Generates the submission XML file
646
647
        Parameters
648
        ----------
649
        submission_date : date, optional
650
            Date when the submission was created, when None date.today() will
651
            be used.
652
653
        Returns
654
        -------
655
        ET.Element
656
            Object with submission XML values
657
658
        Notes
659
        -----
660
            EBI requieres a date when the submission will be automatically made
661
            public. This date is generated from the submission date + 365 days.
662
        """
663
        submission_set = ET.Element('SUBMISSION_SET', {
664
            'xmlns:xsi': self.xmlns_xsi,
665
            "xsi:noNamespaceSchemaLocation": self.xsi_noNSL % "submission"})
666
        submission = ET.SubElement(submission_set, 'SUBMISSION', {
667
            'alias': self._get_submission_alias(),
668
            'center_name': qiita_config.ebi_center_name}
669
        )
670
671
        actions = ET.SubElement(submission, 'ACTIONS')
672
673
        if self.study_xml_fp:
674
            study_action = ET.SubElement(actions, 'ACTION')
675
            ET.SubElement(study_action, self.action, {
676
                'schema': 'study',
677
                'source': basename(self.study_xml_fp)}
678
            )
679
680
        if self.sample_xml_fp:
681
            sample_action = ET.SubElement(actions, 'ACTION')
682
            ET.SubElement(sample_action, self.action, {
683
                'schema': 'sample',
684
                'source': basename(self.sample_xml_fp)}
685
            )
686
687
        if self.experiment_xml_fp:
688
            experiment_action = ET.SubElement(actions, 'ACTION')
689
            ET.SubElement(experiment_action, self.action, {
690
                'schema': 'experiment',
691
                'source': basename(self.experiment_xml_fp)}
692
            )
693
694
        if self.run_xml_fp:
695
            run_action = ET.SubElement(actions, 'ACTION')
696
            ET.SubElement(run_action, self.action, {
697
                'schema': 'run', 'source': basename(self.run_xml_fp)}
698
            )
699
700
        if submission_date is None:
701
            submission_date = date.today()
702
        if self.action == 'ADD':
703
            hold_action = ET.SubElement(actions, 'ACTION')
704
            ET.SubElement(hold_action, 'HOLD', {
705
                'HoldUntilDate': str(submission_date + timedelta(365))}
706
            )
707
708
        return submission_set
709
710
    def write_xml_file(self, element, fp):
711
        """Writes an XML file after calling one of the XML generation
712
        functions
713
714
        Parameters
715
        ----------
716
        element : ET.Element
717
            The Element to be written
718
        fp : str
719
            The filepath to which the XML will be written
720
        """
721
        if not exists(self.xml_dir):
722
            makedirs(self.xml_dir)
723
        ET.ElementTree(element).write(
724
            fp, encoding='UTF-8', xml_declaration=True)
725
726
    def generate_xml_files(self):
727
        """Generate all the XML files"""
728
        get_output_fp = partial(join, self.xml_dir)
729
730
        # There are really only 2 main cases for EBI submission: ADD and
731
        # MODIFY and the only exception is in MODIFY
732
        if self.action != 'MODIFY':
733
            # The study.xml file needs to be generated if and only if the study
734
            # does NOT have an ebi_study_accession
735
            if not self.study.ebi_study_accession:
736
                self.study_xml_fp = get_output_fp('study.xml')
737
                self.write_xml_file(self.generate_study_xml(),
738
                                    self.study_xml_fp)
739
740
            # The sample.xml file needs to be generated if and only if there
741
            # are samples in the current submission that do NOT have an
742
            # ebi_sample_accession
743
            new_samples = {
744
                sample for sample, accession in
745
                self.sample_template.ebi_sample_accessions.items()
746
                if accession is None}
747
            new_samples = new_samples.intersection(self.samples)
748
            if new_samples:
749
                self.sample_xml_fp = get_output_fp('sample.xml')
750
                self.write_xml_file(self.generate_sample_xml(new_samples),
751
                                    self.sample_xml_fp)
752
753
            # The experiment.xml needs to be generated if and only if there are
754
            # samples in the current submission that do NO have an
755
            # ebi_experiment_accession
756
            new_samples = {
757
                sample for sample, accession in
758
                self.prep_template.ebi_experiment_accessions.items()
759
                if accession is None}
760
            new_samples = new_samples.intersection(self.samples)
761
            if new_samples:
762
                self.experiment_xml_fp = get_output_fp('experiment.xml')
763
                self.write_xml_file(self.generate_experiment_xml(new_samples),
764
                                    self.experiment_xml_fp)
765
766
            # Generate the run.xml as it should always be generated
767
            self.run_xml_fp = get_output_fp('run.xml')
768
            self.write_xml_file(self.generate_run_xml(), self.run_xml_fp)
769
770
            self.submission_xml_fp = get_output_fp('submission.xml')
771
        else:
772
            # When MODIFY we can only modify the sample (sample.xml) and prep
773
            # (experiment.xml) template. The easiest is to generate both and
774
            # submit them. Note that we are assuming that Qiita is not
775
            # allowing to change preprocessing required information
776
            all_samples = self.sample_template.ebi_sample_accessions
777
            samples = {k: all_samples[k] for k in self.samples}
778
779
            # finding unique name for sample xml
780
            i = 0
781
            while True:
782
                self.sample_xml_fp = get_output_fp('sample_%d.xml' % i)
783
                if not exists(self.sample_xml_fp):
784
                    break
785
                i = i + 1
786
            self.write_xml_file(self.generate_sample_xml(samples),
787
                                self.sample_xml_fp)
788
789
            # finding unique name for experiment xml
790
            i = 0
791
            while True:
792
                self.experiment_xml_fp = get_output_fp('experiment_%d.xml' % i)
793
                if not exists(self.experiment_xml_fp):
794
                    break
795
                i = i + 1
796
            self.write_xml_file(self.generate_experiment_xml(samples),
797
                                self.experiment_xml_fp)
798
799
            # finding unique name for run xml
800
            i = 0
801
            while True:
802
                self.submission_xml_fp = get_output_fp('submission_%d.xml' % i)
803
                if not exists(self.submission_xml_fp):
804
                    break
805
                i = i + 1
806
807
            # just to keep all curl_reply-s we find a new name
808
            i = 0
809
            while True:
810
                self.curl_reply = join(self.full_ebi_dir,
811
                                       'curl_reply_%d.xml' % i)
812
                if not exists(self.curl_reply):
813
                    break
814
                i = i + 1
815
816
        # The submission.xml is always generated
817
        self.write_xml_file(self.generate_submission_xml(),
818
                            self.submission_xml_fp)
819
820
    def generate_curl_command(
821
            self,
822
            ebi_seq_xfer_user=qiita_config.ebi_seq_xfer_user,
823
            ebi_seq_xfer_pass=qiita_config.ebi_seq_xfer_pass,
824
            ebi_dropbox_url=qiita_config.ebi_dropbox_url):
825
        """Generates the curl command for submission
826
827
        Parameters
828
        ----------
829
        ebi_seq_xfer_user : str
830
            The user to use when submitting to EBI
831
        ebi_seq_xfer_pass : str
832
            The user password issued by EBI for REST submissions
833
        ebi_dropbox_url : str
834
            The dropbox url
835
836
        Returns
837
        -------
838
        curl_command
839
            The curl string to be executed
840
841
        Notes
842
        -----
843
        - All 5 XML files (study, sample, experiment, run, and submission) must
844
          be generated before executing this function
845
        """
846
        # make sure that the XML files have been generated
847
        url = '?auth=ENA%20{0}%20{1}'.format(quote(ebi_seq_xfer_user),
848
                                             quote(ebi_seq_xfer_pass))
849
        curl_cmd = ['curl -sS -k']
850
        if self.submission_xml_fp is not None:
851
            curl_cmd.append(' -F "SUBMISSION=@%s"' % self.submission_xml_fp)
852
        if self.study_xml_fp is not None:
853
            curl_cmd.append(' -F "STUDY=@%s"' % self.study_xml_fp)
854
        if self.sample_xml_fp is not None:
855
            curl_cmd.append(' -F "SAMPLE=@%s"' % self.sample_xml_fp)
856
        if self.run_xml_fp is not None:
857
            curl_cmd.append(' -F "RUN=@%s"' % self.run_xml_fp)
858
        if self.experiment_xml_fp is not None:
859
            curl_cmd.append(' -F "EXPERIMENT=@%s"' % self.experiment_xml_fp)
860
        curl_cmd.append(' "%s"' % join(ebi_dropbox_url, url))
861
862
        return ''.join(curl_cmd)
863
864
    def generate_send_sequences_cmd(self):
865
        """Generate the sequences to EBI via ascp command
866
867
        Returns
868
        -------
869
        ascp_command
870
            The ascp command to be executed
871
872
        Notes
873
        -----
874
        - All 5 XML files (study, sample, experiment, run, and submission) must
875
          be generated before executing this function
876
        """
877
        fastqs = []
878
        for _, sfp in self.sample_demux_fps.items():
879
            fastqs.append(sfp + self.FWD_READ_SUFFIX)
880
            if self.per_sample_FASTQ_reverse:
881
                sfp = sfp + self.REV_READ_SUFFIX
882
                fastqs.append(sfp)
883
        # divide all the fastqs in groups of 10
884
        fastqs_div = [fastqs[i::10] for i in range(10) if fastqs[i::10]]
885
        ascp_commands = []
886
        for f in fastqs_div:
887
            ascp_commands.append('ascp --ignore-host-key -d -QT -k2 '
888
                                 '{0} {1}@{2}:./{3}/'.format(
889
                                     ' '.join(f),
890
                                     qiita_config.ebi_seq_xfer_user,
891
                                     qiita_config.ebi_seq_xfer_url,
892
                                     self.ebi_dir))
893
894
        return ascp_commands
895
896
    def parse_EBI_reply(self, curl_result, test=False):
897
        """Parse and verify reply from EBI after sending XML files
898
899
        Parameters
900
        ----------
901
        curl_result : str
902
            The reply sent by EBI after sending XML files
903
        test : bool
904
            If true we will assume is a test and ignore some parsing errors
905
906
        Returns
907
        -------
908
        str
909
            The study accession number. None in case of failure
910
        dict of {str: str}
911
            The sample accession numbers, keyed by sample id. None in case of
912
            failure
913
        dict of {str: str}
914
            The biosample accession numbers, keyed by sample id. None in case
915
            of failure
916
        dict of {str: str}
917
            The experiment accession numbers, keyed by sample id. None in case
918
            of failure
919
        dict of {str: str}
920
            The run accession numbers, keyed by sample id. None in case of
921
            failure
922
923
        Raises
924
        ------
925
        EBISubmissionError
926
            If curl_result is not a valid XML file
927
            If the ebi subumission has not been successful
928
            If multiple study tags are found in the curl result
929
        """
930
        try:
931
            root = ET.fromstring(curl_result)
932
        except ParseError:
933
            error_msg = ("The curl result from the EBI submission doesn't "
934
                         "look like an XML file:\n%s" % curl_result)
935
            le = LogEntry.create('Runtime', error_msg)
936
            raise EBISubmissionError(
937
                "The curl result from the EBI submission doesn't look like "
938
                "an XML file. Contact and admin for more information. "
939
                "Log id: %d" % le.id)
940
941
        success = root.get('success') == 'true'
942
        if not success:
943
            # here we want to parse out the errors so the failures are clearer
944
            errors = {elem.text for elem in root.iter("ERROR")}
945
946
            raise EBISubmissionError("The EBI submission failed:\n%s"
947
                                     % '\n'.join(errors))
948
        if test:
949
            study_accession = 'MyStudyAccession'
950
            sample_accessions = {}
951
            biosample_accessions = {}
952
            experiment_accessions = {}
953
            run_accessions = {}
954
955
            return (study_accession, sample_accessions, biosample_accessions,
956
                    experiment_accessions, run_accessions)
957
958
        study_elem = root.findall("STUDY")
959
        if study_elem:
960
            if len(study_elem) > 1:
961
                raise EBISubmissionError(
962
                    "Multiple study tags found in EBI reply: %d"
963
                    % len(study_elem))
964
            study_elem = study_elem[0]
965
            study_accession = study_elem.get('accession')
966
        else:
967
            study_accession = None
968
969
        sample_accessions = {}
970
        biosample_accessions = {}
971
        for elem in root.iter("SAMPLE"):
972
            alias = elem.get('alias')
973
            sample_id = self._sample_aliases[alias]
974
            sample_accessions[sample_id] = elem.get('accession')
975
            ext_id = elem.find('EXT_ID')
976
            biosample_accessions[sample_id] = ext_id.get('accession')
977
978
        def data_retriever(key, trans_dict):
979
            res = {}
980
            for elem in root.iter(key):
981
                alias = elem.get('alias')
982
                res[trans_dict[alias]] = elem.get('accession')
983
            return res
984
        experiment_accessions = data_retriever("EXPERIMENT",
985
                                               self._experiment_aliases)
986
        run_accessions = data_retriever("RUN", self._run_aliases)
987
988
        return (study_accession, sample_accessions, biosample_accessions,
989
                experiment_accessions, run_accessions)
990
991
    def _generate_demultiplexed_fastq_per_sample_FASTQ(self):
992
        """Modularity helper"""
993
994
        # helper function to write files in this method
995
        def _rename_file(fp, new_fp):
996
            if fp.endswith('.gz'):
997
                copyfile(fp, new_fp)
998
            else:
999
                cmd = "gzip -c %s > %s" % (fp, new_fp)
1000
                stdout, stderr, rv = system_call(cmd)
1001
                if rv != 0:
1002
                    error_msg = (
1003
                        "Error:\nStd output:%s\nStd error:%s"
1004
                        % (stdout, stderr))
1005
                    raise EBISubmissionError(error_msg)
1006
1007
        fwd_reads = []
1008
        rev_reads = []
1009
        for x in self.artifact.filepaths:
1010
            if x['fp_type'] == 'raw_forward_seqs':
1011
                fwd_reads.append((basename(x['fp']), x['fp']))
1012
            elif x['fp_type'] == 'raw_reverse_seqs':
1013
                rev_reads.append((basename(x['fp']), x['fp']))
1014
        fwd_reads.sort(key=lambda x: x[1])
1015
        rev_reads.sort(key=lambda x: x[1])
1016
        if rev_reads:
1017
            self.per_sample_FASTQ_reverse = True
1018
1019
        # merging forward and reverse into a single list, note that at this
1020
        # stage the files have passed multiple rounds of reviews: validator
1021
        # when the artifact was created, the summary generator, etc. so we can
1022
        # assure that if a rev exists for 1 fwd, there is one for all of them
1023
        fps = []
1024
        for f, r in zip_longest(fwd_reads, rev_reads):
1025
            sample_name = f[0]
1026
            fwd_read = f[1]
1027
            rev_read = r[1] if r is not None else None
1028
            fps.append((sample_name, (fwd_read, rev_read)))
1029
1030
        if 'run_prefix' in self.prep_template.categories:
1031
            rps = [(k, v) for k, v in
1032
                   self.prep_template.get_category('run_prefix').items()]
1033
        else:
1034
            rps = [(v, v.split('.', 1)[1]) for v in self.prep_template.keys()]
1035
        rps.sort(key=lambda x: x[1])
1036
1037
        demux_samples = set()
1038
        for sn, rp in rps:
1039
            for i, (bn, fp) in enumerate(fps):
1040
                if bn.startswith(rp):
1041
                    demux_samples.add(sn)
1042
                    new_fp = self.sample_demux_fps[sn] + self.FWD_READ_SUFFIX
1043
                    _rename_file(fp[0], new_fp)
1044
1045
                    if fp[1] is not None:
1046
                        new_fp = self.sample_demux_fps[
1047
                            sn] + self.REV_READ_SUFFIX
1048
                        _rename_file(fp[1], new_fp)
1049
                    del fps[i]
1050
                    break
1051
        if fps:
1052
            error_msg = (
1053
                'Discrepancy between filepaths and sample names. Extra'
1054
                ' filepaths: %s' % ', '.join([fp[0] for fp in fps]))
1055
            LogEntry.create('Runtime', error_msg)
1056
            raise EBISubmissionError(error_msg)
1057
1058
        return demux_samples, \
1059
            set(self.samples.keys()).difference(set(demux_samples))
1060
1061
    def _generate_demultiplexed_fastq_demux(self, mtime):
1062
        """Modularity helper"""
1063
        # An artifact will hold only one file of type
1064
        # `preprocessed_demux`. Thus, we only use the first one
1065
        # (the only one present)
1066
        ar = self.artifact
1067
        demux = [x['fp'] for x in ar.filepaths
1068
                 if x['fp_type'] == 'preprocessed_demux'][0]
1069
1070
        demux_samples = set()
1071
        with open_file(demux) as demux_fh:
1072
            if not isinstance(demux_fh, File):
1073
                error_msg = (
1074
                    "'%s' doesn't look like a demux file" % demux)
1075
                LogEntry.create('Runtime', error_msg)
1076
                raise EBISubmissionError(error_msg)
1077
            for s, i in to_per_sample_ascii(demux_fh,
1078
                                            self.prep_template.keys()):
1079
                s = s.decode('ascii')
1080
                sample_fp = self.sample_demux_fps[s] + self.FWD_READ_SUFFIX
1081
                wrote_sequences = False
1082
                with GzipFile(sample_fp, mode='w', mtime=mtime) as fh:
1083
                    for record in i:
1084
                        fh.write(record)
1085
                        wrote_sequences = True
1086
1087
                if wrote_sequences:
1088
                    demux_samples.add(s)
1089
                else:
1090
                    del (self.samples[s])
1091
                    del (self.samples_prep[s])
1092
                    del (self.sample_demux_fps[s])
1093
                    remove(sample_fp)
1094
        return demux_samples
1095
1096
    def generate_demultiplexed_fastq(self, rewrite_fastq=False, mtime=None):
1097
        """Generates demultiplexed fastq
1098
1099
        Parameters
1100
        ----------
1101
        rewrite_fastq : bool, optional
1102
            If true, it forces the rewrite of the fastq files
1103
        mtime : float, optional
1104
            The time to use when creating the gz files. If None, the current
1105
            time will be used by gzip.GzipFile. This is useful for testing.
1106
1107
        Returns
1108
        -------
1109
        demux_samples
1110
            List of successful demultiplexed samples
1111
1112
        Notes
1113
        -----
1114
        - As a performace feature, this method will check if self.full_ebi_dir
1115
        already exists and, if it does, the script will assume that in a
1116
        previous execution this step was performed correctly and will simply
1117
        read the file names from self.full_ebi_dir
1118
        - When the object is created (init), samples, samples_prep and
1119
        sample_demux_fps hold values for all available samples in the database.
1120
        Here some of those values will be deleted (del's, within the loops) for
1121
        those cases where the fastq.gz files weren't written or exist. This is
1122
        an indication that they had no sequences and this kind of files are not
1123
        accepted in EBI
1124
1125
        Raises
1126
        ------
1127
        EBISubmissionError
1128
            - The demux file couldn't be read
1129
            - All samples are removed
1130
        """
1131
        dir_not_exists = not isdir(self.full_ebi_dir)
1132
        missing_samples = []
1133
        if dir_not_exists or rewrite_fastq:
1134
            # if it exists, remove folder and start from scratch
1135
            if isdir(self.full_ebi_dir):
1136
                rmtree(self.full_ebi_dir)
1137
1138
            create_nested_path(self.full_ebi_dir)
1139
1140
            if self.artifact.artifact_type == 'per_sample_FASTQ':
1141
                demux_samples, missing_samples = \
1142
                    self._generate_demultiplexed_fastq_per_sample_FASTQ()
1143
            else:
1144
                demux_samples = self._generate_demultiplexed_fastq_demux(mtime)
1145
        else:
1146
            # if we are within this else, it means that we already have
1147
            # generated the raw files and for some reason the submission
1148
            # failed so we don't need to generate the files again and just
1149
            # check which files exist in the file path to create our final
1150
            # list of samples
1151
            demux_samples = set()
1152
            extension = self.FWD_READ_SUFFIX
1153
            extension_len = len(extension)
1154
            all_missing_files = set()
1155
            for f in listdir(self.full_ebi_dir):
1156
                fpath = join(self.full_ebi_dir, f)
1157
                if isfile(fpath) and f.endswith(extension):
1158
                    demux_samples.add(f[:-extension_len])
1159
                else:
1160
                    all_missing_files.add(f[:-extension_len])
1161
            # at this stage we have created/reviewed all the files and have
1162
            # all the sample names, however, we are not sure if we are dealing
1163
            # with just forwards or if we are dealing with also reverse. The
1164
            # easiest way to do this is to review the all_missing_files
1165
            missing_files = all_missing_files - demux_samples
1166
            if missing_files != all_missing_files:
1167
                self.per_sample_FASTQ_reverse = True
1168
1169
            missing_samples = set(
1170
                self.samples.keys()).difference(demux_samples)
1171
1172
        if missing_samples:
1173
            for ms in missing_samples:
1174
                del (self.samples[ms])
1175
                del (self.samples_prep[ms])
1176
                del (self.sample_demux_fps[ms])
1177
1178
        if not demux_samples:
1179
            error_msg = ("All samples were removed from the submission "
1180
                         "because the demux file is empty or the sample names "
1181
                         "do not match.")
1182
            LogEntry.create('Runtime', error_msg)
1183
            raise EBISubmissionError(error_msg)
1184
1185
        return demux_samples