
--- a
+++ b/scripts/qiita-auto-processing
@@ -0,0 +1,328 @@
+#!/usr/bin/env python
+# -----------------------------------------------------------------------------
+# Copyright (c) 2014--, The Qiita Development Team.
+#
+# Distributed under the terms of the BSD 3-clause License.
+#
+# The full license is in the file LICENSE, distributed with this software.
+# -----------------------------------------------------------------------------
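+"""Automate Qiita processing pipelines.
+
+For each pipeline defined in full_pipelines below, find the artifacts that
+are missing any of its commands/parameter sets, build the corresponding
+processing workflows and submit them as the configured user.
+"""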
+from qiita_db.handlers.plugin import _get_plugin
+from qiita_db.study import Study
+from qiita_db.software import Parameters
+from qiita_db.user import User
+from qiita_db.processing_job import ProcessingWorkflow
+
+
+user = User('antoniog@ucsd.edu')
+
+# full_pipelines is a list of dicts that define the pipelines being
+# automated. Each pipeline is: {
+#   'name': str,
+#   'data_type': list of str,
+#   'artifact_type': str,
+#   'previous-step': the command that generated the starting artifacts
+#                    (None for uploaded artifacts),
+#   'requirements': {'sample': list of restrictions on the sample template,
+#                    'prep': list of restrictions on the prep template},
+#                   where each restriction is:
+#                   {'column': name of the column,
+#                    'value': the expected unique values, as a list,
+#                    'equal': whether the values should be equal (True)
+#                             or different (False)},
+#   'steps': list of dicts
+# }
+# Each 'step' should have: {
+#   'previous-step': the command of the previous step,
+#   'plugin': the name of the plugin we want to use,
+#   'version': the version of the plugin,
+#   'cmd_name': the command we want to run,
+#   'input_name': the name of the input parameter of that command,
+#   'ignore_parameters': list of parameters to ignore, for example: threads,
+#   'parent_artifact_name': name of the parent output, input for this command
+#   'parameters_names': list of the names of the parameter sets we want to run
+# }
+full_pipelines = [
+    {'name': 'Full WGS - Shogun',
+     'data_type': ['Metagenomic'],
+     'artifact_type': 'per_sample_FASTQ',
+     'previous-step': None,
+     'requirements': dict(),
+     'steps': [
+         {'previous-step': None,
+          'plugin': 'qp-meta',
+          'version': '2021.01',
+          'cmd_name': 'Atropos v1.1.24',
+          'input_name': 'input',
+          'ignore_parameters': ['Number of threads used'],
+          'parent_artifact_name': None,
+          'parameters_names': ['KAPA HyperPlus with iTru']},
+         {'previous-step': 'Atropos v1.1.24',
+          'plugin': 'qp-shogun',
+          'version': '072020',
+          'cmd_name': 'Shogun v1.0.8',
+          'input_name': 'input',
+          'ignore_parameters': ['Number of threads'],
+          'parent_artifact_name': 'Adapter trimmed files',
+          'parameters_names': ['wol_bowtie2', 'rep200_bowtie2']}
+      ]},
+    {'name': 'Target Gene Processing',
+     'data_type': ['16S', '18S', 'ITS'],
+     'artifact_type': 'Demultiplexed',
+     'previous-step': 'Split libraries FASTQ',
+     'requirements': {
+        'prep': [
+            {'column': 'platform', 'value': ['illumina'],
+             'equal': True},
+            {'column': 'run_prefix', 'value': ['cmi_workshop_lane1_s1_l001'],
+             'equal': False}]},
+     'steps': [
+        {'previous-step': None,
+         'plugin': 'QIIMEq2',
+         'version': '1.9.1',
+         'cmd_name': 'Trimming',
+         'input_name': 'input_data',
+         'ignore_parameters': [],
+         'parent_artifact_name': None,
+         'parameters_names': ['90 base pairs',
+                              '100 base pairs',
+                              '150 base pairs'
+                              ]},
+        {'previous-step': 'Trimming',
+         'plugin': 'QIIMEq2',
+         'version': '1.9.1',
+         'cmd_name': 'Pick closed-reference OTUs',
+         'input_name': 'input_data',
+         'ignore_parameters': [],
+         'parent_artifact_name': 'Trimmed Demultiplexed',
+         'parameters_names': ['Defaults - parallel']},
+        {'previous-step': 'Trimming',
+         'plugin': 'deblur',
+         'version': '1.1.0',
+         'cmd_name': 'Deblur',
+         'input_name': 'Demultiplexed sequences',
+         'ignore_parameters': [],
+         'parent_artifact_name': 'Trimmed Demultiplexed',
+         'parameters_names': ['Defaults']}
+        ]},
+]
+
+
+def _check_previous_command(prev_step, pparams):
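+    """Check that the command that generated the artifact matches prev_step.
+
+    Artifacts without processing parameters (i.e. uploaded files) only match
+    pipelines whose previous step is None.
+    """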
+    return (prev_step is None and pparams is None) or (
+        pparams is not None and prev_step == pparams.command.name)
+
+
+def _check_requirements(requirements, template):
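+    """Check that a metadata template satisfies a list of requirements.
+
+    Each requirement gives a column name, the expected unique values (lower
+    case) and whether the template values should be equal to them (True) or
+    different from them (False).
+    """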
+    for req in requirements:
+        if req['column'] not in template.categories:
+            if req['equal']:
+                return False
+            continue
+        # sort both sides so the comparison doesn't depend on set ordering
+        template_value = sorted(map(str.lower, set(
+            template.get_category(req['column']).values())))
+        if req['equal'] and template_value != sorted(req['value']):
+            return False
+        if not req['equal'] and template_value == sorted(req['value']):
+            return False
+    return True
+
+
+def _check_parameters(jobs, cmd):
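+    """Return the parameters of each job as str, without the parameters
+    listed in cmd['ignore_parameters']."""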
+    params = [{k: str(v) for k, v in j.parameters.values.items()
+              if k not in cmd['ignore_parameters']} for j in jobs]
+    return params
+
+
+def _submit_workflows(artifact_process):
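+    """Submit the workflows whose first job is still in construction."""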
+    for artifact in artifact_process:
+        if artifact['workflow'] is None:
+            continue
+        # graph.nodes() returns the first job created in position [0]
+        first_job = list(artifact['workflow'].graph.nodes())[0]
+        if first_job.status == 'in_construction':
+            artifact['workflow'].submit()
+
+
+# Step 1. Loop over the full_pipelines to process each step
+for pipeline in full_pipelines:
+    # Step 2a. From the steps generate the list of commands to add to the
+    #          workflow
+    commands = []
+    for step in pipeline['steps']:
+        plugin = _get_plugin(step['plugin'], step['version'])
+        cmds = [c for c in plugin.commands if c.name == step['cmd_name']]
+        if len(cmds) != 1:
+            raise ValueError('Expected exactly one command matching step '
+                             '%s, found %d' % (str(step), len(cmds)))
+
+        cmd = cmds[0]
+        parameters = []
+        for dps in cmd.default_parameter_sets:
+            if dps.name in step['parameters_names']:
+                # note that for simplicity we are converting all values in the
+                # parameters to string
+                parameters.append({'id': dps.id, 'values': {
+                    k: str(v) for k, v in dps.values.items()}})
+
+        commands.append({
+            'command': cmd,
+            'command-name': cmd.name,
+            'previous-step': step['previous-step'],
+            'parent_artifact_name': step['parent_artifact_name'],
+            'input_name': step['input_name'],
+            'ignore_parameters': step['ignore_parameters'],
+            'parameters': parameters})
+
+    # Step 2b. For the children, get their commands. We currently only
+    #          support processing 2 levels deep, like:
+    #          -> Trim -> Deblur
+    #                  -> Closed reference
+    #          which should be fine for now as all our pipelines only
+    #          have 2 levels
+    children_cmds = [c for c in commands[1:]
+                     if c['previous-step'] == commands[0]['command-name']]
+
+    # Step 3. Find all preparations/artifacts that we can add the pipeline
+    #         ... as a first pass we will only process study 10317 (AGP) ...
+    # artifacts_all = [a for study in Study.iter()
+    artifacts_all = [a for study in [Study(10317)]
+                     # loop over all artifacts of artifact_type in the study
+                     for a in study.artifacts(
+                         artifact_type=pipeline['artifact_type'])
+                     if _check_previous_command(
+                         pipeline['previous-step'], a.processing_parameters)]
+
+    # Step 4. Limit all_artifacts to those within restrictions
+    artifacts_compliant = []
+    for a in artifacts_all:
+        st = a.study.sample_template
+        pts = a.prep_templates
+        if not pts:
+            continue
+        pt = pts[0]
+
+        # skip artifacts that are not yet private/public; possible values:
+        # {'sandbox', 'awaiting_approval', 'private', 'public'}
+        if a.visibility in ('sandbox', 'awaiting_approval'):
+            continue
+
+        if pt.data_type() not in pipeline['data_type']:
+            continue
+
+        reqs = pipeline['requirements']
+        if 'sample' in reqs and not _check_requirements(reqs['sample'], st):
+            continue
+        if 'prep' in reqs and not _check_requirements(reqs['prep'], pt):
+            continue
+
+        artifacts_compliant.append(a)
+
+    # Step 5a. Limit artifacts_compliant to those artifacts missing the
+    #          command and parameters of this pipeline. Note that this could
+    #          be part of Step 4 but for debugging it makes sense to keep
+    #          it separate
+    artifact_process = []
+    children_compliant = []
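+    # the first command of the pipeline is checked directly against the
+    # compliant artifacts; their existing children are collected in
+    # children_compliant and handled in Step 5b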
+    cmd = commands[0]
+    for a in artifacts_compliant:
+        # get all jobs, including hidden ones, in case a job failed
+        jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
+        params = _check_parameters(jobs, cmd)
+
+        # check which of this command's parameter sets have not been run yet
+        missing_parameters = []
+        for p in cmd['parameters']:
+            # copy so we don't modify the values stored in cmd['parameters']
+            p = dict(p['values'])
+            p.update({cmd['input_name']: str(a.id)})
+            p_to_compare = p.copy()
+            for k in cmd['ignore_parameters']:
+                del p_to_compare[k]
+            if p_to_compare not in params:
+                missing_parameters.append(p)
+            else:
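+                # this parameter set was already run; find the child artifact
+                # it generated so the next commands can be checked on it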
+                for c in a.children:
+                    cpp = c.processing_parameters
+                    if cpp.command.name == cmd['command-name']:
+                        cparams = _check_parameters([cpp], cmd)
+                        if p_to_compare in cparams:
+                            children_compliant.append(c)
+        if missing_parameters:
+            # note that we are building a dict for each artifact so we can
+            # save the workflow id, useful for when we run this in a terminal
+            # and we want to follow up on those workflows
+            artifact_process.append({'workflow': None, 'artifact': a,
+                                     'missing_parameters': missing_parameters,
+                                     'cmd_id': 0})
+
+    # Step 5b. Add workflow/commands for children
+    for a in children_compliant:
+        for cmd_id, cmd in enumerate(children_cmds):
+            # get all jobs, including hidden ones, in case a job failed
+            jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
+            params = _check_parameters(jobs, cmd)
+
+            # check which of this command's parameter sets have not been run
+            # yet
+            missing_parameters = []
+            for p in cmd['parameters']:
+                # copy so we don't modify the values stored in
+                # cmd['parameters']
+                p = dict(p['values'])
+                p.update({cmd['input_name']: str(a.id)})
+                p_to_compare = p.copy()
+                for k in cmd['ignore_parameters']:
+                    del p_to_compare[k]
+
+                if p_to_compare not in params:
+                    missing_parameters.append(p)
+            if missing_parameters:
+                artifact_process.append(
+                    {'workflow': None, 'artifact': a,
+                     'missing_parameters': missing_parameters,
+                     'cmd_id': cmd_id + 1})
+
+    # Step 6. Add workflows and jobs
+    # max_processing is useful for debugging as it allows stopping after any
+    # number of artifact_process entries
+    max_processing = len(artifact_process)
+
+    for i, artifact in enumerate(artifact_process):
+        if i >= max_processing:
+            break
+
+        if artifact['workflow'] is not None:
+            continue
+
+        a = artifact['artifact']
+        cmd_id = artifact['cmd_id']
+        # create the first-job/workflow with the first command and the first
+        # set of parameters
+        cmd = commands[cmd_id]
+        params = artifact['missing_parameters'][0]
+        params.update({cmd['input_name']: str(a.id)})
+        job_params = Parameters.load(cmd['command'], values_dict=params)
+
+        artifact['workflow'] = ProcessingWorkflow.from_scratch(
+            user, job_params)
+
+        # now we can add the rest of the parameters to the workflow for
+        # the first command
+        for params in artifact['missing_parameters'][1:]:
+            job_params = Parameters.load(cmd['command'], values_dict=params)
+            artifact['workflow'].add(
+                job_params, req_params={cmd['input_name']: str(a.id)})
+
+        for cmd in commands[cmd_id + 1:]:
+            # get jobs from the workflow to which we can add this new command
+            previous_jobs = [j for j in artifact['workflow'].graph.nodes()
+                             if j.command.name == cmd['previous-step']]
+            for job in previous_jobs:
+                for params in cmd['parameters']:
+                    params = dict(params['values'])
+                    params.update({cmd['input_name']: '%s%s' % (
+                        job.id, cmd['parent_artifact_name'])})
+                    job_params = Parameters.load(
+                        cmd['command'], values_dict=params)
+
+                    artifact['workflow'].add(job_params, connections={job: {
+                        cmd['parent_artifact_name']: cmd['input_name']}})
+
+    # Step 7. submit the workflows!
+    _submit_workflows(artifact_process)