"""
Objects for dealing with Qiita analyses

This module provides the implementation of the Analysis and Collection classes.

Classes
-------
- `Analysis` -- A Qiita Analysis class
- `Collection` -- A Qiita Collection class for grouping multiple analyses
"""

# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from itertools import product
from os.path import join, exists
from os import mkdir
from collections import defaultdict
from shlex import quote

from biom import load_table
from biom.util import biom_open
from biom.exception import DisjointIDError
from re import sub
import pandas as pd

from qiita_core.exceptions import IncompetentQiitaDeveloperError
from qiita_core.qiita_settings import qiita_config
import qiita_db as qdb
from json import loads, dump


class Analysis(qdb.base.QiitaObject):
    """
    Analysis object to access to the Qiita Analysis information

    Attributes
    ----------
    owner
    name
    description
    samples
    data_types
    artifacts
    shared_with
    jobs
    pmid

    Methods
    -------
    has_access
    add_samples
    remove_samples
    share
    unshare
    build_files
    summary_data
    exists
    create
    delete
    add_artifact
    set_error
    """

    _table = "analysis"
    _portal_table = "analysis_portal"
    _analysis_id_column = 'analysis_id'

    @classmethod
    def iter(cls):
        """Iterate over the analyses available in the configured portal

        Yields
        ------
        Analysis
            One object per analysis id, in ascending analysis id order
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT DISTINCT analysis_id
                     FROM qiita.analysis
                        JOIN qiita.analysis_portal USING (analysis_id)
                        JOIN qiita.portal_type USING (portal_type_id)
                     WHERE portal = %s
                     ORDER BY analysis_id"""
            qdb.sql_connection.TRN.add(sql, [qiita_config.portal])
            aids = qdb.sql_connection.TRN.execute_fetchflatten()

        for aid in aids:
            yield cls(aid)

    @classmethod
    def get_by_status(cls, status):
        """Returns all Analyses with given status

        Parameters
        ----------
        status : str
            Status to search analyses for

        Returns
        -------
        set of Analysis
            All analyses in the database with the given status
        """
        with qdb.sql_connection.TRN:
            # Sandboxed analyses are the analyses that have not been started
            # and hence they don't have an artifact yet
            if status == 'sandbox':
                # select the id column (not the whole-row value) so each
                # result can be handed to the constructor, exactly as the
                # non-sandbox query below does
                sql = """SELECT DISTINCT analysis_id
                         FROM qiita.analysis
                            JOIN qiita.analysis_portal USING (analysis_id)
                            JOIN qiita.portal_type USING (portal_type_id)
                         WHERE portal = %s AND analysis_id NOT IN (
                            SELECT analysis_id
                            FROM qiita.analysis_artifact)"""
                qdb.sql_connection.TRN.add(sql, [qiita_config.portal])
            else:
                sql = """SELECT DISTINCT analysis_id
                         FROM qiita.analysis_artifact
                            JOIN qiita.artifact USING (artifact_id)
                            JOIN qiita.visibility USING (visibility_id)
                            JOIN qiita.analysis_portal USING (analysis_id)
                            JOIN qiita.portal_type USING (portal_type_id)
                         WHERE visibility = %s AND portal = %s"""
                qdb.sql_connection.TRN.add(sql, [status, qiita_config.portal])

            return set(
                cls(aid)
                for aid in qdb.sql_connection.TRN.execute_fetchflatten())

    @classmethod
    def create(cls, owner, name, description, from_default=False,
               merge_duplicated_sample_ids=False, categories=None,
               reservation=None):
        """Creates a new analysis on the database

        Parameters
        ----------
        owner : User object
            The analysis' owner
        name : str
            Name of the analysis
        description : str
            Description of the analysis
        from_default : bool, optional
            If True, use the default analysis to populate selected samples.
            Default False.
        merge_duplicated_sample_ids : bool, optional
            If the duplicated sample ids in the selected studies should be
            merged or prepended with the artifact ids. False (default) prepends
            the artifact id
        categories : list of str, optional
            If not None, use _only_ these categories for the metaanalysis
        reservation : str, optional
            The slurm reservation to assign to the analysis

        Returns
        -------
        qdb.analysis.Analysis
            The newly created analysis
        """
        with qdb.sql_connection.TRN:
            portal_id = qdb.util.convert_to_id(
                qiita_config.portal, 'portal_type', 'portal')

            # Create the row in the analysis table
            sql = """INSERT INTO qiita.{0}
                        (email, name, description)
                     VALUES (%s, %s, %s)
                     RETURNING analysis_id""".format(cls._table)
            qdb.sql_connection.TRN.add(
                sql, [owner.id, name, description])
            a_id = qdb.sql_connection.TRN.execute_fetchlast()

            if from_default:
                # Move samples into that new analysis
                dflt_id = owner.default_analysis.id
                sql = """UPDATE qiita.analysis_sample
                         SET analysis_id = %s
                         WHERE analysis_id = %s"""
                qdb.sql_connection.TRN.add(sql, [a_id, dflt_id])

            # Add to both QIITA and given portal (if not QIITA)
            sql = """INSERT INTO qiita.analysis_portal
                        (analysis_id, portal_type_id)
                     VALUES (%s, %s)"""
            args = [[a_id, portal_id]]

            if qiita_config.portal != 'QIITA':
                qp_id = qdb.util.convert_to_id(
                    'QIITA', 'portal_type', 'portal')
                args.append([a_id, qp_id])
            qdb.sql_connection.TRN.add(sql, args, many=True)

            instance = cls(a_id)
            if reservation is not None:
                instance.slurm_reservation = reservation

            # Once the analysis is created, we can create the mapping file and
            # the initial set of artifacts
            plugin = qdb.software.Software.from_name_and_version(
                'Qiita', 'alpha')
            cmd = plugin.get_command('build_analysis_files')
            params = qdb.software.Parameters.load(
                cmd, values_dict={
                    'analysis': a_id,
                    'merge_dup_sample_ids': merge_duplicated_sample_ids,
                    'categories': categories})
            job = qdb.processing_job.ProcessingJob.create(
                owner, params, True)
            sql = """INSERT INTO qiita.analysis_processing_job
                        (analysis_id, processing_job_id)
                     VALUES (%s, %s)"""
            qdb.sql_connection.TRN.add(sql, [a_id, job.id])
            qdb.sql_connection.TRN.execute()

        # Doing the submission outside of the transaction
        job.submit()
        return instance

    @classmethod
    def delete_analysis_artifacts(cls, _id):
        """Deletes the artifacts linked to an analysis and then the analysis

        Only the artifacts without parents are deleted explicitly, from the
        highest artifact id to the lowest.

        Parameters
        ----------
        _id : int
            The analysis id
        """
        analysis = cls(_id)
        aids = [a.id for a in analysis.artifacts if not a.parents]
        aids.sort(reverse=True)
        for aid in aids:
            qdb.artifact.Artifact.delete(aid)
        cls.delete(analysis.id)

    @classmethod
    def delete(cls, _id):
        """Deletes an analysis

        Parameters
        ----------
        _id : int
            The analysis id

        Raises
        ------
        QiitaDBUnknownIDError
            If the analysis id doesn't exist
        QiitaDBOperationNotPermittedError
            If the analysis still has artifacts attached
        """
        with qdb.sql_connection.TRN:
            # check if the analysis exist
            if not cls.exists(_id):
                raise qdb.exceptions.QiitaDBUnknownIDError(_id, "analysis")

            # Check if the analysis has any artifact
            sql = """SELECT EXISTS(SELECT *
                                   FROM qiita.analysis_artifact
                                   WHERE analysis_id = %s)"""
            qdb.sql_connection.TRN.add(sql, [_id])
            if qdb.sql_connection.TRN.execute_fetchlast():
                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
                    "Can't delete analysis %d, has artifacts attached"
                    % _id)

            # remove the rows that reference the analysis before removing
            # the analysis row itself
            sql = "DELETE FROM qiita.analysis_filepath WHERE {0} = %s".format(
                cls._analysis_id_column)
            args = [_id]
            qdb.sql_connection.TRN.add(sql, args)

            sql = "DELETE FROM qiita.analysis_portal WHERE {0} = %s".format(
                cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, args)

            sql = "DELETE FROM qiita.analysis_sample WHERE {0} = %s".format(
                cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, args)

            sql = """DELETE FROM qiita.analysis_processing_job
                     WHERE {0} = %s""".format(cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, args)

            # TODO: issue #1176

            sql = """DELETE FROM qiita.{0} WHERE {1} = %s""".format(
                cls._table, cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, args)

            qdb.sql_connection.TRN.execute()

    @classmethod
    def exists(cls, analysis_id):
        r"""Checks if the given analysis _id exists

        Parameters
        ----------
        analysis_id : int
            The id of the analysis we are searching for

        Returns
        -------
        bool
            True if exists in the configured portal, false otherwise.
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT EXISTS(
                        SELECT *
                        FROM qiita.{0}
                            JOIN qiita.analysis_portal USING (analysis_id)
                            JOIN qiita.portal_type USING (portal_type_id)
                        WHERE {1}=%s
                            AND portal=%s)""".format(cls._table,
                                                     cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, [analysis_id, qiita_config.portal])
            return qdb.sql_connection.TRN.execute_fetchlast()

    @property
    def owner(self):
        """The owner of the analysis

        Returns
        -------
        qiita_db.user.User
            The owner of the Analysis
        """
        with qdb.sql_connection.TRN:
            sql = "SELECT email FROM qiita.{0} WHERE analysis_id = %s".format(
                self._table)
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.user.User(qdb.sql_connection.TRN.execute_fetchlast())

    @property
    def name(self):
        """The name of the analysis

        Returns
        -------
        str
            Name of the Analysis
        """
        with qdb.sql_connection.TRN:
            sql = "SELECT name FROM qiita.{0} WHERE analysis_id = %s".format(
                self._table)
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchlast()

    @property
    def _portals(self):
        """The portals used to create the analysis

        Returns
        -------
        list of str
            Names of the portals the analysis is attached to
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT portal
                     FROM qiita.analysis_portal
                        JOIN qiita.portal_type USING (portal_type_id)
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchflatten()

    @property
    def timestamp(self):
        """The timestamp of the analysis

        Returns
        -------
        datetime
            Timestamp of the Analysis
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT timestamp FROM qiita.{0}
                     WHERE analysis_id = %s""".format(self._table)
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchlast()

    @property
    def description(self):
        """Returns the description of the analysis"""
        with qdb.sql_connection.TRN:
            sql = """SELECT description FROM qiita.{0}
                     WHERE analysis_id = %s""".format(self._table)
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchlast()

    @description.setter
    def description(self, description):
        """Changes the description of the analysis

        Parameters
        ----------
        description : str
            New description for the analysis
        """
        sql = """UPDATE qiita.{0} SET description = %s
                 WHERE analysis_id = %s""".format(self._table)
        qdb.sql_connection.perform_as_transaction(sql, [description, self._id])

    @property
    def samples(self):
        """The artifact and samples attached to the analysis

        Returns
        -------
        dict
            Format is {artifact_id: [sample_id, sample_id, ...]}
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT artifact_id, array_agg(
                        sample_id ORDER BY sample_id)
                     FROM qiita.analysis_sample
                     WHERE analysis_id = %s
                     GROUP BY artifact_id"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return dict(qdb.sql_connection.TRN.execute_fetchindex())

    @property
    def data_types(self):
        """Returns all data types used in the analysis

        Returns
        -------
        list of str
            Data types in the analysis
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT DISTINCT data_type
                     FROM qiita.data_type
                        JOIN qiita.artifact USING (data_type_id)
                        JOIN qiita.analysis_sample USING (artifact_id)
                     WHERE analysis_id = %s
                     ORDER BY data_type"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchflatten()

    @property
    def shared_with(self):
        """The users the analysis is shared with

        Returns
        -------
        list of qiita_db.user.User
            Users the analysis is shared with
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT email FROM qiita.analysis_users
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return [qdb.user.User(uid)
                    for uid in qdb.sql_connection.TRN.execute_fetchflatten()]

    @property
    def artifacts(self):
        """The artifacts attached to the analysis

        Returns
        -------
        list of qiita_db.artifact.Artifact
            One object per row of qiita.analysis_artifact for this analysis
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT artifact_id
                     FROM qiita.analysis_artifact
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self.id])
            return [qdb.artifact.Artifact(aid)
                    for aid in qdb.sql_connection.TRN.execute_fetchflatten()]

    @property
    def mapping_file(self):
        """Returns the mapping file for the analysis

        Returns
        -------
        int or None
            The filepath id of the analysis mapping file or None
            if not generated
        """
        fp = [x['fp_id'] for x in qdb.util.retrieve_filepaths(
            "analysis_filepath", "analysis_id", self._id)
            if x['fp_type'] == 'plain_text']

        if fp:
            # returning the actual filepath id vs. an array
            return fp[0]
        else:
            return None

    @property
    def metadata_categories(self):
        """Returns all metadata categories in the current analyses based
        on the available studies

        Returns
        -------
        dict of dict
            a dict with study_id as the key & the values are another dict with
            'sample' & 'prep' as keys and the metadata categories as values
        """
        ST = qdb.metadata_template.sample_template.SampleTemplate
        PT = qdb.metadata_template.prep_template.PrepTemplate
        with qdb.sql_connection.TRN:
            sql = """SELECT DISTINCT study_id, artifact_id
                     FROM qiita.analysis_sample
                        LEFT JOIN qiita.study_artifact USING (artifact_id)
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self._id])

            metadata = defaultdict(dict)
            for sid, aid in qdb.sql_connection.TRN.execute_fetchindex():
                # the sample categories are loaded once per study; the prep
                # categories are accumulated over all artifacts of the study
                if sid not in metadata:
                    metadata[sid]['sample'] = set(ST(sid).categories)
                    metadata[sid]['prep'] = set()
                for pt in qdb.artifact.Artifact(aid).prep_templates:
                    metadata[sid]['prep'] = metadata[sid]['prep'] | set(
                        PT(pt.id).categories)

        return metadata

    @property
    def tgz(self):
        """Returns the tgz file of the analysis

        Returns
        -------
        str or None
            full filepath to the tgz file or None if not generated
        """
        fp = [x['fp'] for x in qdb.util.retrieve_filepaths(
            "analysis_filepath", "analysis_id", self._id)
            if x['fp_type'] == 'tgz']

        if fp:
            # returning the actual path vs. an array
            return fp[0]
        else:
            return None

    @property
    def jobs(self):
        """The jobs generating the initial artifacts for the analysis

        Returns
        -------
        list of qiita_db.processing_job.ProcessingJob
            Jobs attached to the analysis. Empty list if no jobs attached.
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT processing_job_id
                     FROM qiita.analysis_processing_job
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return [qdb.processing_job.ProcessingJob(jid)
                    for jid in qdb.sql_connection.TRN.execute_fetchflatten()]

    @property
    def pmid(self):
        """Returns pmid attached to the analysis

        Returns
        -------
        str or None
            returns the PMID or None if none is attached
        """
        with qdb.sql_connection.TRN:
            sql = "SELECT pmid FROM qiita.{0} WHERE analysis_id = %s".format(
                self._table)
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchlast()

    @pmid.setter
    def pmid(self, pmid):
        """adds pmid to the analysis

        Parameters
        ----------
        pmid: str
            pmid to set for the analysis

        Notes
        -----
        An analysis should only ever have one PMID attached to it.
        """
        sql = """UPDATE qiita.{0} SET pmid = %s
                 WHERE analysis_id = %s""".format(self._table)
        qdb.sql_connection.perform_as_transaction(sql, [pmid, self._id])

    @property
    def can_be_publicized(self):
        """Returns whether the analysis can be made public

        Returns
        -------
        bool
            Whether the analysis can be publicized
        list
            A list of not public (private) artifacts
        """
        # The analysis can be made public if all the artifacts used
        # to get the samples from are public
        with qdb.sql_connection.TRN:
            non_public = []
            sql = """SELECT DISTINCT artifact_id
                     FROM qiita.analysis_sample
                     WHERE analysis_id = %s
                     ORDER BY artifact_id"""
            qdb.sql_connection.TRN.add(sql, [self.id])
            for aid in qdb.sql_connection.TRN.execute_fetchflatten():
                if qdb.artifact.Artifact(aid).visibility != 'public':
                    non_public.append(aid)

            return (non_public == [], non_public)

    @property
    def is_public(self):
        """Returns if the analysis is public

        Returns
        -------
        bool
            If the analysis is public
        """
        with qdb.sql_connection.TRN:
            # getting all root artifacts / command_id IS NULL
            sql = """SELECT DISTINCT visibility
                     FROM qiita.analysis_artifact
                        LEFT JOIN qiita.artifact USING (artifact_id)
                        LEFT JOIN qiita.visibility USING (visibility_id)
                     WHERE analysis_id = %s AND command_id IS NULL"""
            qdb.sql_connection.TRN.add(sql, [self.id])
            visibilities = set(qdb.sql_connection.TRN.execute_fetchflatten())

            # public only when there is at least one root artifact and all
            # of them are public
            return visibilities == {'public'}

    def make_public(self):
        """Makes an analysis public

        Raises
        ------
        ValueError
            If can_be_publicized is not true
        """
        with qdb.sql_connection.TRN:
            can_be_publicized, non_public = self.can_be_publicized
            if not can_be_publicized:
                raise ValueError('Not all artifacts that generated this '
                                 'analysis are public: %s' % ', '.join(
                                     map(str, non_public)))

            # getting all root artifacts / command_id IS NULL
            sql = """SELECT artifact_id
                     FROM qiita.analysis_artifact
                        LEFT JOIN qiita.artifact USING (artifact_id)
                     WHERE analysis_id = %s AND command_id IS NULL"""
            qdb.sql_connection.TRN.add(sql, [self.id])
            aids = qdb.sql_connection.TRN.execute_fetchflatten()
            for aid in aids:
                qdb.artifact.Artifact(aid).visibility = 'public'

    def add_artifact(self, artifact):
        """Adds an artifact to the analysis

        The insert is skipped if the artifact is already attached.

        Parameters
        ----------
        artifact : qiita_db.artifact.Artifact
            The artifact to be added
        """
        with qdb.sql_connection.TRN:
            sql = """INSERT INTO qiita.analysis_artifact
                        (analysis_id, artifact_id)
                     SELECT %s, %s
                     WHERE NOT EXISTS(SELECT *
                                      FROM qiita.analysis_artifact
                                      WHERE analysis_id = %s
                                        AND artifact_id = %s)"""
            qdb.sql_connection.TRN.add(sql, [self.id, artifact.id,
                                             self.id, artifact.id])

    def set_error(self, error_msg):
        """Sets the analysis error

        Parameters
        ----------
        error_msg : str
            The error message
        """
        le = qdb.logger.LogEntry.create('Runtime', error_msg)
        sql = """UPDATE qiita.analysis
                 SET logging_id = %s
                 WHERE analysis_id = %s"""
        qdb.sql_connection.perform_as_transaction(sql, [le.id, self.id])

    def has_access(self, user):
        """Returns whether the given user has access to the analysis

        Parameters
        ----------
        user : User object
            User we are checking access for

        Returns
        -------
        bool
            Whether user has access to analysis or not
        """
        with qdb.sql_connection.TRN:
            # if admin or superuser, just return true
            if user.level in {'superuser', 'admin'}:
                return True

            return self in Analysis.get_by_status('public') | \
                user.private_analyses | user.shared_analyses

    def can_edit(self, user):
        """Returns whether the given user can edit the analysis

        Parameters
        ----------
        user : User object
            User we are checking edit permissions for

        Returns
        -------
        bool
            Whether user can edit the analysis or not
        """
        # The analysis is editable only if the user is the owner, is in the
        # shared list or the user is an admin
        return (user.level in {'superuser', 'admin'} or self.owner == user or
                user in self.shared_with)

    def summary_data(self):
        """Return number of studies, artifacts, and samples selected

        Returns
        -------
        dict
            counts keyed to their relevant type
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT
                        COUNT(DISTINCT study_id) as studies,
                        COUNT(DISTINCT artifact_id) as artifacts,
                        COUNT(DISTINCT sample_id) as samples
                     FROM qiita.study_artifact
                        JOIN qiita.analysis_sample USING (artifact_id)
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return dict(qdb.sql_connection.TRN.execute_fetchindex()[0])

    def share(self, user):
        """Share the analysis with another user

        Nothing is done if the user is the owner or the analysis is already
        shared with them.

        Parameters
        ----------
        user: User object
            The user to share the analysis with
        """
        # Make sure the analysis is not already shared with the given user.
        # `owner` and `shared_with` hold User objects, so compare ids with
        # ids instead of an id with an object
        already_has_it = {u.id for u in self.shared_with}
        already_has_it.add(self.owner.id)
        if user.id in already_has_it:
            return

        sql = """INSERT INTO qiita.analysis_users (analysis_id, email)
                 VALUES (%s, %s)"""
        qdb.sql_connection.perform_as_transaction(sql, [self._id, user.id])

    def unshare(self, user):
        """Unshare the analysis with another user

        Parameters
        ----------
        user: User object
            The user to unshare the analysis with
        """
        sql = """DELETE FROM qiita.analysis_users
                 WHERE analysis_id = %s AND email = %s"""
        qdb.sql_connection.perform_as_transaction(sql, [self._id, user.id])

    def _lock_samples(self):
        """Only dflt analyses can have samples added/removed

        Raises
        ------
        qiita_db.exceptions.QiitaDBOperationNotPermittedError
            If the analysis is not a default analysis
        """
        with qdb.sql_connection.TRN:
            sql = "SELECT dflt FROM qiita.analysis WHERE analysis_id = %s"
            qdb.sql_connection.TRN.add(sql, [self.id])
            if not qdb.sql_connection.TRN.execute_fetchlast():
                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
                    "Can't add/remove samples from this analysis")

    def add_samples(self, samples):
        """Adds samples to the analysis

        Samples that are already selected for an artifact are skipped.

        Parameters
        ----------
        samples : dictionary of lists
            samples and the artifact id they come from in form
            {artifact_id: [sample1, sample2, ...], ...}

        Raises
        ------
        qiita_db.exceptions.QiitaDBOperationNotPermittedError
            If the analysis is not a default analysis
        """
        with qdb.sql_connection.TRN:
            self._lock_samples()

            for aid, samps in samples.items():
                # get previously selected samples for aid and filter them out
                sql = """SELECT sample_id
                         FROM qiita.analysis_sample
                         WHERE artifact_id = %s AND analysis_id = %s"""
                qdb.sql_connection.TRN.add(sql, [aid, self._id])
                prev_selected = qdb.sql_connection.TRN.execute_fetchflatten()

                select = set(samps).difference(prev_selected)
                sql = """INSERT INTO qiita.analysis_sample
                            (analysis_id, artifact_id, sample_id)
                         VALUES (%s, %s, %s)"""
                args = [[self._id, aid, s] for s in select]
                qdb.sql_connection.TRN.add(sql, args, many=True)
                qdb.sql_connection.TRN.execute()

    def remove_samples(self, artifacts=None, samples=None):
        """Removes samples from the analysis

        Parameters
        ----------
        artifacts : list, optional
            Artifacts to remove, default None
        samples : list, optional
            sample ids to remove, default None

        Raises
        ------
        IncompetentQiitaDeveloperError
            If neither artifacts nor samples are provided
        qiita_db.exceptions.QiitaDBOperationNotPermittedError
            If the analysis is not a default analysis

        Notes
        -----
        - When only a list of samples given, the samples will be removed from
         all artifacts it is associated with
        - When only a list of artifacts is given, all samples associated with
         that artifact are removed
        - If both are passed, the given samples are removed from the given
         artifacts
        """
        with qdb.sql_connection.TRN:
            self._lock_samples()
            if artifacts and samples:
                sql = """DELETE FROM qiita.analysis_sample
                         WHERE analysis_id = %s
                            AND artifact_id = %s
                            AND sample_id = %s"""
                # Build the SQL arguments to remove the samples of the
                # given artifacts.
                args = [[self._id, a.id, s]
                        for a, s in product(artifacts, samples)]
            elif artifacts:
                sql = """DELETE FROM qiita.analysis_sample
                         WHERE analysis_id = %s AND artifact_id = %s"""
                args = [[self._id, a.id] for a in artifacts]
            elif samples:
                sql = """DELETE FROM qiita.analysis_sample
                         WHERE analysis_id = %s AND sample_id = %s"""
                args = [[self._id, s] for s in samples]
            else:
                raise IncompetentQiitaDeveloperError(
                    "Must provide list of samples and/or proc_data for "
                    "removal")

            qdb.sql_connection.TRN.add(sql, args, many=True)
            qdb.sql_connection.TRN.execute()

    def build_files(self, merge_duplicated_sample_ids, categories=None):
        """Builds biom and mapping files needed for analysis

        Parameters
        ----------
        merge_duplicated_sample_ids : bool
            If the duplicated sample ids in the selected studies should be
            merged or prepended with the artifact ids. If false prepends
            the artifact id
        categories : set of str, optional
            If not None, use _only_ these categories for the metaanalysis

        Returns
        -------
        list of (str, str, str or None)
            The value returned by `_build_biom_tables`: one
            (data type, biom filepath, archive or None) tuple per biom

        Notes
        -----
        Creates biom tables for each requested data type
        Creates mapping file for requested samples
        """
        with qdb.sql_connection.TRN:
            # in practice we could retrieve samples in each of the following
            # calls but this will mean calling the DB multiple times and will
            # make testing much harder as we will need to have analyses at
            # different stages and possible errors.
            samples = self.samples
            # retrieving all info on artifacts to save SQL time
            bioms_info = qdb.util.get_artifacts_information(samples.keys())

            # figuring out if we are going to have duplicated samples, again
            # doing it here cause it's computationally cheaper
            # 1. merge samples per: data_type, reference used and
            # the command id
            # Note that grouped_samples is basically how many biom tables we
            # are going to create
            grouped_samples = {}

            # post_processing_cmds maps each artifact algorithm to a
            # (merging_scheme, command) tuple describing an operation to be
            # performed on the final merged BIOM of that algorithm; it is
            # handed over to _build_biom_tables().
            post_processing_cmds = dict()
            for aid, asamples in samples.items():
                # find the artifact info, [0] there should be only one info
                ainfo = [bi for bi in bioms_info
                         if bi['artifact_id'] == aid][0]
                data_type = ainfo['data_type']

                # ainfo['algorithm'] is the original merging scheme
                label = "%s || %s" % (data_type, ainfo['algorithm'])
                if label not in grouped_samples:
                    aparams = qdb.artifact.Artifact(aid).processing_parameters
                    if aparams is not None:
                        cmd = aparams.command.post_processing_cmd
                        if cmd is not None:
                            # preserve label, in case it's needed.
                            merging_scheme = sub(
                                ', BIOM: [0-9a-zA-Z-.]+', '',
                                ainfo['algorithm'])
                            post_processing_cmds[ainfo['algorithm']] = (
                                merging_scheme, cmd)
                    grouped_samples[label] = []
                grouped_samples[label].append((aid, asamples))

            # We need to negate merge_duplicated_sample_ids because in
            # _build_mapping_file is actually rename: merge yes == rename no
            rename_dup_samples = not merge_duplicated_sample_ids
            self._build_mapping_file(
                samples, rename_dup_samples, categories=categories)

            if post_processing_cmds:
                biom_files = self._build_biom_tables(
                    grouped_samples,
                    rename_dup_samples,
                    post_processing_cmds=post_processing_cmds)
            else:
                # preserve the legacy path
                biom_files = self._build_biom_tables(
                    grouped_samples,
                    rename_dup_samples)

            # every element of biom_files is a triplet; for the bioms
            # generated by a post processing command the final element is
            # the value the command reported under 'archive', otherwise None
            return biom_files

    def _build_biom_tables(self,
                           grouped_samples,
                           rename_dup_samples=False,
                           post_processing_cmds=None):
        """Build tables and add them to the analysis

        Parameters
        ----------
        grouped_samples : dict
            {"data_type || algorithm": [(artifact_id, [sample_id, ...]), ...]}
            each key generates one merged biom table
        rename_dup_samples : bool, optional
            If True the sample ids are prepended with the artifact id
        post_processing_cmds : dict, optional
            {algorithm: (merging_scheme, command)}; when the algorithm of a
            merged table is in this dict the command is run on the table

        Returns
        -------
        list of (str, str, str or None)
            (data type, biom filepath, archive or None) per generated biom

        Raises
        ------
        RuntimeError
            If an artifact has no biom table or a merged table has no samples
        ValueError
            If a post processing command returns a non-zero value
        """
        with qdb.sql_connection.TRN:
            # creating per analysis output folder
            _, base_fp = qdb.util.get_mountpoint(self._table)[0]
            base_fp = join(base_fp, 'analysis_%d' % self.id)
            if not exists(base_fp):
                mkdir(base_fp)

            biom_files = []
            for label, tables in grouped_samples.items():

                # the label is "data_type || algorithm"; split only on the
                # first separator so an algorithm containing '||' survives
                data_type, algorithm = [
                    part.strip() for part in label.split('||', 1)]

                new_table = None
                for aid, samples in tables:
                    artifact = qdb.artifact.Artifact(aid)

                    # the next loop is assuming that an artifact can have only
                    # one biom, which is a safe assumption until we generate
                    # artifacts from multiple bioms and even then we might
                    # only have one biom
                    biom_table_fp = None
                    for x in artifact.filepaths:
                        if x['fp_type'] == 'biom':
                            biom_table_fp = x['fp']
                            break
                    if not biom_table_fp:
                        raise RuntimeError(
                            "Artifact %s does not have a biom table associated"
                            % aid)

                    # loading the found biom table
                    biom_table = load_table(biom_table_fp)
                    # filtering samples to keep those selected by the user
                    biom_table_samples = set(biom_table.ids())
                    selected_samples = biom_table_samples.intersection(samples)
                    biom_table.filter(selected_samples, axis='sample',
                                      inplace=True)
                    if len(biom_table.ids()) == 0:
                        continue

                    if rename_dup_samples:
                        ids_map = {_id: "%d.%s" % (aid, _id)
                                   for _id in biom_table.ids()}
                        biom_table.update_ids(ids_map, 'sample', True, True)

                    if new_table is None:
                        new_table = biom_table
                    else:
                        try:
                            new_table = new_table.concat([biom_table])
                        except DisjointIDError:
                            new_table = new_table.merge(biom_table)

                if new_table is None or len(new_table.ids()) == 0:
                    # if we get to this point the only reason for failure is
                    # rarefaction
                    raise RuntimeError("All samples filtered out from "
                                       "analysis due to rarefaction level")

                # write out the file
                # data_type and algorithm values become part of the file
                # name(s).
                info = "%s_%s" % (
                    sub('[^0-9a-zA-Z]+', '', data_type),
                    sub('[^0-9a-zA-Z]+', '', algorithm))
                fn = "%d_analysis_%s.biom" % (self._id, info)
                biom_fp = join(base_fp, fn)
                # save final biom here
                with biom_open(biom_fp, 'w') as f:
                    new_table.to_hdf5(
                        f, "Generated by Qiita, analysis id: %d, info: %s" % (
                            self._id, label))

                # let's add the regular biom without post processing
                biom_files.append((data_type, biom_fp, None))

                # post_processing_cmds can be None, default, or a dict of
                # algorithm: merging_scheme, command
                if (post_processing_cmds is not None and
                        algorithm in post_processing_cmds):
                    merging_scheme, pp_cmd = post_processing_cmds[algorithm]
                    # assuming all commands require archives, obtain
                    # archives once, instead of for every cmd.
                    features = load_table(biom_fp).ids(axis='observation')
                    features = list(features)
                    archives = qdb.archive.Archive.retrieve_feature_values(
                        archive_merging_scheme=merging_scheme,
                        features=features)

                    # remove archives that SEPP could not match
                    archives = {f: loads(archives[f])
                                for f, plc
                                in archives.items()
                                if plc != ''}

                    # since biom_fp uses base_fp as its location, assume it's
                    # suitable for other files as well.
                    output_dir = join(base_fp, info)
                    if not exists(output_dir):
                        mkdir(output_dir)

                    fp_archive = join(output_dir,
                                      'archive_%d.json' % (self._id))

                    with open(fp_archive, 'w') as out_file:
                        dump(archives, out_file)

                    # assume archives file is passed as:
                    # --fp_archive=<path_to_archives_file>
                    # assume output dir is passed as:
                    # --output_dir=<path_to_output_dir>
                    # assume input biom file is passed as:
                    # --fp_biom=<path_to_biom_file>

                    # concatenate any other parameters into a string
                    params = ' '.join(["%s=%s" % (k, v) for k, v in
                                      pp_cmd['script_params'].items()])

                    # append archives file and output dir parameters
                    params = ("%s --fp_biom=%s --fp_archive=%s "
                              "--output_dir=%s" % (
                                  params, biom_fp, fp_archive, output_dir))

                    # if environment is successfully activated,
                    # run script with parameters
                    # script_env e.g.: 'deactivate; source activate qiita'
                    # script_path e.g.:
                    # python 'qiita_db/test/support_files/worker.py'
                    cmd = "%s %s %s" % (
                        pp_cmd['script_env'], pp_cmd['script_path'], params)
                    p_out, p_err, rv = qdb.processing_job._system_call(cmd)
                    p_out = p_out.rstrip()
                    # a non-zero return value is treated as a failure; on
                    # success the standard output is parsed as JSON and its
                    # 'biom' and 'archive' entries are read
                    if rv != 0:
                        raise ValueError('Error %d: %s' % (rv, p_err))
                    p_out = loads(p_out)

                    if p_out['archive'] is not None:
                        biom_files.append(
                            (data_type, p_out['biom'], p_out['archive']))

            # return the biom files, either with or without needed tree, to
            # the user.
            return biom_files

    def _build_mapping_file(self, samples, rename_dup_samples=False,
                            categories=None):
        """Builds the combined mapping file for all samples

        Code modified slightly from qiime.util.MetadataMap.__add__

        Parameters
        ----------
        samples : dict
            {artifact_id: [sample_id, sample_id, ...]}
        rename_dup_samples : bool, optional
            If True the sample ids are prepended with the artifact id and the
            original id is kept in the 'original_SampleID' column
        categories : iterable of str, optional
            If not None, only these columns of the sample and prep
            information are kept
        """
        with qdb.sql_connection.TRN:
            all_ids = set()
            to_concat = []
            sample_infos = dict()
            # build the lookup set once instead of once per DataFrame
            wanted = None if categories is None else set(categories)
            for aid, samps in samples.items():
                artifact = qdb.artifact.Artifact(aid)
                study = artifact.study
                si = study.sample_template
                if si not in sample_infos:
                    si_df = si.to_dataframe()
                    if wanted is not None:
                        # keep the DataFrame's own column order so the
                        # generated file has a stable layout
                        si_df = si_df[[c for c in si_df.columns
                                       if c in wanted]]
                    sample_infos[si] = si_df
                pt = artifact.prep_templates[0]
                pt_df = pt.to_dataframe()
                if wanted is not None:
                    pt_df = pt_df[[c for c in pt_df.columns if c in wanted]]

                qm = pt_df.join(sample_infos[si], lsuffix="_prep")

                # if we are not going to merge the duplicated samples
                # append the aid to the sample name
                qm['qiita_artifact_id'] = aid
                qm['qiita_prep_deprecated'] = pt.deprecated
                if rename_dup_samples:
                    qm['original_SampleID'] = qm.index
                    qm['#SampleID'] = "%d." % aid + qm.index
                    samps = set(['%d.%s' % (aid, _id) for _id in samps])
                    qm.set_index('#SampleID', inplace=True, drop=True)
                else:
                    # a sample id already taken from a previous artifact is
                    # not added again
                    samps = set(samps) - all_ids
                    all_ids.update(samps)

                # appending study metadata to the analysis
                study_owner = study.owner
                study_info = study.info
                pi = study_info['principal_investigator']
                qm['qiita_study_title'] = study.title
                qm['qiita_study_alias'] = study_info['study_alias']
                qm['qiita_owner'] = study_owner.info['name']
                qm['qiita_principal_investigator'] = pi.name

                qm = qm.loc[list(samps)]
                to_concat.append(qm)

            merged_map = pd.concat(to_concat)

            # Save the mapping file
            _, base_fp = qdb.util.get_mountpoint(self._table)[0]
            mapping_fp = join(base_fp, "%d_analysis_mapping.txt" % self._id)
            merged_map.to_csv(mapping_fp, index_label='#SampleID',
                              na_rep='unknown', sep='\t', encoding='utf-8')

            self._add_file("%d_analysis_mapping.txt" % self._id, "plain_text")

    def _add_file(self, filename, filetype, data_type=None):
        """adds analysis item to database

        Parameters
        ----------
        filename : str
            filename to add to analysis
        filetype : {plain_text, biom}
        data_type : str, optional
        """
        with qdb.sql_connection.TRN:
            filetype_id = qdb.util.convert_to_id(filetype, 'filepath_type')
            _, mp = qdb.util.get_mountpoint('analysis')[0]
            fpid = qdb.util.insert_filepaths([
                (join(mp, filename), filetype_id)], -1, 'analysis',
                move_files=False)[0]

            # the data type column is optional; its value is passed as a
            # bound parameter like the other two
            columns = ["analysis_id", "filepath_id"]
            args = [self._id, fpid]
            if data_type:
                columns.append("data_type_id")
                args.append(qdb.util.convert_to_id(data_type, "data_type"))

            sql = """INSERT INTO qiita.analysis_filepath
                        ({0})
                     VALUES ({1})""".format(
                ', '.join(columns), ', '.join(['%s'] * len(columns)))
            qdb.sql_connection.TRN.add(sql, args)
            qdb.sql_connection.TRN.execute()

    def _slurm_reservation(self):
        """Helper method for the slurm_reservation property

        Returns
        -------
        list
            The flattened result of selecting the slurm_reservation column
            of this analysis
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT slurm_reservation
                     FROM qiita.{0}
                     WHERE analysis_id = %s""".format(self._table)
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchflatten()

    @property
    def slurm_reservation(self):
        """Returns a valid reservation if it exists

        Returns
        -------
        str or None
            returns the slurm reservation or None
        """
        slurm_reservation = self._slurm_reservation()

        if slurm_reservation and slurm_reservation[0] != '':
            # the stored name ends up in a command string; quote it so it
            # is always a single argument
            # NOTE(review): presumably _system_call runs the command through
            # a shell -- confirm in qiita_db.processing_job
            cmd = f"scontrol show reservations {quote(slurm_reservation[0])}"
            p_out, p_err, rv = qdb.processing_job._system_call(cmd)
            if rv == 0 and p_out != 'No reservations in the system\n':
                return slurm_reservation[0]

        return None

    @slurm_reservation.setter
    def slurm_reservation(self, slurm_reservation):
        """Changes the slurm reservation of the analysis

        Parameters
        ----------
        slurm_reservation : str
            New slurm_reservation for the analysis
        """
        sql = """UPDATE qiita.{0}
                 SET slurm_reservation = %s
                 WHERE analysis_id = %s""".format(self._table)
        qdb.sql_connection.perform_as_transaction(
            sql, [slurm_reservation, self._id])