[879b32]: / scripts / qiita-auto-processing

Download this file

329 lines (292 with data), 13.3 kB

#!/usr/bin/env python
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from qiita_db.handlers.plugin import _get_plugin
from qiita_db.study import Study
from qiita_db.software import Parameters
from qiita_db.user import User
from qiita_db.processing_job import ProcessingWorkflow


# Qiita user handed to ProcessingWorkflow.from_scratch for every workflow
# that this script creates
user = User('antoniog@ucsd.edu')

# full_pipelines is a list of dict as: {
#   'name': str,
#   'data_type': list of str,
#   'artifact_type': str,
#   'previous-step': the command on the previous step
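#   'requirements': {'sample': list of dict as: {
#                        'column': name of the column,
#                        'value': the expected unique values (lowercase) as
#                                 list,
#                        'equal': True if the column values should be equal to
#                                 'value', False if they should differ},
#                    'prep': list of dict, same format as 'sample'}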
#   'steps': list of dict
# }
# that define the different pipelines that are being automated. Each 'step'
# should have: {
#   'previous-step': the command on the previous step
#   'plugin': the name of the plugin we want to use,
#   'version': the version of the plugin,
#   'cmd_name': the command we want to run,
#   'input_name': the name of the input parameter of that command
#   'ignore_parameters': list of parameters to ignore, for example: threads
#   'parent_artifact_name': name of the parent output, input for this command
#   'parameters_names': list of the names of the parameter sets we want to run
# }
# Pipelines consumed by the main loop at the bottom of this script. The first
# entry of 'steps' is run on the selected artifacts; every other step is
# attached to the jobs of the command named in its 'previous-step'.
full_pipelines = [
    # Pipeline 1: Metagenomic per_sample_FASTQ artifacts that have no
    # processing parameters ('previous-step' is None)
    {'name': 'Full WGS - Shogun',
     'data_type': ['Metagenomic'],
     'artifact_type': 'per_sample_FASTQ',
     'previous-step': None,
     # no sample/prep metadata restrictions for this pipeline
     'requirements': dict(),
     'steps': [
         {'previous-step': None,
          'plugin': 'qp-meta',
          'version': '2021.01',
          'cmd_name': 'Atropos v1.1.24',
          'input_name': 'input',
          'ignore_parameters': ['Number of threads used'],
          'parent_artifact_name': None,
          'parameters_names': ['KAPA HyperPlus with iTru']},
         # runs on the 'Adapter trimmed files' output of the Atropos step,
         # once per listed parameter set
         {'previous-step': 'Atropos v1.1.24',
          'plugin': 'qp-shogun',
          'version': '072020',
          'cmd_name': 'Shogun v1.0.8',
          'input_name': 'input',
          'ignore_parameters': ['Number of threads'],
          'parent_artifact_name': 'Adapter trimmed files',
          'parameters_names': ['wol_bowtie2', 'rep200_bowtie2']}
      ]},
    # Pipeline 2: Demultiplexed artifacts whose processing parameters belong
    # to the 'Split libraries FASTQ' command
    {'name': 'Target Gene Processing',
     'data_type': ['16S', '18S', 'ITS'],
     'artifact_type': 'Demultiplexed',
     'previous-step': 'Split libraries FASTQ',
     'requirements': {
        # 'equal': True  -> the lowercased unique values of the column must
        #                   match 'value'
        # 'equal': False -> the lowercased unique values of the column must
        #                   differ from 'value'
        'prep': [
            {'column': 'platform', 'value': ['illumina'],
             'equal': True},
            {'column': 'run_prefix', 'value': ['cmi_workshop_lane1_s1_l001'],
             'equal': False}]},
     'steps': [
        {'previous-step': None,
         'plugin': 'QIIMEq2',
         'version': '1.9.1',
         'cmd_name': 'Trimming',
         'input_name': 'input_data',
         'ignore_parameters': [],
         'parent_artifact_name': None,
         'parameters_names': ['90 base pairs',
                              '100 base pairs',
                              '150 base pairs'
                              ]},
        # the two steps below both hang from 'Trimming' and take its
        # 'Trimmed Demultiplexed' output as input
        {'previous-step': 'Trimming',
         'plugin': 'QIIMEq2',
         'version': '1.9.1',
         'cmd_name': 'Pick closed-reference OTUs',
         'input_name': 'input_data',
         'ignore_parameters': [],
         'parent_artifact_name': 'Trimmed Demultiplexed',
         'parameters_names': ['Defaults - parallel']},
        {'previous-step': 'Trimming',
         'plugin': 'deblur',
         'version': '1.1.0',
         'cmd_name': 'Deblur',
         'input_name': 'Demultiplexed sequences',
         'ignore_parameters': [],
         'parent_artifact_name': 'Trimmed Demultiplexed',
         'parameters_names': ['Defaults']}
        ]},
]


def _check_previous_command(prev_step, pparams):
    """Tell whether an artifact was produced by the expected command.

    Parameters
    ----------
    prev_step : str or None
        Name of the command expected to have generated the artifact; None
        when the artifact should not come from any command.
    pparams : object or None
        The processing parameters of the artifact; its ``command.name`` is
        compared against `prev_step`. None when the artifact has none.

    Returns
    -------
    bool
        True if both are None, or if `pparams` exists and its command name
        equals `prev_step`; False otherwise.
    """
    # without processing parameters the only acceptable expectation is
    # "no previous step"
    if pparams is None:
        return prev_step is None
    return bool(prev_step == pparams.command.name)


def _check_requirements(requirements, template):
    """Check that a template satisfies every column requirement.

    Parameters
    ----------
    requirements : list of dict
        Each dict holds 'column' (name of the template column), 'value'
        (list with the expected unique values; they are compared against the
        lowercased template values, so they should be given in lowercase) and
        'equal' (True if the column values must match 'value', False if they
        must differ).
    template : object
        Anything exposing ``categories`` (the available column names) and
        ``get_category(column)`` returning a mapping whose values are str.

    Returns
    -------
    bool
        True if all the requirements are satisfied (also when there are
        none), False as soon as one of them is not.
    """
    for req in requirements:
        must_match = bool(req['equal'])

        if req['column'] not in template.categories:
            # a missing column can never be equal to the expected values but
            # it trivially satisfies a "should differ" requirement
            if must_match:
                return False
            continue

        # lowercase *before* collapsing duplicates, otherwise values that
        # differ only by case are kept as separate entries; compare as sets
        # so the result does not depend on iteration order
        observed = set(map(
            str.lower, template.get_category(req['column']).values()))
        matches = observed == set(req['value'])

        if matches != must_match:
            return False

    return True


def _check_parameters(jobs, cmd):
    """Collect the parameters of each job as dicts of strings.

    Parameters
    ----------
    jobs : iterable
        Objects whose ``parameters.values`` is a dict of parameter name to
        value.
    cmd : dict
        Its 'ignore_parameters' entry lists the parameter names to leave out.

    Returns
    -------
    list of dict
        One dict per job, in the same order, with every kept value converted
        to str.
    """
    collected = []
    for job in jobs:
        kept = {}
        for name, value in job.parameters.values.items():
            if name in cmd['ignore_parameters']:
                continue
            kept[name] = str(value)
        collected.append(kept)
    return collected


def _submit_workflows(artifact_process):
    """Submit the workflows that are still under construction.

    Parameters
    ----------
    artifact_process : iterable of dict
        Each dict has a 'workflow' entry that is either None (skipped) or an
        object exposing ``graph.nodes()`` and ``submit()``.
    """
    for entry in artifact_process:
        workflow = entry['workflow']
        if workflow is not None:
            # nodes will return in position [0] the first job created
            head_job = list(workflow.graph.nodes())[0]
            if head_job.status == 'in_construction':
                workflow.submit()


# Main driver: for every pipeline in full_pipelines, resolve its commands,
# select the artifacts that qualify, work out which command/parameter-set
# combinations have not been run yet, build one workflow per artifact with the
# missing jobs and finally submit those workflows.
#
# Step 1. Loop over the full_pipelines to process each step
for pipeline in full_pipelines:
    # Step 2. From the steps generate the list of commands to add to the
    #         workflow
    commands = []
    for step in pipeline['steps']:
        plugin = _get_plugin(step['plugin'], step['version'])
        cmds = [c for c in plugin.commands if c.name == step['cmd_name']]
        if len(cmds) != 1:
            # this is reached both when the command is missing and when it
            # is duplicated, so report how many were actually found
            raise ValueError('Expected exactly one command with this '
                             'definition but found %d: %s'
                             % (len(cmds), str(step)))

        cmd = cmds[0]
        # note that for simplicity we are converting all values in the
        # parameters to string
        parameters = [
            {'id': dps.id,
             'values': {k: str(v) for k, v in dps.values.items()}}
            for dps in cmd.default_parameter_sets
            if dps.name in step['parameters_names']]

        commands.append({
            'command': cmd,
            'command-name': cmd.name,
            'previous-step': step['previous-step'],
            'parent_artifact_name': step['parent_artifact_name'],
            'input_name': step['input_name'],
            'ignore_parameters': step['ignore_parameters'],
            'parameters': parameters})

    # Step 2. - for children. Get their commands. We currently only support
    #         processing for 2 levels, like:
    #         -> Trim -> Deblur
    #                 -> Close reference
    #         which should be fine for now as all our pipelines only
    #         have 2 levels
    # Each child is stored together with its position in `commands` so that
    # the 'cmd_id' saved in Step 5b stays valid even if a later step is not a
    # direct child of the first command.
    children_cmds = [
        (cmd_id, c) for cmd_id, c in enumerate(commands[1:], start=1)
        if c['previous-step'] == commands[0]['command-name']]

    # Step 3. Find all preparations/artifacts that we can add the pipeline
    #         ... as a first pass we will only process study 10317 (AGP) ...
    # artifacts_all = [a for study in Study.iter()
    artifacts_all = [a for study in [Study(10317)]
                     # loop over all artifacts of artifact_type with in study
                     for a in study.artifacts(
                         artifact_type=pipeline['artifact_type'])
                     if _check_previous_command(
                         pipeline['previous-step'], a.processing_parameters)]

    # Step 4. Limit all_artifacts to those within restrictions
    artifacts_compliant = []
    for a in artifacts_all:
        st = a.study.sample_template
        pts = a.prep_templates
        if not pts:
            continue
        # only the first prep template of the artifact is checked
        pt = pts[0]

        # {'sandbox', 'awaiting_approval', 'private', 'public'}
        if a.visibility in ('sandbox', 'awaiting_approval'):
            continue

        if pt.data_type() not in pipeline['data_type']:
            continue

        reqs = pipeline['requirements']
        if 'sample' in reqs and not _check_requirements(reqs['sample'], st):
            continue
        if 'prep' in reqs and not _check_requirements(reqs['prep'], pt):
            continue

        artifacts_compliant.append(a)

    # Step 5a. Limit artifacts_compliant to those artifacts missing the command
    #          and parameters of this pipeline. Note that this could be part
    #          of Step 4 but for debugging it makes sense to separate
    artifact_process = []
    children_compliant = []
    cmd = commands[0]
    for a in artifacts_compliant:
        # getting all jobs, including hidden ones, in case the job failed
        jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
        params = _check_parameters(jobs, cmd)

        # checking that all required parameters of this command exist
        missing_parameters = []
        for p in cmd['parameters']:
            # work on a copy: the dicts in cmd['parameters'] are shared by
            # every artifact, so updating them in place would overwrite the
            # input id of entries already queued in artifact_process
            p = dict(p['values'])
            p[cmd['input_name']] = str(a.id)
            p_to_compare = {k: v for k, v in p.items()
                            if k not in cmd['ignore_parameters']}
            if p_to_compare not in params:
                missing_parameters.append(p)
            else:
                # this parameter set was already run on the artifact; keep
                # the children it generated so Step 5b can extend them
                for c in a.children:
                    cpp = c.processing_parameters
                    if cpp.command.name == cmd['command-name']:
                        # NOTE(review): _check_parameters reads
                        # `.parameters.values` from each item; confirm that
                        # processing_parameters exposes that attribute
                        cparams = _check_parameters([cpp], cmd)
                        # cparams is a list with one dict, so test for
                        # membership instead of equality
                        if p_to_compare in cparams:
                            children_compliant.append(c)
        if missing_parameters:
            # note that we are building a dict for each artifact so we can
            # save the workflow id, useful for when we run this in a terminal
            # and we want to follow up on those workflows
            artifact_process.append({'workflow': None, 'artifact': a,
                                     'missing_parameters': missing_parameters,
                                     'cmd_id': 0})

    # Step 5b. Add workflow/commands for children
    for a in children_compliant:
        for cmd_id, cmd in children_cmds:
            # getting all jobs, including hidden ones, in case the job failed
            jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
            params = _check_parameters(jobs, cmd)

            # checking that all required parameters of this command exist
            missing_parameters = []
            for p in cmd['parameters']:
                # copy for the same reason as in Step 5a
                p = dict(p['values'])
                p[cmd['input_name']] = str(a.id)
                p_to_compare = {k: v for k, v in p.items()
                                if k not in cmd['ignore_parameters']}

                if p_to_compare not in params:
                    missing_parameters.append(p)
            if missing_parameters:
                # cmd_id is the position of this command within `commands`
                artifact_process.append(
                    {'workflow': None, 'artifact': a,
                     'missing_parameters': missing_parameters,
                     'cmd_id': cmd_id})

    # Step 6: add workflows and jobs
    # max processing will be useful for debugging as it allows to stop after
    # any number of artifact_process
    max_processing = len(artifact_process)

    for i, artifact in enumerate(artifact_process):
        if i >= max_processing:
            break

        if artifact['workflow'] is not None:
            continue

        a = artifact['artifact']
        cmd_id = artifact['cmd_id']
        # create the first-job/workflow with the first command and the first
        # set of parameters
        cmd = commands[cmd_id]
        params = artifact['missing_parameters'][0]
        params.update({cmd['input_name']: str(a.id)})
        job_params = Parameters.load(cmd['command'], values_dict=params)

        artifact['workflow'] = ProcessingWorkflow.from_scratch(
            user, job_params)

        # now we can add the rest of the parameters to the workflow for
        # the first command
        for params in artifact['missing_parameters'][1:]:
            job_params = Parameters.load(cmd['command'], values_dict=params)
            artifact['workflow'].add(
                job_params, req_params={cmd['input_name']: str(a.id)})

        for cmd in commands[cmd_id + 1:]:
            # get jobs from the workflow to which we can add this new command
            previous_jobs = [j for j in artifact['workflow'].graph.nodes()
                             if j.command.name == cmd['previous-step']]
            for job in previous_jobs:
                for params in cmd['parameters']:
                    # copy so the shared parameter set (and any entry of
                    # artifact_process built from it) is left untouched
                    params = dict(params['values'])
                    params[cmd['input_name']] = '%s%s' % (
                        job.id, cmd['parent_artifact_name'])
                    job_params = Parameters.load(
                        cmd['command'], values_dict=params)

                    artifact['workflow'].add(job_params, connections={job: {
                        cmd['parent_artifact_name']: cmd['input_name']}})

    # Step 7. submit the workflows!
    _submit_workflows(artifact_process)