--- a +++ b/qiita_db/software.py @@ -0,0 +1,2060 @@ +# ----------------------------------------------------------------------------- +# Copyright (c) 2014--, The Qiita Development Team. +# +# Distributed under the terms of the BSD 3-clause License. +# +# The full license is in the file LICENSE, distributed with this software. +# ----------------------------------------------------------------------------- + +from json import dumps, loads +from copy import deepcopy +import inspect +import warnings + +import networkx as nx + +from qiita_core.qiita_settings import qiita_config +import qiita_db as qdb + +from configparser import ConfigParser + + +class Command(qdb.base.QiitaObject): + r"""An executable command available in the system + + Attributes + ---------- + active + post_processing_cmd + analysis_only + default_parameter_sets + description + merging_scheme + name + naming_order + optional_parameters + outputs + parameters + required_parameters + software + description + cli + parameters_table + + Methods + ------- + _check_id + activate + + Class Methods + ------------- + create + exists + get_commands_by_input_type + get_html_generator + get_validator + + See Also + -------- + qiita_db.software.Software + """ + _table = "software_command" + + @classmethod + def get_commands_by_input_type(cls, artifact_types, active_only=True, + exclude_analysis=True, prep_type=None): + """Returns the commands that can process the given artifact types + + Parameters + ---------- + artifact_type : list of str + The artifact types + active_only : bool, optional + If True, return only active commands, otherwise return all commands + Default: True + exclude_analysis : bool, optional + If True, return commands that are not part of the analysis pipeline + + Returns + ------- + generator of qiita_db.software.Command + The commands that can process the given artifact tyoes + """ + with qdb.sql_connection.TRN: + sql = """SELECT DISTINCT command_id + FROM qiita.command_parameter + JOIN qiita.parameter_artifact_type + USING (command_parameter_id) + JOIN qiita.artifact_type USING (artifact_type_id) + JOIN qiita.software_command USING (command_id) + WHERE artifact_type IN %s""" + if active_only: + sql += " AND active = True" + if exclude_analysis: + sql += " AND is_analysis = False" + qdb.sql_connection.TRN.add(sql, [tuple(artifact_types)]) + cids = set(qdb.sql_connection.TRN.execute_fetchflatten()) + + if prep_type is not None: + dws = [w for w in qdb.software.DefaultWorkflow.iter() + if prep_type in w.data_type] + if dws: + cmds = {n.default_parameter.command.id + for w in dws for n in w.graph.nodes} + cids = cmds & cids + + return [cls(cid) for cid in cids] + + @classmethod + def get_html_generator(cls, artifact_type): + """Returns the command that generete the HTML for the given artifact + + Parameters + ---------- + artifact_type : str + The artifact type to search the HTML generator for + + Returns + ------- + qiita_db.software.Command + The newly created command + + Raises + ------ + qdb.exceptions.QiitaDBError when the generete the HTML command can't + be found + """ + with qdb.sql_connection.TRN: + sql = """SELECT command_id + FROM qiita.software_command + JOIN qiita.software_artifact_type USING (software_id) + JOIN qiita.artifact_type USING (artifact_type_id) + WHERE artifact_type = %s + AND name = 'Generate HTML summary' + AND active = true""" + qdb.sql_connection.TRN.add(sql, [artifact_type]) + try: + res = qdb.sql_connection.TRN.execute_fetchlast() + except IndexError: + raise qdb.exceptions.QiitaDBError( + "There is no command to generate the HTML summary for " + "artifact type '%s'" % artifact_type) + + return cls(res) + + @classmethod + def get_validator(cls, artifact_type): + """Returns the command that validates the given artifact + + Parameters + ---------- + artifact_type : str + The artifact type to search the Validate for + + Returns + ------- + qiita_db.software.Command + The newly created command + + Raises + ------ + qdb.exceptions.QiitaDBError when the Validate command can't be found + """ + with qdb.sql_connection.TRN: + sql = """SELECT command_id + FROM qiita.software_command + JOIN qiita.software_artifact_type USING (software_id) + JOIN qiita.artifact_type USING (artifact_type_id) + WHERE artifact_type = %s + AND name = 'Validate' + AND active = true""" + qdb.sql_connection.TRN.add(sql, [artifact_type]) + try: + res = qdb.sql_connection.TRN.execute_fetchlast() + except IndexError: + raise qdb.exceptions.QiitaDBError( + "There is no command to generate the Validate for " + "artifact type '%s'" % artifact_type) + + return cls(res) + + def _check_id(self, id_): + """Check that the provided ID actually exists in the database + + Parameters + ---------- + id_ : int + The ID to test + + Notes + ----- + This function overwrites the base function, as the sql layout doesn't + follow the same conventions done in the other classes. + """ + with qdb.sql_connection.TRN: + sql = """SELECT EXISTS( + SELECT * + FROM qiita.software_command + WHERE command_id = %s)""" + qdb.sql_connection.TRN.add(sql, [id_]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @classmethod + def exists(cls, software, name): + """Checks if the command already exists in the system + + Parameters + ---------- + qiita_db.software.Software + The software to which this command belongs to. + name : str + The name of the command + + Returns + ------- + bool + Whether the command exists in the system or not + """ + with qdb.sql_connection.TRN: + sql = """SELECT EXISTS(SELECT * + FROM qiita.software_command + WHERE software_id = %s + AND name = %s)""" + qdb.sql_connection.TRN.add(sql, [software.id, name]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @classmethod + def create(cls, software, name, description, parameters, outputs=None, + analysis_only=False): + r"""Creates a new command in the system + + The supported types for the parameters are: + - string: the parameter is a free text input + - integer: the parameter is an integer + - float: the parameter is a float + - artifact: the parameter is an artifact instance, the artifact id + will be stored + - reference: the parameter is a reference instance, the reference + id will be stored + - choice: the format of this should be `choice:<json-dump-of-list>` + in which json-dump-of-list is the JSON dump of a list containing + the acceptable values + + Parameters + ---------- + software : qiita_db.software.Software + The software to which this command belongs to. + name : str + The name of the command + description : str + The description of the command + parameters : dict + The description of the parameters that this command received. The + format is: {parameter_name: (parameter_type, default, name_order, + check_biom_merge, qiita_optional_parameter (optional))}, + where parameter_name, parameter_type and default are strings, + name_order is an optional integer value and check_biom_merge is + an optional boolean value. name_order is used to specify the order + of the parameter when automatically naming the artifacts. + check_biom_merge is used when merging artifacts in the analysis + pipeline. qiita_optional_parameter is an optional bool to "force" + the parameter to be optional + outputs : dict, optional + The description of the outputs that this command generated. The + format is either {output_name: artifact_type} or + {output_name: (artifact_type, check_biom_merge)} + analysis_only : bool, optional + If true, then the command will only be available on the analysis + pipeline. Default: False. + + Returns + ------- + qiita_db.software.Command + The newly created command + + Raises + ------ + QiitaDBError + - If parameters is empty + - If the parameters dictionary is malformed + - If one of the parameter types is not supported + - If the default value of a choice parameter is not listed in + the available choices + QiitaDBDuplicateError + - If the command already exists + + Notes + ----- + If the default value for a parameter is NULL, then the parameter will + be required. On the other hand, if it is provided, the parameter will + be optional and the default value will be used when the user doesn't + overwrite it. + """ + # Perform some sanity checks in the parameters dictionary + if not parameters: + raise qdb.exceptions.QiitaDBError( + "Error creating command %s. At least one parameter should " + "be provided." % name) + sql_param_values = [] + sql_artifact_params = [] + for pname, vals in parameters.items(): + qiita_optional_parameter = False + if 'qiita_optional_parameter' in vals: + qiita_optional_parameter = True + vals.remove('qiita_optional_parameter') + lenvals = len(vals) + if lenvals == 2: + ptype, dflt = vals + name_order = None + check_biom_merge = False + elif lenvals == 4: + ptype, dflt, name_order, check_biom_merge = vals + else: + raise qdb.exceptions.QiitaDBError( + "Malformed parameters dictionary, the format should be " + "either {param_name: [parameter_type, default]} or " + "{parameter_name: (parameter_type, default, name_order, " + "check_biom_merge)}. Found: %s for parameter name %s" + % (vals, pname)) + + # Check that the type is one of the supported types + supported_types = ['string', 'integer', 'float', 'reference', + 'boolean', 'prep_template', 'analysis'] + if ptype not in supported_types and not ptype.startswith( + ('choice', 'mchoice', 'artifact')): + supported_types.extend(['choice', 'mchoice', 'artifact']) + raise qdb.exceptions.QiitaDBError( + "Unsupported parameters type '%s' for parameter %s. " + "Supported types are: %s" + % (ptype, pname, ', '.join(supported_types))) + + if ptype.startswith(('choice', 'mchoice')) and dflt is not None: + choices = set(loads(ptype.split(':')[1])) + dflt_val = dflt + if ptype.startswith('choice'): + # In the choice case, the dflt value is a single string, + # create a list with it the string on it to use the + # issuperset call below + dflt_val = [dflt_val] + else: + # jsonize the list to store it in the DB + dflt = dumps(dflt) + if not choices.issuperset(dflt_val): + raise qdb.exceptions.QiitaDBError( + "The default value '%s' for the parameter %s is not " + "listed in the available choices: %s" + % (dflt, pname, ', '.join(choices))) + + if ptype.startswith('artifact'): + atypes = loads(ptype.split(':')[1]) + sql_artifact_params.append( + [pname, 'artifact', atypes]) + else: + # a parameter will be required (not optional) if + # qiita_optional_parameter is false and there is the default + # value (dflt) is None + required = not qiita_optional_parameter and dflt is None + sql_param_values.append([pname, ptype, required, dflt, + name_order, check_biom_merge]) + + with qdb.sql_connection.TRN: + if cls.exists(software, name): + raise qdb.exceptions.QiitaDBDuplicateError( + "command", "software: %d, name: %s" + % (software.id, name)) + # Add the command to the DB + sql = """INSERT INTO qiita.software_command + (name, software_id, description, is_analysis) + VALUES (%s, %s, %s, %s) + RETURNING command_id""" + sql_params = [name, software.id, description, analysis_only] + qdb.sql_connection.TRN.add(sql, sql_params) + c_id = qdb.sql_connection.TRN.execute_fetchlast() + + # Add the parameters to the DB + sql = """INSERT INTO qiita.command_parameter + (command_id, parameter_name, parameter_type, + required, default_value, name_order, check_biom_merge) + VALUES (%s, %s, %s, %s, %s, %s, %s) + RETURNING command_parameter_id""" + sql_params = [ + [c_id, pname, p_type, reqd, default, no, chm] + for pname, p_type, reqd, default, no, chm in sql_param_values] + qdb.sql_connection.TRN.add(sql, sql_params, many=True) + qdb.sql_connection.TRN.execute() + + # Add the artifact parameters + sql_type = """INSERT INTO qiita.parameter_artifact_type + (command_parameter_id, artifact_type_id) + VALUES (%s, %s)""" + supported_types = [] + for pname, p_type, atypes in sql_artifact_params: + sql_params = [c_id, pname, p_type, True, None, None, False] + qdb.sql_connection.TRN.add(sql, sql_params) + pid = qdb.sql_connection.TRN.execute_fetchlast() + sql_params = [ + [pid, qdb.util.convert_to_id(at, 'artifact_type')] + for at in atypes] + qdb.sql_connection.TRN.add(sql_type, sql_params, many=True) + supported_types.extend([atid for _, atid in sql_params]) + + # If the software type is 'artifact definition', there are a couple + # of extra steps + if software.type == 'artifact definition': + # If supported types is not empty, link the software with these + # types + if supported_types: + sql = """INSERT INTO qiita.software_artifact_type + (software_id, artifact_type_id) + VALUES (%s, %s)""" + sql_params = [[software.id, atid] + for atid in supported_types] + qdb.sql_connection.TRN.add(sql, sql_params, many=True) + # If this is the validate command, we need to add the + # provenance and name parameters. These are used internally, + # that's why we are adding them here + if name == 'Validate': + sql = """INSERT INTO qiita.command_parameter + (command_id, parameter_name, parameter_type, + required, default_value) + VALUES (%s, 'name', 'string', 'False', + 'dflt_name'), + (%s, 'provenance', 'string', 'False', NULL) + """ + qdb.sql_connection.TRN.add(sql, [c_id, c_id]) + + # Add the outputs to the command + if outputs: + sql_args = [] + for pname, at in outputs.items(): + if isinstance(at, tuple): + sql_args.append( + [pname, c_id, + qdb.util.convert_to_id(at[0], 'artifact_type'), + at[1]]) + else: + try: + at_id = qdb.util.convert_to_id(at, 'artifact_type') + except qdb.exceptions.QiitaDBLookupError: + msg = (f'Error creating {software.name}, {name}, ' + f'{description} - Unknown artifact_type: ' + f'{at}') + raise ValueError(msg) + sql_args.append([pname, c_id, at_id, False]) + + sql = """INSERT INTO qiita.command_output + (name, command_id, artifact_type_id, + check_biom_merge) + VALUES (%s, %s, %s, %s)""" + qdb.sql_connection.TRN.add(sql, sql_args, many=True) + qdb.sql_connection.TRN.execute() + + return cls(c_id) + + @property + def software(self): + """The software to which this command belongs to + + Returns + ------- + qiita_db.software.Software + the software to which this command belongs to + """ + with qdb.sql_connection.TRN: + sql = """SELECT software_id + FROM qiita.software_command + WHERE command_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return Software(qdb.sql_connection.TRN.execute_fetchlast()) + + @property + def name(self): + """The name of the command + + Returns + ------- + str + The name of the command + """ + with qdb.sql_connection.TRN: + sql = """SELECT name + FROM qiita.software_command + WHERE command_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def post_processing_cmd(self): + """Additional processing commands required for merging + + Returns + ------- + str + Returns the additional processing command for merging + """ + with qdb.sql_connection.TRN: + sql = """SELECT post_processing_cmd + FROM qiita.software_command + WHERE command_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + + cmd = qdb.sql_connection.TRN.execute_fetchlast() + if cmd: + # assume correctly formatted json data + # load data into dictionary; don't return JSON + return loads(qdb.sql_connection.TRN.execute_fetchlast()) + + return None + + @property + def description(self): + """The description of the command + + Returns + ------- + str + The description of the command + """ + with qdb.sql_connection.TRN: + sql = """SELECT description + FROM qiita.software_command + WHERE command_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def parameters(self): + """Returns the parameters that the command accepts + + Returns + ------- + dict + Dictionary of {parameter_name: [ptype, dflt]} + """ + with qdb.sql_connection.TRN: + sql = """SELECT parameter_name, parameter_type, default_value + FROM qiita.command_parameter + WHERE command_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + res = qdb.sql_connection.TRN.execute_fetchindex() + return {pname: [ptype, dflt] for pname, ptype, dflt in res} + + @property + def required_parameters(self): + """Returns the required parameters that the command accepts + + Returns + ------- + dict + Dictionary of {parameter_name: ptype} + """ + with qdb.sql_connection.TRN: + sql = """SELECT command_parameter_id, parameter_name, + parameter_type, array_agg( + artifact_type ORDER BY artifact_type) AS + artifact_type + FROM qiita.command_parameter + LEFT JOIN qiita.parameter_artifact_type + USING (command_parameter_id) + LEFT JOIN qiita.artifact_type USING (artifact_type_id) + WHERE command_id = %s AND required = True + GROUP BY command_parameter_id""" + qdb.sql_connection.TRN.add(sql, [self.id]) + res = qdb.sql_connection.TRN.execute_fetchindex() + return {pname: (ptype, atype) for _, pname, ptype, atype in res} + + @property + def optional_parameters(self): + """Returns the optional parameters that the command accepts + + Returns + ------- + dict + Dictionary of {parameter_name: [ptype, default]} + """ + with qdb.sql_connection.TRN: + sql = """SELECT parameter_name, parameter_type, default_value + FROM qiita.command_parameter + WHERE command_id = %s AND required = false""" + qdb.sql_connection.TRN.add(sql, [self.id]) + res = qdb.sql_connection.TRN.execute_fetchindex() + + # Define a function to load the json storing the default parameters + # if ptype is multiple choice. When I added it to the for loop as + # a one liner if, made the code a bit hard to read + def dflt_fmt(dflt, ptype): + if ptype.startswith('mchoice'): + return loads(dflt) + return dflt + + return {pname: [ptype, dflt_fmt(dflt, ptype)] + for pname, ptype, dflt in res} + + @property + def default_parameter_sets(self): + """Returns the list of default parameter sets + + Returns + ------- + generator + generator of qiita_db.software.DefaultParameters + """ + with qdb.sql_connection.TRN: + sql = """SELECT default_parameter_set_id + FROM qiita.default_parameter_set + WHERE command_id = %s + ORDER BY default_parameter_set_id""" + qdb.sql_connection.TRN.add(sql, [self.id]) + res = qdb.sql_connection.TRN.execute_fetchflatten() + for pid in res: + yield DefaultParameters(pid) + + @property + def outputs(self): + """Returns the list of output artifact types + + Returns + ------- + list of str + The output artifact types + """ + with qdb.sql_connection.TRN: + sql = """SELECT name, artifact_type + FROM qiita.command_output + JOIN qiita.artifact_type USING (artifact_type_id) + WHERE command_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchindex() + + @property + def active(self): + """Returns if the command is active or not + + Returns + ------- + bool + Whether the command is active or not + + Notes + ----- + This method differentiates between commands based on analysis_only or + the software type. The commands that are not for analysis (processing) + and are from an artifact definition software will return as active + if they have the same name than a command that is active; this helps + for situations where the processing plugins are updated but some + commands didn't change its version. + """ + with qdb.sql_connection.TRN: + cmd_type = self.software.type + if self.analysis_only or cmd_type == 'artifact definition': + sql = """SELECT active + FROM qiita.software_command + WHERE command_id = %s""" + else: + sql = """SELECT EXISTS ( + SELECT active FROM qiita.software_command + WHERE name IN ( + SELECT name FROM qiita.software_command + WHERE command_id = %s) AND active = true)""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + def activate(self): + """Activates the command""" + sql = """UPDATE qiita.software_command + SET active = %s + WHERE command_id = %s""" + qdb.sql_connection.perform_as_transaction(sql, [True, self.id]) + + @property + def analysis_only(self): + """Returns if the command is an analysis-only command + + Returns + ------- + bool + Whether the command is analysis only or not + """ + with qdb.sql_connection.TRN: + sql = """SELECT is_analysis + FROM qiita.software_command + WHERE command_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def naming_order(self): + """The ordered list of parameters to use to name the output artifacts + + Returns + ------- + list of str + """ + with qdb.sql_connection.TRN: + sql = """SELECT parameter_name + FROM qiita.command_parameter + WHERE command_id = %s AND name_order IS NOT NULL + ORDER BY name_order""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchflatten() + + @property + def merging_scheme(self): + """The values to check when merging the output result + + Returns + ------- + dict of {'parameters': [list of str], + 'outputs': [list of str] + 'ignore_parent_command': bool} + """ + with qdb.sql_connection.TRN: + sql = """SELECT parameter_name + FROM qiita.command_parameter + WHERE command_id = %s AND check_biom_merge = TRUE + ORDER BY parameter_name""" + qdb.sql_connection.TRN.add(sql, [self.id]) + params = qdb.sql_connection.TRN.execute_fetchflatten() + sql = """SELECT name + FROM qiita.command_output + WHERE command_id = %s AND check_biom_merge = TRUE + ORDER BY name""" + qdb.sql_connection.TRN.add(sql, [self.id]) + outputs = qdb.sql_connection.TRN.execute_fetchflatten() + + sql = """SELECT ignore_parent_command + FROM qiita.software_command + WHERE command_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + ipc = qdb.sql_connection.TRN.execute_fetchlast() + + return {'parameters': params, + 'outputs': outputs, + 'ignore_parent_command': ipc} + + @property + def resource_allocation(self): + """The resource allocation defined in the database for this command + + Returns + ------- + str + """ + + with qdb.sql_connection.TRN: + sql = """SELECT allocation FROM + qiita.processing_job_resource_allocation + WHERE name = %s and + job_type = 'RESOURCE_PARAMS_COMMAND'""" + qdb.sql_connection.TRN.add(sql, [self.name]) + + result = qdb.sql_connection.TRN.execute_fetchflatten() + + # if no matches for both type and name were found, query the + # 'default' value for the type + + if not result: + sql = """SELECT allocation FROM + qiita.processing_job_resource_allocation WHERE + name = %s and job_type = 'RESOURCE_PARAMS_COMMAND'""" + qdb.sql_connection.TRN.add(sql, ['default']) + + result = qdb.sql_connection.TRN.execute_fetchflatten() + if not result: + raise ValueError("Could not match '%s' to a resource " + "allocation!" % self.name) + + return result[0] + + @property + def processing_jobs(self): + """All the processing_jobs that used this command + + Returns + ------- + list of qiita_db.processing_job.ProcessingJob + List of jobs that used this command. + """ + + with qdb.sql_connection.TRN: + sql = """SELECT processing_job_id FROM + qiita.processing_job + WHERE command_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + + jids = qdb.sql_connection.TRN.execute_fetchflatten() + + return [qdb.processing_job.ProcessingJob(j) for j in jids] + + +class Software(qdb.base.QiitaObject): + r"""A software package available in the system + + Attributes + ---------- + name + version + description + commands + publications + environment_name + start_script + + Methods + ------- + add_publications + create + + See Also + -------- + qiita_db.software.Command + """ + _table = "software" + + @classmethod + def iter(cls, active=True): + """Iterates over all active software + + Parameters + ---------- + active : bool, optional + If True will only return active software + + Returns + ------- + list of qiita_db.software.Software + The software objects + """ + sql = """SELECT software_id + FROM qiita.software {0} + ORDER BY software_id""".format( + 'WHERE active = True' if active else '') + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql) + for s_id in qdb.sql_connection.TRN.execute_fetchflatten(): + yield cls(s_id) + + @classmethod + def deactivate_all(cls): + """Deactivates all the plugins in the system""" + with qdb.sql_connection.TRN: + sql = "UPDATE qiita.software SET active = False" + qdb.sql_connection.TRN.add(sql) + sql = "UPDATE qiita.software_command SET active = False" + qdb.sql_connection.TRN.add(sql) + qdb.sql_connection.TRN.execute() + + @classmethod + def from_file(cls, fp, update=False): + """Installs/updates a plugin from a plugin configuration file + + Parameters + ---------- + fp : str + Path to the plugin configuration file + update : bool, optional + If true, update the values in the database with the current values + in the config file. Otherwise, use stored values and warn if config + file contents and database contents do not match + + Returns + ------- + qiita_db.software.Software + The software object for the contents of `fp` + + Raises + ------ + qiita_db.exceptions.QiitaDBOperationNotPermittedError + If the plugin type in the DB and in the config file doesn't match + If the (client_id, client_secret) pair in the DB and in the config + file doesn't match + """ + config = ConfigParser() + with open(fp, newline=None) as conf_file: + config.read_file(conf_file) + + name = config.get('main', 'NAME') + version = config.get('main', 'VERSION') + description = config.get('main', 'DESCRIPTION') + env_script = config.get('main', 'ENVIRONMENT_SCRIPT') + start_script = config.get('main', 'START_SCRIPT') + software_type = config.get('main', 'PLUGIN_TYPE') + publications = config.get('main', 'PUBLICATIONS') + publications = loads(publications) if publications else [] + client_id = config.get('oauth2', 'CLIENT_ID') + client_secret = config.get('oauth2', 'CLIENT_SECRET') + + if cls.exists(name, version): + # This plugin already exists, check that all the values are the + # same and return the existing plugin + with qdb.sql_connection.TRN: + sql = """SELECT software_id + FROM qiita.software + WHERE name = %s AND version = %s""" + qdb.sql_connection.TRN.add(sql, [name, version]) + instance = cls(qdb.sql_connection.TRN.execute_fetchlast()) + + warning_values = [] + sql_update = """UPDATE qiita.software + SET {0} = %s + WHERE software_id = %s""" + + values = [description, env_script, start_script] + attrs = ['description', 'environment_script', 'start_script'] + for value, attr in zip(values, attrs): + if value != instance.__getattribute__(attr): + if update: + qdb.sql_connection.TRN.add( + sql_update.format(attr), [value, instance.id]) + else: + warning_values.append(attr) + + # Having a different plugin type should be an error, + # independently if the user is trying to update plugins or not + if software_type != instance.type: + raise qdb.exceptions.QiitaDBOperationNotPermittedError( + 'The plugin type of the plugin "%s" version %s does ' + 'not match the one in the system' % (name, version)) + + if publications != instance.publications: + if update: + instance.add_publications(publications) + else: + warning_values.append('publications') + + if (client_id != instance.client_id or + client_secret != instance.client_secret): + if update: + sql = """INSERT INTO qiita.oauth_identifiers + (client_id, client_secret) + SELECT %s, %s + WHERE NOT EXISTS(SELECT * + FROM qiita.oauth_identifiers + WHERE client_id = %s + AND client_secret = %s)""" + qdb.sql_connection.TRN.add( + sql, [client_id, client_secret, + client_id, client_secret]) + sql = """UPDATE qiita.oauth_software + SET client_id = %s + WHERE software_id = %s""" + qdb.sql_connection.TRN.add( + sql, [client_id, instance.id]) + else: + raise qdb.exceptions.QiitaDBOperationNotPermittedError( + 'The (client_id, client_secret) pair of the ' + 'plugin "%s" version "%s" does not match the one ' + 'in the system' % (name, version)) + + if warning_values: + warnings.warn( + 'Plugin "%s" version "%s" config file does not match ' + 'with stored information. Check the config file or ' + 'run "qiita plugin update" to update the plugin ' + 'information. Offending values: %s' + % (name, version, ", ".join(sorted(warning_values))), + qdb.exceptions.QiitaDBWarning) + qdb.sql_connection.TRN.execute() + else: + # This is a new plugin, create it + instance = cls.create( + name, version, description, env_script, start_script, + software_type, publications=publications, client_id=client_id, + client_secret=client_secret) + + return instance + + @classmethod + def exists(cls, name, version): + """Returns whether the plugin (name, version) already exists + + Parameters + ---------- + name : str + The name of the plugin + version : str + The version of the plugin + """ + with qdb.sql_connection.TRN: + sql = """SELECT EXISTS( + SELECT * FROM qiita.software + WHERE name = %s AND version = %s)""" + qdb.sql_connection.TRN.add(sql, [name, version]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @classmethod + def create(cls, name, version, description, environment_script, + start_script, software_type, publications=None, + client_id=None, client_secret=None): + r"""Creates a new software in the system + + Parameters + ---------- + name : str + The name of the software + version : str + The version of the software + description : str + The description of the software + environment_script : str + The script used to start the environment in which the plugin runs + start_script : str + The script used to start the plugin + software_type : str + The type of the software + publications : list of (str, str), optional + A list with the (DOI, pubmed_id) of the publications attached to + the software + client_id : str, optional + The client_id of the software. Default: randomly generated + client_secret : str, optional + The client_secret of the software. Default: randomly generated + + Raises + ------ + qiita_db.exceptions.QiitaDBError + If one of client_id or client_secret is provided but not both + """ + with qdb.sql_connection.TRN: + sql = """INSERT INTO qiita.software + (name, version, description, environment_script, + start_script, software_type_id) + VALUES (%s, %s, %s, %s, %s, %s) + RETURNING software_id""" + type_id = qdb.util.convert_to_id(software_type, "software_type") + sql_params = [name, version, description, environment_script, + start_script, type_id] + qdb.sql_connection.TRN.add(sql, sql_params) + s_id = qdb.sql_connection.TRN.execute_fetchlast() + + instance = cls(s_id) + + if publications: + instance.add_publications(publications) + + id_is_none = client_id is None + secret_is_none = client_secret is None + + if id_is_none and secret_is_none: + # Both are none, generate new ones + client_id = qdb.util.create_rand_string(50, punct=False) + client_secret = qdb.util.create_rand_string(255, punct=False) + elif id_is_none ^ secret_is_none: + # One has been provided but not the other, raise an error + raise qdb.exceptions.QiitaDBError( + 'Plugin "%s" version "%s" cannot be created, please ' + 'provide both client_id and client_secret or none of them' + % (name, version)) + + # At this point both client_id and client_secret are defined + sql = """INSERT INTO qiita.oauth_identifiers + (client_id, client_secret) + SELECT %s, %s + WHERE NOT EXISTS(SELECT * + FROM qiita.oauth_identifiers + WHERE client_id = %s + AND client_secret = %s)""" + qdb.sql_connection.TRN.add( + sql, [client_id, client_secret, client_id, client_secret]) + sql = """INSERT INTO qiita.oauth_software (software_id, client_id) + VALUES (%s, %s)""" + qdb.sql_connection.TRN.add(sql, [s_id, client_id]) + + return instance + + @classmethod + def from_name_and_version(cls, name, version): + """Returns the software object with the given name and version + + Parameters + ---------- + name: str + The software name + version : str + The software version + + Returns + ------- + qiita_db.software.Software + The software with the given name and version + + Raises + ------ + qiita_db.exceptions.QiitaDBUnknownIDError + If no software with the given name and version exists + """ + with qdb.sql_connection.TRN: + sql = """SELECT software_id + FROM qiita.software + WHERE name = %s AND version = %s""" + qdb.sql_connection.TRN.add(sql, [name, version]) + res = qdb.sql_connection.TRN.execute_fetchindex() + if not res: + raise qdb.exceptions.QiitaDBUnknownIDError( + "%s %s" % (name, version), cls._table) + return cls(res[0][0]) + + @property + def name(self): + """The name of the software + + Returns + ------- + str + The name of the software + """ + with qdb.sql_connection.TRN: + sql = "SELECT name FROM qiita.software WHERE software_id = %s" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def version(self): + """The version of the software + + Returns + ------- + str + The version of the software + """ + with qdb.sql_connection.TRN: + sql = "SELECT version FROM qiita.software WHERE software_id = %s" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def description(self): + """The description of the software + + Returns + ------- + str + The software description + """ + with qdb.sql_connection.TRN: + sql = """SELECT description + FROM qiita.software + WHERE software_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def commands(self): + """The list of commands attached to this software + + Returns + ------- + list of qiita_db.software.Command + The commands attached to this software package + """ + with qdb.sql_connection.TRN: + sql = """SELECT command_id + FROM qiita.software_command + WHERE software_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return [Command(cid) + for cid in qdb.sql_connection.TRN.execute_fetchflatten()] + + def get_command(self, cmd_name): + """Returns the command with the given name in the software + + Parameters + ---------- + cmd_name: str + The command with the given name + + Returns + ------- + qiita_db.software.Command + The command with the given name in this software + """ + with qdb.sql_connection.TRN: + sql = """SELECT command_id + FROM qiita.software_command + WHERE software_id =%s AND name=%s""" + qdb.sql_connection.TRN.add(sql, [self.id, cmd_name]) + res = qdb.sql_connection.TRN.execute_fetchindex() + if not res: + raise qdb.exceptions.QiitaDBUnknownIDError( + cmd_name, "software_command") + return Command(res[0][0]) + + @property + def publications(self): + """The publications attached to the software + + Returns + ------- + list of (str, str) + The list of DOI and pubmed_id attached to the publication + """ + with qdb.sql_connection.TRN: + sql = """SELECT p.doi, p.pubmed_id + FROM qiita.publication p + JOIN qiita.software_publication sp + ON p.doi = sp.publication_doi + WHERE sp.software_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchindex() + + def add_publications(self, publications): + """Add publications to the software + + Parameters + ---------- + publications : list of 2-tuples of str + A list with the (DOI, pubmed_id) of the publications to be attached + to the software + + Notes + ----- + For more information about pubmed id, visit + https://www.nlm.nih.gov/bsd/disted/pubmedtutorial/020_830.html + """ + with qdb.sql_connection.TRN: + sql = """INSERT INTO qiita.publication (doi, pubmed_id) + SELECT %s, %s + WHERE NOT EXISTS(SELECT * + FROM qiita.publication + WHERE doi = %s)""" + args = [[doi, pid, doi] for doi, pid in publications] + qdb.sql_connection.TRN.add(sql, args, many=True) + + sql = """INSERT INTO qiita.software_publication + (software_id, publication_doi) + SELECT %s, %s + WHERE NOT EXISTS(SELECT * + FROM qiita.software_publication + WHERE software_id = %s AND + publication_doi = %s)""" + sql_params = [[self.id, doi, self.id, doi] + for doi, _ in publications] + qdb.sql_connection.TRN.add(sql, sql_params, many=True) + qdb.sql_connection.TRN.execute() + + @property + def environment_script(self): + """The script used to start the plugin environment + + Returns + ------- + str + The script used to start the environment + """ + with qdb.sql_connection.TRN: + sql = """SELECT environment_script + FROM qiita.software + WHERE software_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def start_script(self): + """The script used to start the plugin + + Returns + ------- + str + The plugin's start script + """ + with qdb.sql_connection.TRN: + sql = """SELECT start_script + FROM qiita.software + WHERE software_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def type(self): + """Returns the type of the software + + Returns + ------- + str + The type of the software + """ + with qdb.sql_connection.TRN: + sql = """SELECT software_type + FROM qiita.software_type + JOIN qiita.software USING (software_type_id) + WHERE software_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def deprecated(self): + """Returns if the software is deprecated or not + + Returns + ------- + bool + Whether the software is deprecated or not + """ + with qdb.sql_connection.TRN: + sql = """SELECT deprecated + FROM qiita.software + WHERE software_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @deprecated.setter + def deprecated(self, deprecate): + """Changes deprecated of the software + + Parameters + ---------- + deprecate : bool + New software deprecate value + """ + sql = """UPDATE qiita.software SET deprecated = %s + WHERE software_id = %s""" + qdb.sql_connection.perform_as_transaction(sql, [deprecate, self._id]) + + @property + def active(self): + """Returns if the software is active or not + + Returns + ------- + bool + Whether the software is active or not + """ + with qdb.sql_connection.TRN: + sql = "SELECT active FROM qiita.software WHERE software_id = %s" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + def activate(self): + """Activates the plugin""" + sql = """UPDATE qiita.software + SET active = %s + WHERE software_id = %s""" + qdb.sql_connection.perform_as_transaction(sql, [True, self.id]) + + @property + def client_id(self): + """Returns the client id of the plugin + + Returns + ------- + str + The client id of the software + """ + with qdb.sql_connection.TRN: + sql = """SELECT client_id + FROM qiita.oauth_software + WHERE software_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def client_secret(self): + """Returns the client secret of the plugin + + Returns + ------- + str + The client secrect of the plugin + """ + with qdb.sql_connection.TRN: + sql = """SELECT client_secret + FROM qiita.oauth_software + JOIN qiita.oauth_identifiers USING (client_id) + WHERE software_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + def register_commands(self): + """Registers the software commands""" + url = "%s%s" % (qiita_config.base_url, qiita_config.portal_dir) + cmd = '%s; %s "%s" "register" "ignored"' % ( + self.environment_script, self.start_script, url) + + # it can be assumed that any command beginning with 'source' + # is calling 'source', an internal command of 'bash' and hence + # should be executed from bash, instead of sh. + # TODO: confirm that exit_code propagates from bash to sh to + # rv. + if cmd.startswith('source'): + cmd = "bash -c '%s'" % cmd + + p_out, p_err, rv = qdb.processing_job._system_call(cmd) + + if rv != 0: + s = "cmd: %s\nexit status: %d\n" % (cmd, rv) + s += "stdout: %s\nstderr: %s\n" % (p_out, p_err) + + raise ValueError(s) + + +class DefaultParameters(qdb.base.QiitaObject): + """Models a default set of parameters of a command + + Attributes + ---------- + name + values + + Methods + ------- + exists + create + iter + to_str + to_file + + See Also + -------- + qiita_db.software.Command + """ + _table = 'default_parameter_set' + + @classmethod + def exists(cls, command, **kwargs): + r"""Check if a parameter set already exists + + Parameters + ---------- + command : qiita_db.software.Command + The command to which the parameter set belongs to + kwargs : dict of {str: str} + The parameters and their values + + Returns + ------- + bool + Whether if the parameter set exists in the given command + + Raises + ------ + qiita_db.exceptions.QiitaDBError + - If there are missing parameters for the given command + - If `kwargs` contains parameters not originally defined in the + command + """ + with qdb.sql_connection.TRN: + command_params = set(command.optional_parameters) + user_params = set(kwargs) + + missing_in_user = command_params - user_params + extra_in_user = user_params - command_params + + if missing_in_user or extra_in_user: + raise qdb.exceptions.QiitaDBError( + "The given set of parameters do not match the ones for " + "the command.\nMissing parameters: %s\n" + "Extra parameters: %s\n" + % (', '.join(missing_in_user), ', '.join(extra_in_user))) + + sql = """SELECT parameter_set + FROM qiita.default_parameter_set + WHERE command_id = %s""" + qdb.sql_connection.TRN.add(sql, [command.id]) + for p_set in qdb.sql_connection.TRN.execute_fetchflatten(): + if p_set == kwargs: + return True + + return False + + @classmethod + def create(cls, param_set_name, command, **kwargs): + r"""Create a new parameter set for the given command + + Parameters + ---------- + param_set_name: str + The name of the new parameter set + command : qiita_db.software.Command + The command to add the new parameter set + kwargs : dict + The parameters and their values + + Returns + ------- + qiita_db.software.Parameters + The new parameter set instance + + Raises + ------ + qiita_db.exceptions.QiitaDBError + - If there are missing parameters for the given command + - If there are extra parameters in `kwargs` than for the given + command + qdb.exceptions.QiitaDBDuplicateError + - If the parameter set already exists + """ + with qdb.sql_connection.TRN: + # setting to default values all parameters not in the user_params + cmd_params = command.optional_parameters + missing_in_user = {k: cmd_params[k][1] + for k in (set(cmd_params) - set(kwargs))} + if missing_in_user: + kwargs.update(missing_in_user) + + # If the columns in kwargs and command do not match, cls.exists + # will raise the error for us + if cls.exists(command, **kwargs): + raise qdb.exceptions.QiitaDBDuplicateError( + cls._table, "Values: %s" % kwargs) + + sql = """INSERT INTO qiita.default_parameter_set + (command_id, parameter_set_name, parameter_set) + VALUES (%s, %s, %s) + RETURNING default_parameter_set_id""" + sql_args = [command.id, param_set_name, dumps(kwargs)] + qdb.sql_connection.TRN.add(sql, sql_args) + + return cls(qdb.sql_connection.TRN.execute_fetchlast()) + + @property + def name(self): + """The name of the parameter set + + Returns + ------- + str + The name of the parameter set + """ + with qdb.sql_connection.TRN: + sql = """SELECT parameter_set_name + FROM qiita.default_parameter_set + WHERE default_parameter_set_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def values(self): + """The values of the parameter set + + Returns + ------- + dict of {str: object} + Dictionary with the parameters values keyed by parameter name + """ + with qdb.sql_connection.TRN: + sql = """SELECT parameter_set + FROM qiita.default_parameter_set + WHERE default_parameter_set_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def command(self): + """The command that this parameter set belongs to + + Returns + ------- + qiita_db.software.Command + The command that this parameter set belongs to + """ + with qdb.sql_connection.TRN: + sql = """SELECT command_id + FROM qiita.default_parameter_set + WHERE default_parameter_set_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return Command(qdb.sql_connection.TRN.execute_fetchlast()) + + +class Parameters(object): + """Represents an instance of parameters used to process an artifact + + Raises + ------ + qiita_db.exceptions.QiitaDBOperationNotPermittedError + If trying to instantiate this class directly. In order to instantiate + this class, the classmethods `load` or `from_default_params` should + be used. + """ + + def __eq__(self, other): + """Equality based on the parameter values and the command""" + if type(self) is not type(other): + return False + if self.command != other.command: + return False + if self.values != other.values: + return False + return True + + @classmethod + def load(cls, command, json_str=None, values_dict=None): + """Load the parameters set form a json str or from a dict of values + + Parameters + ---------- + command : qiita_db.software.Command + The command to which the parameter set belongs to + json_str : str, optional + The json string encoding the parameters + values_dict : dict of {str: object}, optional + The dictionary with the parameter values + + Returns + ------- + qiita_db.software.Parameters + The loaded parameter set + + Raises + ------ + qiita_db.exceptions.QiitaDBError + - If `json_str` and `values` are both provided + - If neither `json_str` or `values` are provided + - If `json_str` or `values` do not encode a parameter set of + the provided command. + + Notes + ----- + The parameters `json_str` and `values_dict` are mutually exclusive, + only one of them should be provided at a time. However, one of them + should always be provided. + """ + if json_str is None and values_dict is None: + raise qdb.exceptions.QiitaDBError( + "Either `json_str` or `values_dict` should be provided.") + elif json_str is not None and values_dict is not None: + raise qdb.exceptions.QiitaDBError( + "Either `json_str` or `values_dict` should be provided, " + "but not both") + elif json_str is not None: + parameters = loads(json_str) + error_msg = ("The provided JSON string doesn't encode a " + "parameter set for command %s" % command.id) + else: + if not isinstance(values_dict, dict): + raise qdb.exceptions.QiitaDBError( + "The provided value_dict is %s (i.e. not None) but also " + "not a dictionary for command %s" % ( + values_dict, command.id)) + parameters = deepcopy(values_dict) + error_msg = ("The provided values dictionary doesn't encode a " + "parameter set for command %s" % command.id) + + # setting to default values all parameters not in the user_params + cmd_params = command.optional_parameters + missing_in_user = {k: cmd_params[k][1] + for k in (set(cmd_params) - set(parameters))} + if missing_in_user: + parameters.update(missing_in_user) + + with qdb.sql_connection.TRN: + cmd_reqd_params = command.required_parameters + cmd_opt_params = command.optional_parameters + + values = {} + for key in cmd_reqd_params: + try: + values[key] = parameters.pop(key) + except KeyError: + raise qdb.exceptions.QiitaDBError( + "%s. Missing required parameter: %s" + % (error_msg, key)) + + for key in cmd_opt_params: + try: + values[key] = parameters.pop(key) + except KeyError: + raise qdb.exceptions.QiitaDBError( + "%s. Missing optional parameter: %s" + % (error_msg, key)) + + if parameters: + raise qdb.exceptions.QiitaDBError( + "%s. Extra parameters: %s" + % (error_msg, ', '.join(parameters.keys()))) + + return cls(values, command) + + @classmethod + def from_default_params(cls, dflt_params, req_params, opt_params=None): + """Creates the parameter set from a `dflt_params` set + + Parameters + ---------- + dflt_params : qiita_db.software.DefaultParameters + The DefaultParameters object in which this instance is based on + req_params : dict of {str: object} + The required parameters values, keyed by parameter name + opt_params : dict of {str: object}, optional + The optional parameters to change from the default set, keyed by + parameter name. Default: None, use the values in `dflt_params` + + Raises + ------ + QiitaDBError + - If there are missing requried parameters + - If there is an unknown required ot optional parameter + """ + with qdb.sql_connection.TRN: + command = dflt_params.command + cmd_req_params = command.required_parameters + cmd_opt_params = command.optional_parameters + + missing_reqd = set(cmd_req_params) - set(req_params) + extra_reqd = set(req_params) - set(cmd_req_params) + if missing_reqd or extra_reqd: + raise qdb.exceptions.QiitaDBError( + "Provided required parameters not expected.\n" + "Missing required parameters: %s\n" + "Extra required parameters: %s\n" + % (', '.join(missing_reqd), ', '.join(extra_reqd))) + + if opt_params: + extra_opts = set(opt_params) - set(cmd_opt_params) + if extra_opts: + raise qdb.exceptions.QiitaDBError( + "Extra optional parameters provded: %s" + % ', '.join(extra_opts)) + + values = dflt_params.values + values.update(req_params) + + if opt_params: + values.update(opt_params) + + return cls(values, command) + + def __init__(self, values, command): + # Time for some python magic! The __init__ function should not be used + # outside of this module, users should always be using one of the above + # classmethods to instantiate the object. Lets test that it is the case + # First, we are going to get the current frame (i.e. this __init__) + # function and the caller to the __init__ + current_frame = inspect.currentframe() + caller_frame = current_frame.f_back + # The file names where the function is defined is stored in the + # f_code.co_filename attribute, and in this case it has to be the same + # for both of them. Also, we are restricing that the name of the caller + # should be either `load` or `from_default_params`, which are the two + # classmethods defined above + current_file = current_frame.f_code.co_filename + caller_file = caller_frame.f_code.co_filename + caller_name = caller_frame.f_code.co_name + if current_file != caller_file or \ + caller_name not in ['load', 'from_default_params']: + raise qdb.exceptions.QiitaDBOperationNotPermittedError( + "qiita_db.software.Parameters can't be instantiated directly. " + "Please use one of the classmethods: `load` or " + "`from_default_params`") + + self._values = values + self._command = command + + @property + def command(self): + """The command to which this parameter set belongs to + + Returns + ------- + qiita_db.software.Command + The command to which this parameter set belongs to + """ + return self._command + + @property + def values(self): + """The values of the parameters + + Returns + ------- + dict of {str: object} + The parameter values keyed by parameter name + """ + return self._values + + def dump(self): + """Return the values in the parameter as JSON + + Returns + ------- + str + The parameter values as a JSON string + """ + return dumps(self._values, sort_keys=True) + + +class DefaultWorkflowNode(qdb.base.QiitaObject): + r"""Represents a node in a default software workflow + + Attributes + ---------- + command + parameters + """ + _table = "default_workflow_node" + + @property + def default_parameter(self): + """The default parameter set to use in this node + + Returns + ------- + qiita_db.software.DefaultParameters + """ + with qdb.sql_connection.TRN: + sql = """SELECT default_parameter_set_id + FROM qiita.default_workflow_node + WHERE default_workflow_node_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + params_id = qdb.sql_connection.TRN.execute_fetchlast() + return qdb.software.DefaultParameters(params_id) + + +class DefaultWorkflowEdge(qdb.base.QiitaObject): + r"""Represents an edge in a default software workflow + + Attributes + ---------- + connections + """ + _table = "default_workflow_edge" + + @property + def connections(self): + """Retrieve how the commands are connected using this edge + + Returns + ------- + list of [str, str] + The list of pairs of output parameter name and input parameter name + used to connect the output of the source command to the input of + the destination command. + """ + with qdb.sql_connection.TRN: + sql = """SELECT name, parameter_name, artifact_type + FROM qiita.default_workflow_edge_connections c + JOIN qiita.command_output o + ON c.parent_output_id = o.command_output_id + JOIN qiita.command_parameter p + ON c.child_input_id = p.command_parameter_id + LEFT JOIN qiita.artifact_type USING (artifact_type_id) + WHERE default_workflow_edge_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchindex() + + +class DefaultWorkflow(qdb.base.QiitaObject): + r"""Represents a software's default workflow + + A default workflow is defined by a Directed Acyclic Graph (DAG) in which + the nodes represent the commands to be executed with the default parameter + set to use and the edges represent the command precedence, including + which outputs of the source command are provided as input to the + destination command. + """ + _table = "default_workflow" + + @classmethod + def iter(cls, active=True): + """Iterates over all active DefaultWorkflow + + Parameters + ---------- + active : bool, optional + If True will only return active software + + Returns + ------- + list of qiita_db.software.DefaultWorkflow + The DefaultWorkflow objects + """ + sql = """SELECT default_workflow_id + FROM qiita.default_workflow {0} + ORDER BY default_workflow_id""".format( + 'WHERE active = True' if active else '') + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql) + for s_id in qdb.sql_connection.TRN.execute_fetchflatten(): + yield cls(s_id) + + @property + def active(self): + """Retrieves active status of the default workflow + + Returns + ------- + active : bool + active value + """ + with qdb.sql_connection.TRN: + sql = """SELECT active + FROM qiita.default_workflow + WHERE default_workflow_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @active.setter + def active(self, active): + """Changes active status of the default workflow + + Parameters + ---------- + active : bool + New active value + """ + sql = """UPDATE qiita.default_workflow SET active = %s + WHERE default_workflow_id = %s""" + qdb.sql_connection.perform_as_transaction(sql, [active, self._id]) + + @property + def name(self): + with qdb.sql_connection.TRN: + sql = """SELECT name + FROM qiita.default_workflow + WHERE default_workflow_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @property + def description(self): + """Retrieves the description of the default workflow + + Returns + ------- + str + description value + """ + with qdb.sql_connection.TRN: + sql = """SELECT description + FROM qiita.default_workflow + WHERE default_workflow_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchlast() + + @description.setter + def description(self, description): + """Changes the description of the default workflow + + Parameters + ---------- + description : str + New description value + """ + sql = """UPDATE qiita.default_workflow SET description = %s + WHERE default_workflow_id = %s""" + qdb.sql_connection.perform_as_transaction(sql, [description, self._id]) + + @property + def data_type(self): + """Retrieves all the data_types accepted by the default workflow + + Returns + ---------- + list of str + The data types + """ + with qdb.sql_connection.TRN: + sql = """SELECT data_type + FROM qiita.default_workflow_data_type + LEFT JOIN qiita.data_type USING (data_type_id) + WHERE default_workflow_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchflatten() + + @property + def artifact_type(self): + """Retrieves artifact_type that the workflow can be applied to + + Returns + ---------- + str + The name of the artifact type this workflow can be applied to + """ + with qdb.sql_connection.TRN: + sql = """SELECT artifact_type + FROM qiita.artifact_type + LEFT JOIN qiita.default_workflow USING (artifact_type_id) + WHERE default_workflow_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchflatten()[0] + + @property + def graph(self): + """Returns the graph that represents the workflow + + Returns + ------- + networkx.DiGraph + The graph representing the default workflow. + """ + g = nx.DiGraph() + with qdb.sql_connection.TRN: + # Retrieve all graph workflow nodes + sql = """SELECT default_workflow_node_id + FROM qiita.default_workflow_node + WHERE default_workflow_id = %s + ORDER BY default_workflow_node_id""" + qdb.sql_connection.TRN.add(sql, [self.id]) + db_nodes = qdb.sql_connection.TRN.execute_fetchflatten() + + nodes = {n_id: DefaultWorkflowNode(n_id) for n_id in db_nodes} + + # Retrieve all graph edges + sql = """SELECT DISTINCT default_workflow_edge_id, parent_id, + child_id + FROM qiita.default_workflow_edge e + JOIN qiita.default_workflow_node n + ON e.parent_id = n.default_workflow_node_id + OR e.child_id = n.default_workflow_node_id + WHERE default_workflow_id = %s + ORDER BY default_workflow_edge_id""" + qdb.sql_connection.TRN.add(sql, [self.id]) + db_edges = qdb.sql_connection.TRN.execute_fetchindex() + + # let's track what nodes are actually being used so if they do not + # have an edge we still return them as part of the graph + used_nodes = nodes.copy() + for edge_id, p_id, c_id in db_edges: + e = DefaultWorkflowEdge(edge_id) + g.add_edge(nodes[p_id], nodes[c_id], connections=e) + if p_id in used_nodes: + del used_nodes[p_id] + if c_id in used_nodes: + del used_nodes[c_id] + # adding the missing nodes + for ms in used_nodes: + g.add_node(nodes[ms]) + + return g + + @property + def parameters(self): + """Retrieves the parameters that the workflow can be applied to + + Returns + ---------- + dict, dict + The dictionary of valid key: value pairs given by the sample or + the preparation info file + """ + with qdb.sql_connection.TRN: + sql = """SELECT parameters + FROM qiita.default_workflow + WHERE default_workflow_id = %s""" + qdb.sql_connection.TRN.add(sql, [self.id]) + return qdb.sql_connection.TRN.execute_fetchflatten()[0] + + @parameters.setter + def parameters(self, values): + """Sets the parameters that the workflow can be applied to + + Parameters + ---------- + dict : {'sample': dict, 'prep': dict} + dict of dict with the key: value pairs for the 'sample' and 'prep' + info files + + Raises + ------ + ValueError + if the passed parameter is not a properly formated dict + """ + if not isinstance(values, dict) or \ + set(values.keys()) != set(['prep', 'sample']): + raise ValueError("Improper format for values, should be " + "{'sample': dict, 'prep': dict} ") + with qdb.sql_connection.TRN: + sql = """UPDATE qiita.default_workflow + SET parameters = %s + WHERE default_workflow_id = %s""" + qdb.sql_connection.perform_as_transaction( + sql, [dumps(values), self._id])