--- a +++ b/qiita_ware/ebi.py @@ -0,0 +1,1185 @@ +# ----------------------------------------------------------------------------- +# Copyright (c) 2014--, The Qiita Development Team. +# +# Distributed under the terms of the BSD 3-clause License. +# +# The full license is in the file LICENSE, distributed with this software. +# ----------------------------------------------------------------------------- + +import hashlib +from os.path import basename, join, isdir, isfile, exists +from shutil import copyfile, rmtree +from os import remove, listdir, makedirs +from datetime import date, timedelta +from urllib.parse import quote +from itertools import zip_longest +from xml.etree import ElementTree as ET +from xml.etree.ElementTree import ParseError +from xml.sax.saxutils import escape +from gzip import GzipFile +from functools import partial +from h5py import File +from qiita_files.demux import to_per_sample_ascii + +from qiita_core.qiita_settings import qiita_config +from qiita_ware.exceptions import EBISubmissionError +from qiita_db.util import create_nested_path +from qiita_db.logger import LogEntry +from qiita_db.ontology import Ontology +from qiita_db.util import convert_to_id, get_mountpoint, open_file +from qiita_db.artifact import Artifact +from qiita_db.metadata_template.constants import ( + TARGET_GENE_DATA_TYPES, PREP_TEMPLATE_COLUMNS_TARGET_GENE) +from qiita_db.processing_job import _system_call as system_call + + +def clean_whitespace(text): + """Standardizes whitespaces so there is only one space separating tokens + + Parameters + ---------- + text : str + The fixed text + + Returns + ------- + str + fixed text + """ + return ' '.join(str(text).split()) + + +class EBISubmission(object): + """Define an EBI submission, generate submission files and submit + + Submit an artifact to EBI + + The steps for EBI submission are: + 1. Validate that we have all required info to submit + 2. Generate per sample demultiplexed files + 3. Generate XML files for submission + 4. Submit sequences files + 5. Submit XML files. The answer has the EBI submission numbers. + + Parameters + ---------- + artifact_id : int + The artifact id to submit + action : str + The action to perform. Valid options see + EBISubmission.valid_ebi_actions + + Raises + ------ + EBISubmissionError + - If the action is not in EBISubmission.valid_ebi_actions + - If the artifact cannot be submitted to EBI + - If the artifact has been already submitted to EBI and the action + is different from 'MODIFY' + - If the status of the study attached to the artifact is `submitting` + - If the prep template investigation type is not in the + ena_ontology.terms or not in the ena_ontology.user_defined_terms + - If the submission is missing required EBI fields either in the sample + or prep template + - If the sample preparation metadata doesn't have a platform field or + it isn't a EBISubmission.valid_platforms + """ + FWD_READ_SUFFIX = '.R1.fastq.gz' + REV_READ_SUFFIX = '.R2.fastq.gz' + + valid_ebi_actions = ('ADD', 'VALIDATE', 'MODIFY') + valid_ebi_submission_states = ('submitting') + # valid_platforms dict of 'platform': ['valid_instrument_models'] + valid_platforms = {'LS454': ['454 GS', '454 GS 20', '454 GS FLX', + '454 GS FLX+', '454 GS FLX TITANIUM', + '454 GS JUNIOR', 'UNSPECIFIED'], + 'ION_TORRENT': ['ION TORRENT PGM', 'ION TORRENT PROTON', + 'ION TORRENT S5', 'ION TORRENT S5 XL'], + 'ILLUMINA': ['HISEQ X FIVE', + 'HISEQ X TEN', + 'ILLUMINA GENOME ANALYZER', + 'ILLUMINA GENOME ANALYZER II', + 'ILLUMINA GENOME ANALYZER IIX', + 'ILLUMINA HISCANSQ', + 'ILLUMINA HISEQ 1000', + 'ILLUMINA HISEQ 1500', + 'ILLUMINA HISEQ 2000', + 'ILLUMINA HISEQ 2500', + 'ILLUMINA HISEQ 3000', + 'ILLUMINA HISEQ 4000', + 'ILLUMINA MISEQ', + 'ILLUMINA MINISEQ', + 'ILLUMINA NOVASEQ 6000', + 'ILLUMINA NOVASEQ X', + 'NEXTSEQ 500', + 'NEXTSEQ 550', + 'UNSPECIFIED'], + 'OXFORD_NANOPORE': ['GRIDION'], + 'PACBIO_SMRT': ['PACBIO RS', + 'PACBIO RS II', + 'SEQUEL', + 'ONSO', + 'REVIO', + 'SEQUEL IIE', + 'SEQUEL II']} + + xmlns_xsi = "http://www.w3.org/2001/XMLSchema-instance" + xsi_noNSL = "ftp://ftp.sra.ebi.ac.uk/meta/xsd/sra_1_3/SRA.%s.xsd" + + def __init__(self, artifact_id, action): + error_msgs = [] + + if action not in self.valid_ebi_actions: + error_msg = ("%s is not a valid EBI submission action, valid " + "actions are: %s" % + (action, ', '.join(self.valid_ebi_actions))) + LogEntry.create('Runtime', error_msg) + raise EBISubmissionError(error_msg) + + ena_ontology = Ontology(convert_to_id('ENA', 'ontology')) + self.action = action + self.artifact = Artifact(artifact_id) + if not self.artifact.can_be_submitted_to_ebi: + error_msg = ("Artifact %d cannot be submitted to EBI" + % self.artifact.id) + LogEntry.create('Runtime', error_msg) + raise EBISubmissionError(error_msg) + + self.study = self.artifact.study + self.sample_template = self.study.sample_template + # If we reach this point, there should be only one prep template + # attached to the artifact. By design, each artifact has at least one + # prep template. Artifacts with more than one prep template cannot be + # submitted to EBI, so the attribute 'can_be_submitted_to_ebi' should + # be set to false, which is checked in the previous if statement + self.prep_template = self.artifact.prep_templates[0] + + if self.artifact.is_submitted_to_ebi and action != 'MODIFY': + error_msg = ("Cannot resubmit! Artifact %d has already " + "been submitted to EBI." % artifact_id) + LogEntry.create('Runtime', error_msg) + raise EBISubmissionError(error_msg) + + self.artifact_id = artifact_id + self.study_title = self.study.title + self.study_abstract = self.study.info['study_abstract'] + + it = self.prep_template.investigation_type + if it in ena_ontology.terms: + self.investigation_type = it + self.new_investigation_type = None + elif it in ena_ontology.user_defined_terms: + self.investigation_type = 'Other' + self.new_investigation_type = it + else: + # This should never happen + error_msgs.append("Unrecognized investigation type: '%s'. This " + "term is neither one of the official terms nor " + "one of the user-defined terms in the ENA " + "ontology." % it) + _, base_fp = get_mountpoint("preprocessed_data")[0] + self.ebi_dir = '%d_ebi_submission' % artifact_id + self.full_ebi_dir = join(base_fp, self.ebi_dir) + self.ascp_reply = join(self.full_ebi_dir, 'ascp_reply.txt') + self.curl_reply = join(self.full_ebi_dir, 'curl_reply.xml') + self.xml_dir = join(self.full_ebi_dir, 'xml_dir') + self.study_xml_fp = None + self.sample_xml_fp = None + self.experiment_xml_fp = None + self.run_xml_fp = None + self.submission_xml_fp = None + self.per_sample_FASTQ_reverse = False + self.publications = self.study.publications + + # getting the restrictions + st_restrictions = [self.sample_template.columns_restrictions['EBI']] + pt_restrictions = [self.prep_template.columns_restrictions['EBI']] + if self.artifact.data_type in TARGET_GENE_DATA_TYPES: + # adding restrictions on primer and barcode as these are + # conditionally requiered for target gene + pt_restrictions.append( + PREP_TEMPLATE_COLUMNS_TARGET_GENE['demultiplex']) + st_missing = self.sample_template.check_restrictions(st_restrictions) + pt_missing = self.prep_template.check_restrictions(pt_restrictions) + # testing if there are any missing columns + if st_missing: + error_msgs.append("Missing column in the sample template: %s" % + ', '.join(list(st_missing))) + if pt_missing: + error_msgs.append("Missing column in the prep template: %s" % + ', '.join(list(pt_missing))) + + # generating all samples from sample template + self.samples = {} + self.samples_prep = {} + self.sample_demux_fps = {} + get_output_fp = partial(join, self.full_ebi_dir) + nvp = [] + nvim = [] + for k, sample_prep in self.prep_template.items(): + # validating required fields + if ('platform' not in sample_prep or + sample_prep['platform'] is None): + nvp.append(k) + else: + platform = sample_prep['platform'].upper() + if platform not in self.valid_platforms: + nvp.append(k) + else: + if ('instrument_model' not in sample_prep or + sample_prep['instrument_model'] is None): + nvim.append(k) + else: + im = sample_prep['instrument_model'].upper() + if im not in self.valid_platforms[platform]: + nvim.append(k) + + # IMPORTANT: note that we are generating the samples we are going + # to be using during submission and they come from the sample info + # file, however, we are only retrieving the samples that exist in + # the prep AKA not all samples + self.samples[k] = self.sample_template.get(sample_prep.id) + self.samples_prep[k] = sample_prep + self.sample_demux_fps[k] = get_output_fp(k) + + if nvp: + error_msgs.append("These samples do not have a valid platform " + "(instrumet model wasn't checked): %s" % ( + ', '.join(nvp))) + if nvim: + error_msgs.append("These samples do not have a valid instrument " + "model: %s" % (', '.join(nvim))) + if error_msgs: + error_msgs = ("Errors found during EBI submission for study #%d, " + "artifact #%d and prep template #%d:\n%s" + % (self.study.id, artifact_id, + self.prep_template.id, '\n'.join(error_msgs))) + LogEntry.create('Runtime', error_msgs) + raise EBISubmissionError(error_msgs) + + self._sample_aliases = {} + self._experiment_aliases = {} + self._run_aliases = {} + + self._ebi_sample_accessions = \ + self.sample_template.ebi_sample_accessions + self._ebi_experiment_accessions = \ + self.prep_template.ebi_experiment_accessions + + def _get_study_alias(self): + """Format alias using ``self.study_id``""" + study_alias_format = '%s_sid_%s' + return study_alias_format % ( + qiita_config.ebi_organization_prefix, + escape(clean_whitespace(str(self.study.id)))) + + def _get_sample_alias(self, sample_name): + """Format alias using ``self.study_id``, `sample_name`""" + alias = "%s:%s" % (self._get_study_alias(), + escape(clean_whitespace(str(sample_name)))) + self._sample_aliases[alias] = sample_name + return alias + + def _get_experiment_alias(self, sample_name): + """Format alias using ``self.prep_template.id``, and `sample_name` + + Currently, this is identical to _get_sample_alias above, since we are + only going to allow submission of one prep for each sample + """ + exp_alias_format = '%s_ptid_%s:%s' + alias = exp_alias_format % ( + qiita_config.ebi_organization_prefix, + escape(clean_whitespace(str(self.prep_template.id))), + escape(clean_whitespace(str(sample_name)))) + self._experiment_aliases[alias] = sample_name + return alias + + def _get_submission_alias(self): + """Format alias using ``self.artifact_id``""" + safe_artifact_id = escape( + clean_whitespace(str(self.artifact_id))) + submission_alias_format = '%s_submission_%s' + return submission_alias_format % (qiita_config.ebi_organization_prefix, + safe_artifact_id) + + def _get_run_alias(self, sample_name): + """Format alias using `sample_name` + """ + alias = '%s_ppdid_%s:%s' % ( + qiita_config.ebi_organization_prefix, + escape(clean_whitespace(str(self.artifact_id))), + sample_name) + self._run_aliases[alias] = sample_name + return alias + + def _get_library_name(self, sample_name): + """Format alias using `sample_name` + """ + return escape(clean_whitespace(sample_name)) + + def _add_dict_as_tags_and_values(self, parent_node, attribute_element_name, + data_dict): + """Format key/value data using a common EBI XML motif""" + for attr, val in sorted(data_dict.items()): + if val is None: + val = "Unknown" + attribute_element = ET.SubElement(parent_node, + attribute_element_name) + tag = ET.SubElement(attribute_element, 'TAG') + tag.text = clean_whitespace(attr) + value = ET.SubElement(attribute_element, 'VALUE') + value.text = clean_whitespace(val) + + def _get_publication_element(self, study_links, pmid, db_name): + study_link = ET.SubElement(study_links, 'STUDY_LINK') + xref_link = ET.SubElement(study_link, 'XREF_LINK') + + db = ET.SubElement(xref_link, 'DB') + db.text = db_name + + _id = ET.SubElement(xref_link, 'ID') + _id.text = str(pmid) + + def generate_study_xml(self): + """Generates the string for study XML file + + Returns + ------- + ET.Element + Object with study XML values + """ + study_set = ET.Element('STUDY_SET', { + 'xmlns:xsi': self.xmlns_xsi, + 'xsi:noNamespaceSchemaLocation': self.xsi_noNSL % "study"}) + + study = ET.SubElement(study_set, 'STUDY', { + 'alias': self._get_study_alias(), + 'center_name': qiita_config.ebi_center_name} + ) + + descriptor = ET.SubElement(study, 'DESCRIPTOR') + study_title = ET.SubElement(descriptor, 'STUDY_TITLE') + study_title.text = escape(clean_whitespace(self.study_title)) + + # study type is deprecated and not displayed anywhere on EBI-ENA; + # however it's required for submission so just injecting with Other + ET.SubElement( + descriptor, 'STUDY_TYPE', {'existing_study_type': 'Other'}) + + study_abstract = ET.SubElement(descriptor, 'STUDY_ABSTRACT') + study_abstract.text = clean_whitespace(escape(self.study_abstract)) + + # Add pubmed IDs + if self.publications: + study_links = ET.SubElement(study, 'STUDY_LINKS') + for pub, is_doi in self.publications: + if is_doi: + self._get_publication_element(study_links, pub, 'DOI') + else: + self._get_publication_element(study_links, pub, 'PUBMED') + + return study_set + + def generate_sample_xml(self, samples=None, ignore_columns=None): + """Generates the sample XML file + + Parameters + ---------- + samples : list of str, optional + The list of samples to be included in the sample xml. If not + provided or an empty list is provided, all the samples are used + ignore_columns : list of str, optional + The list of columns to ignore during submission; helpful for when + the submissions are too large + + Returns + ------- + ET.Element + Object with sample XML values + """ + sample_set = ET.Element('SAMPLE_SET', { + 'xmlns:xsi': self.xmlns_xsi, + "xsi:noNamespaceSchemaLocation": self.xsi_noNSL % "sample"}) + + if not samples: + samples = self.samples.keys() + + for sample_name in sorted(samples): + sample_info = dict(self.samples[sample_name]) + if 'country' in sample_info.keys(): + nname = 'geographic location (country and/or sea)' + sample_info[nname] = sample_info['country'] + + sample_accession = self._ebi_sample_accessions[sample_name] + if self.action in ('ADD', 'VALIDATE'): + if sample_accession is not None: + continue + else: + sample = ET.SubElement(sample_set, 'SAMPLE', { + 'alias': self._get_sample_alias(sample_name), + 'center_name': qiita_config.ebi_center_name} + ) + else: + sample = ET.SubElement(sample_set, 'SAMPLE', { + 'accession': sample_accession, + 'center_name': qiita_config.ebi_center_name} + ) + + sample_title = ET.SubElement(sample, 'TITLE') + sample_title.text = escape(clean_whitespace(sample_name)) + + sample_sample_name = ET.SubElement(sample, 'SAMPLE_NAME') + taxon_id = ET.SubElement(sample_sample_name, 'TAXON_ID') + text = sample_info.pop('taxon_id') + taxon_id.text = escape(clean_whitespace(text)) + + scientific_name = ET.SubElement( + sample_sample_name, 'SCIENTIFIC_NAME') + text = sample_info.pop('scientific_name') + scientific_name.text = escape(clean_whitespace(text)) + + description = ET.SubElement(sample, 'DESCRIPTION') + text = sample_info.pop('description') + description.text = escape(clean_whitespace(text)) + + if sample_info: + if ignore_columns is not None: + for key in ignore_columns: + del sample_info[key] + sample_attributes = ET.SubElement(sample, 'SAMPLE_ATTRIBUTES') + self._add_dict_as_tags_and_values(sample_attributes, + 'SAMPLE_ATTRIBUTE', + sample_info) + + return sample_set + + def _generate_spot_descriptor(self, design, platform): + """This XML element (and its subelements) must be written for every + sample, but its generation depends on only study-level information. + Therefore, we can break it out into its own method. + """ + # This section applies only to the LS454 platform + if platform != 'LS454': + return + + # There is some hard-coded information in here, but this is what we + # have always done in the past... + spot_descriptor = ET.SubElement(design, 'SPOT_DESCRIPTOR') + ET.SubElement(spot_descriptor, 'SPOT_DECODE_SPEC') + read_spec = ET.SubElement(spot_descriptor, 'READ_SPEC') + + read_index = ET.SubElement(read_spec, 'READ_INDEX') + read_index.text = '0' + read_class = ET.SubElement(read_spec, 'READ_CLASS') + read_class.text = 'Application Read' + read_type = ET.SubElement(read_spec, 'READ_TYPE') + read_type.text = 'Forward' + base_coord = ET.SubElement(read_spec, 'BASE_COORD') + base_coord.text = '1' + + def generate_experiment_xml(self, samples=None): + """Generates the experiment XML file + + Parameters + ---------- + samples : list of str, optional + The list of samples to be included in the experiment xml + + Returns + ------- + ET.Element + Object with experiment XML values + """ + study_accession = self.study.ebi_study_accession + if study_accession: + study_ref_dict = {'accession': study_accession} + else: + study_ref_dict = {'refname': self._get_study_alias()} + + experiment_set = ET.Element('EXPERIMENT_SET', { + 'xmlns:xsi': self.xmlns_xsi, + "xsi:noNamespaceSchemaLocation": self.xsi_noNSL % "experiment"}) + + samples = samples if samples is not None else self.samples.keys() + + if self.investigation_type == 'Other': + library_strategy = self.new_investigation_type + else: + library_strategy = self.investigation_type + + for sample_name in sorted(samples): + experiment_alias = self._get_experiment_alias(sample_name) + sample_prep = dict(self.samples_prep[sample_name]) + if self._ebi_sample_accessions[sample_name]: + sample_descriptor_dict = { + 'accession': self._ebi_sample_accessions[sample_name]} + else: + sample_descriptor_dict = { + 'refname': self._get_sample_alias(sample_name)} + + platform = sample_prep.pop('platform') + experiment = ET.SubElement(experiment_set, 'EXPERIMENT', { + 'alias': experiment_alias, + 'center_name': qiita_config.ebi_center_name} + ) + title = ET.SubElement(experiment, 'TITLE') + title.text = experiment_alias + ET.SubElement(experiment, 'STUDY_REF', study_ref_dict) + + design = ET.SubElement(experiment, 'DESIGN') + design_description = ET.SubElement(design, + 'DESIGN_DESCRIPTION') + edd = sample_prep.pop('experiment_design_description') + design_description.text = escape(clean_whitespace(edd)) + ET.SubElement(design, 'SAMPLE_DESCRIPTOR', sample_descriptor_dict) + + # this is the library contruction section. The only required fields + # is library_construction_protocol, the other are optional + library_descriptor = ET.SubElement(design, 'LIBRARY_DESCRIPTOR') + library_name = ET.SubElement(library_descriptor, 'LIBRARY_NAME') + library_name.text = self._get_library_name(sample_name) + + lg = ET.SubElement(library_descriptor, 'LIBRARY_STRATEGY') + lg.text = escape(clean_whitespace(library_strategy)) + + # hardcoding some values, + # see https://github.com/biocore/qiita/issues/1485 + library_source = ET.SubElement(library_descriptor, + "LIBRARY_SOURCE") + library_source.text = "METAGENOMIC" + library_selection = ET.SubElement(library_descriptor, + "LIBRARY_SELECTION") + library_selection.text = "PCR" + library_layout = ET.SubElement(library_descriptor, + "LIBRARY_LAYOUT") + if self.per_sample_FASTQ_reverse: + ET.SubElement(library_layout, "PAIRED") + else: + ET.SubElement(library_layout, "SINGLE") + + lcp = ET.SubElement(library_descriptor, + "LIBRARY_CONSTRUCTION_PROTOCOL") + lcp.text = escape(clean_whitespace( + sample_prep.pop('library_construction_protocol'))) + + self._generate_spot_descriptor(design, platform) + + platform_element = ET.SubElement(experiment, 'PLATFORM') + platform_info = ET.SubElement(platform_element, + platform.upper()) + instrument_model = ET.SubElement(platform_info, 'INSTRUMENT_MODEL') + instrument_model.text = sample_prep.pop('instrument_model') + + if sample_prep: + experiment_attributes = ET.SubElement( + experiment, 'EXPERIMENT_ATTRIBUTES') + self._add_dict_as_tags_and_values(experiment_attributes, + 'EXPERIMENT_ATTRIBUTE', + sample_prep) + + return experiment_set + + def _add_file_subelement(self, add_file, file_type, sample_name, + is_forward): + """generate_run_xml helper to avoid duplication of code + """ + + if is_forward: + suffix = self.FWD_READ_SUFFIX + else: + suffix = self.REV_READ_SUFFIX + + file_path = self.sample_demux_fps[sample_name] + suffix + with open(file_path, 'rb') as fp: + md5 = hashlib.md5(fp.read()).hexdigest() + + file_details = {'filetype': file_type, + 'quality_scoring_system': 'phred', + 'checksum_method': 'MD5', + 'checksum': md5, + 'filename': join(self.ebi_dir, basename(file_path))} + + add_file(file_details) + + def generate_run_xml(self): + """Generates the run XML file + + Returns + ------- + ET.Element + Object with run XML values + """ + run_set = ET.Element('RUN_SET', { + 'xmlns:xsi': self.xmlns_xsi, + "xsi:noNamespaceSchemaLocation": self.xsi_noNSL % "run"}) + for sample_name, sample_prep in sorted(self.samples_prep.items()): + sample_prep = dict(sample_prep) + + if self._ebi_experiment_accessions[sample_name]: + experiment_ref_dict = { + 'accession': self._ebi_experiment_accessions[sample_name]} + else: + experiment_alias = self._get_experiment_alias(sample_name) + experiment_ref_dict = {'refname': experiment_alias} + + # We only submit fastq + file_type = 'fastq' + run = ET.SubElement(run_set, 'RUN', { + 'alias': self._get_run_alias(sample_name), + 'center_name': qiita_config.ebi_center_name} + ) + ET.SubElement(run, 'EXPERIMENT_REF', experiment_ref_dict) + data_block = ET.SubElement(run, 'DATA_BLOCK') + files = ET.SubElement(data_block, 'FILES') + + add_file = partial(ET.SubElement, files, 'FILE') + add_file_subelement = partial(self._add_file_subelement, add_file, + file_type, sample_name) + add_file_subelement(is_forward=True) + if self.per_sample_FASTQ_reverse: + add_file_subelement(is_forward=False) + + return run_set + + def generate_submission_xml(self, submission_date=None): + """Generates the submission XML file + + Parameters + ---------- + submission_date : date, optional + Date when the submission was created, when None date.today() will + be used. + + Returns + ------- + ET.Element + Object with submission XML values + + Notes + ----- + EBI requieres a date when the submission will be automatically made + public. This date is generated from the submission date + 365 days. + """ + submission_set = ET.Element('SUBMISSION_SET', { + 'xmlns:xsi': self.xmlns_xsi, + "xsi:noNamespaceSchemaLocation": self.xsi_noNSL % "submission"}) + submission = ET.SubElement(submission_set, 'SUBMISSION', { + 'alias': self._get_submission_alias(), + 'center_name': qiita_config.ebi_center_name} + ) + + actions = ET.SubElement(submission, 'ACTIONS') + + if self.study_xml_fp: + study_action = ET.SubElement(actions, 'ACTION') + ET.SubElement(study_action, self.action, { + 'schema': 'study', + 'source': basename(self.study_xml_fp)} + ) + + if self.sample_xml_fp: + sample_action = ET.SubElement(actions, 'ACTION') + ET.SubElement(sample_action, self.action, { + 'schema': 'sample', + 'source': basename(self.sample_xml_fp)} + ) + + if self.experiment_xml_fp: + experiment_action = ET.SubElement(actions, 'ACTION') + ET.SubElement(experiment_action, self.action, { + 'schema': 'experiment', + 'source': basename(self.experiment_xml_fp)} + ) + + if self.run_xml_fp: + run_action = ET.SubElement(actions, 'ACTION') + ET.SubElement(run_action, self.action, { + 'schema': 'run', 'source': basename(self.run_xml_fp)} + ) + + if submission_date is None: + submission_date = date.today() + if self.action == 'ADD': + hold_action = ET.SubElement(actions, 'ACTION') + ET.SubElement(hold_action, 'HOLD', { + 'HoldUntilDate': str(submission_date + timedelta(365))} + ) + + return submission_set + + def write_xml_file(self, element, fp): + """Writes an XML file after calling one of the XML generation + functions + + Parameters + ---------- + element : ET.Element + The Element to be written + fp : str + The filepath to which the XML will be written + """ + if not exists(self.xml_dir): + makedirs(self.xml_dir) + ET.ElementTree(element).write( + fp, encoding='UTF-8', xml_declaration=True) + + def generate_xml_files(self): + """Generate all the XML files""" + get_output_fp = partial(join, self.xml_dir) + + # There are really only 2 main cases for EBI submission: ADD and + # MODIFY and the only exception is in MODIFY + if self.action != 'MODIFY': + # The study.xml file needs to be generated if and only if the study + # does NOT have an ebi_study_accession + if not self.study.ebi_study_accession: + self.study_xml_fp = get_output_fp('study.xml') + self.write_xml_file(self.generate_study_xml(), + self.study_xml_fp) + + # The sample.xml file needs to be generated if and only if there + # are samples in the current submission that do NOT have an + # ebi_sample_accession + new_samples = { + sample for sample, accession in + self.sample_template.ebi_sample_accessions.items() + if accession is None} + new_samples = new_samples.intersection(self.samples) + if new_samples: + self.sample_xml_fp = get_output_fp('sample.xml') + self.write_xml_file(self.generate_sample_xml(new_samples), + self.sample_xml_fp) + + # The experiment.xml needs to be generated if and only if there are + # samples in the current submission that do NO have an + # ebi_experiment_accession + new_samples = { + sample for sample, accession in + self.prep_template.ebi_experiment_accessions.items() + if accession is None} + new_samples = new_samples.intersection(self.samples) + if new_samples: + self.experiment_xml_fp = get_output_fp('experiment.xml') + self.write_xml_file(self.generate_experiment_xml(new_samples), + self.experiment_xml_fp) + + # Generate the run.xml as it should always be generated + self.run_xml_fp = get_output_fp('run.xml') + self.write_xml_file(self.generate_run_xml(), self.run_xml_fp) + + self.submission_xml_fp = get_output_fp('submission.xml') + else: + # When MODIFY we can only modify the sample (sample.xml) and prep + # (experiment.xml) template. The easiest is to generate both and + # submit them. Note that we are assuming that Qiita is not + # allowing to change preprocessing required information + all_samples = self.sample_template.ebi_sample_accessions + samples = {k: all_samples[k] for k in self.samples} + + # finding unique name for sample xml + i = 0 + while True: + self.sample_xml_fp = get_output_fp('sample_%d.xml' % i) + if not exists(self.sample_xml_fp): + break + i = i + 1 + self.write_xml_file(self.generate_sample_xml(samples), + self.sample_xml_fp) + + # finding unique name for experiment xml + i = 0 + while True: + self.experiment_xml_fp = get_output_fp('experiment_%d.xml' % i) + if not exists(self.experiment_xml_fp): + break + i = i + 1 + self.write_xml_file(self.generate_experiment_xml(samples), + self.experiment_xml_fp) + + # finding unique name for run xml + i = 0 + while True: + self.submission_xml_fp = get_output_fp('submission_%d.xml' % i) + if not exists(self.submission_xml_fp): + break + i = i + 1 + + # just to keep all curl_reply-s we find a new name + i = 0 + while True: + self.curl_reply = join(self.full_ebi_dir, + 'curl_reply_%d.xml' % i) + if not exists(self.curl_reply): + break + i = i + 1 + + # The submission.xml is always generated + self.write_xml_file(self.generate_submission_xml(), + self.submission_xml_fp) + + def generate_curl_command( + self, + ebi_seq_xfer_user=qiita_config.ebi_seq_xfer_user, + ebi_seq_xfer_pass=qiita_config.ebi_seq_xfer_pass, + ebi_dropbox_url=qiita_config.ebi_dropbox_url): + """Generates the curl command for submission + + Parameters + ---------- + ebi_seq_xfer_user : str + The user to use when submitting to EBI + ebi_seq_xfer_pass : str + The user password issued by EBI for REST submissions + ebi_dropbox_url : str + The dropbox url + + Returns + ------- + curl_command + The curl string to be executed + + Notes + ----- + - All 5 XML files (study, sample, experiment, run, and submission) must + be generated before executing this function + """ + # make sure that the XML files have been generated + url = '?auth=ENA%20{0}%20{1}'.format(quote(ebi_seq_xfer_user), + quote(ebi_seq_xfer_pass)) + curl_cmd = ['curl -sS -k'] + if self.submission_xml_fp is not None: + curl_cmd.append(' -F "SUBMISSION=@%s"' % self.submission_xml_fp) + if self.study_xml_fp is not None: + curl_cmd.append(' -F "STUDY=@%s"' % self.study_xml_fp) + if self.sample_xml_fp is not None: + curl_cmd.append(' -F "SAMPLE=@%s"' % self.sample_xml_fp) + if self.run_xml_fp is not None: + curl_cmd.append(' -F "RUN=@%s"' % self.run_xml_fp) + if self.experiment_xml_fp is not None: + curl_cmd.append(' -F "EXPERIMENT=@%s"' % self.experiment_xml_fp) + curl_cmd.append(' "%s"' % join(ebi_dropbox_url, url)) + + return ''.join(curl_cmd) + + def generate_send_sequences_cmd(self): + """Generate the sequences to EBI via ascp command + + Returns + ------- + ascp_command + The ascp command to be executed + + Notes + ----- + - All 5 XML files (study, sample, experiment, run, and submission) must + be generated before executing this function + """ + fastqs = [] + for _, sfp in self.sample_demux_fps.items(): + fastqs.append(sfp + self.FWD_READ_SUFFIX) + if self.per_sample_FASTQ_reverse: + sfp = sfp + self.REV_READ_SUFFIX + fastqs.append(sfp) + # divide all the fastqs in groups of 10 + fastqs_div = [fastqs[i::10] for i in range(10) if fastqs[i::10]] + ascp_commands = [] + for f in fastqs_div: + ascp_commands.append('ascp --ignore-host-key -d -QT -k2 ' + '{0} {1}@{2}:./{3}/'.format( + ' '.join(f), + qiita_config.ebi_seq_xfer_user, + qiita_config.ebi_seq_xfer_url, + self.ebi_dir)) + + return ascp_commands + + def parse_EBI_reply(self, curl_result, test=False): + """Parse and verify reply from EBI after sending XML files + + Parameters + ---------- + curl_result : str + The reply sent by EBI after sending XML files + test : bool + If true we will assume is a test and ignore some parsing errors + + Returns + ------- + str + The study accession number. None in case of failure + dict of {str: str} + The sample accession numbers, keyed by sample id. None in case of + failure + dict of {str: str} + The biosample accession numbers, keyed by sample id. None in case + of failure + dict of {str: str} + The experiment accession numbers, keyed by sample id. None in case + of failure + dict of {str: str} + The run accession numbers, keyed by sample id. None in case of + failure + + Raises + ------ + EBISubmissionError + If curl_result is not a valid XML file + If the ebi subumission has not been successful + If multiple study tags are found in the curl result + """ + try: + root = ET.fromstring(curl_result) + except ParseError: + error_msg = ("The curl result from the EBI submission doesn't " + "look like an XML file:\n%s" % curl_result) + le = LogEntry.create('Runtime', error_msg) + raise EBISubmissionError( + "The curl result from the EBI submission doesn't look like " + "an XML file. Contact and admin for more information. " + "Log id: %d" % le.id) + + success = root.get('success') == 'true' + if not success: + # here we want to parse out the errors so the failures are clearer + errors = {elem.text for elem in root.iter("ERROR")} + + raise EBISubmissionError("The EBI submission failed:\n%s" + % '\n'.join(errors)) + if test: + study_accession = 'MyStudyAccession' + sample_accessions = {} + biosample_accessions = {} + experiment_accessions = {} + run_accessions = {} + + return (study_accession, sample_accessions, biosample_accessions, + experiment_accessions, run_accessions) + + study_elem = root.findall("STUDY") + if study_elem: + if len(study_elem) > 1: + raise EBISubmissionError( + "Multiple study tags found in EBI reply: %d" + % len(study_elem)) + study_elem = study_elem[0] + study_accession = study_elem.get('accession') + else: + study_accession = None + + sample_accessions = {} + biosample_accessions = {} + for elem in root.iter("SAMPLE"): + alias = elem.get('alias') + sample_id = self._sample_aliases[alias] + sample_accessions[sample_id] = elem.get('accession') + ext_id = elem.find('EXT_ID') + biosample_accessions[sample_id] = ext_id.get('accession') + + def data_retriever(key, trans_dict): + res = {} + for elem in root.iter(key): + alias = elem.get('alias') + res[trans_dict[alias]] = elem.get('accession') + return res + experiment_accessions = data_retriever("EXPERIMENT", + self._experiment_aliases) + run_accessions = data_retriever("RUN", self._run_aliases) + + return (study_accession, sample_accessions, biosample_accessions, + experiment_accessions, run_accessions) + + def _generate_demultiplexed_fastq_per_sample_FASTQ(self): + """Modularity helper""" + + # helper function to write files in this method + def _rename_file(fp, new_fp): + if fp.endswith('.gz'): + copyfile(fp, new_fp) + else: + cmd = "gzip -c %s > %s" % (fp, new_fp) + stdout, stderr, rv = system_call(cmd) + if rv != 0: + error_msg = ( + "Error:\nStd output:%s\nStd error:%s" + % (stdout, stderr)) + raise EBISubmissionError(error_msg) + + fwd_reads = [] + rev_reads = [] + for x in self.artifact.filepaths: + if x['fp_type'] == 'raw_forward_seqs': + fwd_reads.append((basename(x['fp']), x['fp'])) + elif x['fp_type'] == 'raw_reverse_seqs': + rev_reads.append((basename(x['fp']), x['fp'])) + fwd_reads.sort(key=lambda x: x[1]) + rev_reads.sort(key=lambda x: x[1]) + if rev_reads: + self.per_sample_FASTQ_reverse = True + + # merging forward and reverse into a single list, note that at this + # stage the files have passed multiple rounds of reviews: validator + # when the artifact was created, the summary generator, etc. so we can + # assure that if a rev exists for 1 fwd, there is one for all of them + fps = [] + for f, r in zip_longest(fwd_reads, rev_reads): + sample_name = f[0] + fwd_read = f[1] + rev_read = r[1] if r is not None else None + fps.append((sample_name, (fwd_read, rev_read))) + + if 'run_prefix' in self.prep_template.categories: + rps = [(k, v) for k, v in + self.prep_template.get_category('run_prefix').items()] + else: + rps = [(v, v.split('.', 1)[1]) for v in self.prep_template.keys()] + rps.sort(key=lambda x: x[1]) + + demux_samples = set() + for sn, rp in rps: + for i, (bn, fp) in enumerate(fps): + if bn.startswith(rp): + demux_samples.add(sn) + new_fp = self.sample_demux_fps[sn] + self.FWD_READ_SUFFIX + _rename_file(fp[0], new_fp) + + if fp[1] is not None: + new_fp = self.sample_demux_fps[ + sn] + self.REV_READ_SUFFIX + _rename_file(fp[1], new_fp) + del fps[i] + break + if fps: + error_msg = ( + 'Discrepancy between filepaths and sample names. Extra' + ' filepaths: %s' % ', '.join([fp[0] for fp in fps])) + LogEntry.create('Runtime', error_msg) + raise EBISubmissionError(error_msg) + + return demux_samples, \ + set(self.samples.keys()).difference(set(demux_samples)) + + def _generate_demultiplexed_fastq_demux(self, mtime): + """Modularity helper""" + # An artifact will hold only one file of type + # `preprocessed_demux`. Thus, we only use the first one + # (the only one present) + ar = self.artifact + demux = [x['fp'] for x in ar.filepaths + if x['fp_type'] == 'preprocessed_demux'][0] + + demux_samples = set() + with open_file(demux) as demux_fh: + if not isinstance(demux_fh, File): + error_msg = ( + "'%s' doesn't look like a demux file" % demux) + LogEntry.create('Runtime', error_msg) + raise EBISubmissionError(error_msg) + for s, i in to_per_sample_ascii(demux_fh, + self.prep_template.keys()): + s = s.decode('ascii') + sample_fp = self.sample_demux_fps[s] + self.FWD_READ_SUFFIX + wrote_sequences = False + with GzipFile(sample_fp, mode='w', mtime=mtime) as fh: + for record in i: + fh.write(record) + wrote_sequences = True + + if wrote_sequences: + demux_samples.add(s) + else: + del (self.samples[s]) + del (self.samples_prep[s]) + del (self.sample_demux_fps[s]) + remove(sample_fp) + return demux_samples + + def generate_demultiplexed_fastq(self, rewrite_fastq=False, mtime=None): + """Generates demultiplexed fastq + + Parameters + ---------- + rewrite_fastq : bool, optional + If true, it forces the rewrite of the fastq files + mtime : float, optional + The time to use when creating the gz files. If None, the current + time will be used by gzip.GzipFile. This is useful for testing. + + Returns + ------- + demux_samples + List of successful demultiplexed samples + + Notes + ----- + - As a performace feature, this method will check if self.full_ebi_dir + already exists and, if it does, the script will assume that in a + previous execution this step was performed correctly and will simply + read the file names from self.full_ebi_dir + - When the object is created (init), samples, samples_prep and + sample_demux_fps hold values for all available samples in the database. + Here some of those values will be deleted (del's, within the loops) for + those cases where the fastq.gz files weren't written or exist. This is + an indication that they had no sequences and this kind of files are not + accepted in EBI + + Raises + ------ + EBISubmissionError + - The demux file couldn't be read + - All samples are removed + """ + dir_not_exists = not isdir(self.full_ebi_dir) + missing_samples = [] + if dir_not_exists or rewrite_fastq: + # if it exists, remove folder and start from scratch + if isdir(self.full_ebi_dir): + rmtree(self.full_ebi_dir) + + create_nested_path(self.full_ebi_dir) + + if self.artifact.artifact_type == 'per_sample_FASTQ': + demux_samples, missing_samples = \ + self._generate_demultiplexed_fastq_per_sample_FASTQ() + else: + demux_samples = self._generate_demultiplexed_fastq_demux(mtime) + else: + # if we are within this else, it means that we already have + # generated the raw files and for some reason the submission + # failed so we don't need to generate the files again and just + # check which files exist in the file path to create our final + # list of samples + demux_samples = set() + extension = self.FWD_READ_SUFFIX + extension_len = len(extension) + all_missing_files = set() + for f in listdir(self.full_ebi_dir): + fpath = join(self.full_ebi_dir, f) + if isfile(fpath) and f.endswith(extension): + demux_samples.add(f[:-extension_len]) + else: + all_missing_files.add(f[:-extension_len]) + # at this stage we have created/reviewed all the files and have + # all the sample names, however, we are not sure if we are dealing + # with just forwards or if we are dealing with also reverse. The + # easiest way to do this is to review the all_missing_files + missing_files = all_missing_files - demux_samples + if missing_files != all_missing_files: + self.per_sample_FASTQ_reverse = True + + missing_samples = set( + self.samples.keys()).difference(demux_samples) + + if missing_samples: + for ms in missing_samples: + del (self.samples[ms]) + del (self.samples_prep[ms]) + del (self.sample_demux_fps[ms]) + + if not demux_samples: + error_msg = ("All samples were removed from the submission " + "because the demux file is empty or the sample names " + "do not match.") + LogEntry.create('Runtime', error_msg) + raise EBISubmissionError(error_msg) + + return demux_samples