--- a
+++ b/scripts/qiita-auto-processing
@@ -0,0 +1,328 @@
+#!/usr/bin/env python
+# -----------------------------------------------------------------------------
+# Copyright (c) 2014--, The Qiita Development Team.
+#
+# Distributed under the terms of the BSD 3-clause License.
+#
+# The full license is in the file LICENSE, distributed with this software.
+# -----------------------------------------------------------------------------
+from qiita_db.handlers.plugin import _get_plugin
+from qiita_db.study import Study
+from qiita_db.software import Parameters
+from qiita_db.user import User
+from qiita_db.processing_job import ProcessingWorkflow
+
+
+user = User('antoniog@ucsd.edu')
+
+# full_pipelines is a list of dicts as: {
+#     'name': str,
+#     'data_type': list of str,
+#     'artifact_type': str,
+#     'previous-step': the command that generated the starting artifact (None
+#                      if the pipeline starts from an uploaded artifact),
+#     'requirements': {'sample': list of {'column': name of the column,
+#                                         'value': the expected values as list,
+#                                         'equal': True if the column values
+#                                                  should match the list, False
+#                                                  if they should differ},
+#                      'prep': same format, checked against the prep info},
+#     'steps': list of dict
+# }
+# that define the different pipelines that are being automated. Each 'step'
+# should have: {
+#     'previous-step': the command on the previous step,
+#     'plugin': the name of the plugin we want to use,
+#     'version': the version of the plugin,
+#     'cmd_name': the command we want to run,
+#     'input_name': the name of the input parameter of that command,
+#     'ignore_parameters': list of parameters to ignore, for example: threads,
+#     'parent_artifact_name': name of the parent output, input for this command,
+#     'parameters_names': list of the names of the parameter sets we want to run
+# }
+full_pipelines = [
+    {'name': 'Full WGS - Shogun',
+     'data_type': ['Metagenomic'],
+     'artifact_type': 'per_sample_FASTQ',
+     'previous-step': None,
+     'requirements': dict(),
+     'steps': [
+         {'previous-step': None,
+          'plugin': 'qp-meta',
+          'version': '2021.01',
+          'cmd_name': 'Atropos v1.1.24',
+          'input_name': 'input',
+          'ignore_parameters': ['Number of threads used'],
+          'parent_artifact_name': None,
+          'parameters_names': ['KAPA HyperPlus with iTru']},
+         {'previous-step': 'Atropos v1.1.24',
+          'plugin': 'qp-shogun',
+          'version': '072020',
+          'cmd_name': 'Shogun v1.0.8',
+          'input_name': 'input',
+          'ignore_parameters': ['Number of threads'],
+          'parent_artifact_name': 'Adapter trimmed files',
+          'parameters_names': ['wol_bowtie2', 'rep200_bowtie2']}
+     ]},
+    {'name': 'Target Gene Processing',
+     'data_type': ['16S', '18S', 'ITS'],
+     'artifact_type': 'Demultiplexed',
+     'previous-step': 'Split libraries FASTQ',
+     'requirements': {
+         'prep': [
+             {'column': 'platform', 'value': ['illumina'],
+              'equal': True},
+             {'column': 'run_prefix', 'value': ['cmi_workshop_lane1_s1_l001'],
+              'equal': False}]},
+     'steps': [
+         {'previous-step': None,
+          'plugin': 'QIIMEq2',
+          'version': '1.9.1',
+          'cmd_name': 'Trimming',
+          'input_name': 'input_data',
+          'ignore_parameters': [],
+          'parent_artifact_name': None,
+          'parameters_names': ['90 base pairs',
+                               '100 base pairs',
+                               '150 base pairs']},
+         {'previous-step': 'Trimming',
+          'plugin': 'QIIMEq2',
+          'version': '1.9.1',
+          'cmd_name': 'Pick closed-reference OTUs',
+          'input_name': 'input_data',
+          'ignore_parameters': [],
+          'parent_artifact_name': 'Trimmed Demultiplexed',
+          'parameters_names': ['Defaults - parallel']},
+         {'previous-step': 'Trimming',
+          'plugin': 'deblur',
+          'version': '1.1.0',
+          'cmd_name': 'Deblur',
+          'input_name': 'Demultiplexed sequences',
+          'ignore_parameters': [],
+          'parent_artifact_name': 'Trimmed Demultiplexed',
+          'parameters_names': ['Defaults']}
+     ]},
+]
+
+
+def _check_previous_command(prev_step, pparams):
+    if (prev_step is None and pparams is None) or (
+            pparams is not None and prev_step == pparams.command.name):
+        return True
+    return False
+
+
+def _check_requirements(requirements, template):
+    satisfied = True
+    for req in requirements:
+        if satisfied:
+            if req['column'] not in template.categories:
+                if req['equal']:
+                    satisfied = False
+                continue
+            template_value = list(map(str.lower, set(
+                template.get_category(req['column']).values())))
+            if req['equal'] and template_value != req['value']:
+                satisfied = False
+                continue
+            elif not req['equal'] and template_value == req['value']:
+                satisfied = False
+                continue
+    return satisfied
+
+
+def _check_parameters(jobs, cmd):
+    # return the parameters of the given jobs as strings, dropping the
+    # parameters listed in cmd['ignore_parameters']
+    params = [{k: str(v) for k, v in j.parameters.values.items()
+               if k not in cmd['ignore_parameters']} for j in jobs]
+    return params
+
+
+def _submit_workflows(artifact_process):
+    for artifact in artifact_process:
+        if artifact['workflow'] is None:
+            continue
+        # nodes will return in position [0] the first job created
+        first_job = list(artifact['workflow'].graph.nodes())[0]
+        if first_job.status == 'in_construction':
+            artifact['workflow'].submit()
+
+
+# Step 1. Loop over full_pipelines and process each pipeline
+for pipeline in full_pipelines:
+    # Step 2a. From the steps generate the list of commands to add to the
+    #          workflow
+    commands = []
+    for step in pipeline['steps']:
+        plugin = _get_plugin(step['plugin'], step['version'])
+        cmds = [c for c in plugin.commands if c.name == step['cmd_name']]
+        if len(cmds) != 1:
+            raise ValueError('There is not exactly one command with this '
+                             'definition: %s' % str(step))
+
+        cmd = cmds[0]
+        parameters = []
+        for dps in cmd.default_parameter_sets:
+            if dps.name in step['parameters_names']:
+                # note that for simplicity we are converting all values in the
+                # parameters to strings
+                parameters.append({'id': dps.id, 'values': {
+                    k: str(v) for k, v in dps.values.items()}})
+
+        commands.append({
+            'command': cmd,
+            'command-name': cmd.name,
+            'previous-step': step['previous-step'],
+            'parent_artifact_name': step['parent_artifact_name'],
+            'input_name': step['input_name'],
+            'ignore_parameters': step['ignore_parameters'],
+            'parameters': parameters})
+
+    # Step 2b. For the children, get their commands. We currently only support
+    #          processing for 2 levels, like:
+    #              -> Trim -> Deblur
+    #                      -> Close reference
+    #          which should be fine for now as all our pipelines only
+    #          have 2 levels
+    children_cmds = [c for c in commands[1:]
+                     if c['previous-step'] == commands[0]['command-name']]
+
+    # Step 3. Find all preparations/artifacts to which we can add the pipeline
+    #         ... as a first pass we will only process study 10317 (AGP) ...
+    # artifacts_all = [a for study in Study.iter()
+    artifacts_all = [a for study in [Study(10317)]
+                     # loop over all artifacts of artifact_type within the study
+                     for a in study.artifacts(
+                         artifact_type=pipeline['artifact_type'])
+                     if _check_previous_command(
+                         pipeline['previous-step'], a.processing_parameters)]
+
+    # Step 4. Limit artifacts_all to those that satisfy the pipeline
+    #         requirements
+    artifacts_compliant = []
+    for a in artifacts_all:
+        st = a.study.sample_template
+        pts = a.prep_templates
+        if not pts:
+            continue
+        pt = pts[0]
+
+        # possible values: {'sandbox', 'awaiting_approval', 'private', 'public'}
+        if a.visibility in ('sandbox', 'awaiting_approval'):
+            continue
+
+        if pt.data_type() not in pipeline['data_type']:
+            continue
+
+        reqs = pipeline['requirements']
+        if 'sample' in reqs and not _check_requirements(reqs['sample'], st):
+            continue
+        if 'prep' in reqs and not _check_requirements(reqs['prep'], pt):
+            continue
+
+        artifacts_compliant.append(a)
+
+    # Step 5a. Limit artifacts_compliant to those artifacts missing the command
+    #          and parameters of this pipeline. Note that this could be part of
+    #          Step 4 but for debugging it makes sense to keep them separate
+    artifact_process = []
+    children_compliant = []
+    cmd = commands[0]
+    for a in artifacts_compliant:
+        # getting all jobs, including hidden ones, in case the job failed
+        jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
+        params = _check_parameters(jobs, cmd)
+
+        # checking which required parameters of this command are missing
+        missing_parameters = []
+        for p in cmd['parameters']:
+            p = p['values']
+            p.update({cmd['input_name']: str(a.id)})
+            p_to_compare = p.copy()
+            for k in cmd['ignore_parameters']:
+                del p_to_compare[k]
+            if p_to_compare not in params:
+                missing_parameters.append(p)
+            else:
+                for c in a.children:
+                    cpp = c.processing_parameters
+                    if cpp.command.name == cmd['command-name']:
+                        cparams = _check_parameters([cpp], cmd)
+                        if p_to_compare in cparams:
+                            children_compliant.append(c)
+        if missing_parameters:
+            # note that we are building a dict for each artifact so we can
+            # save the workflow id, useful for when we run this in a terminal
+            # and we want to follow up on those workflows
+            artifact_process.append({'workflow': None, 'artifact': a,
+                                     'missing_parameters': missing_parameters,
+                                     'cmd_id': 0})
+
+    # Step 5b. Add workflow/commands for the children artifacts
+    for a in children_compliant:
+        for cmd_id, cmd in enumerate(children_cmds):
+            # getting all jobs, including hidden ones, in case the job failed
+            jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
+            params = _check_parameters(jobs, cmd)
+
+            # checking which required parameters of this command are missing
+            missing_parameters = []
+            for p in cmd['parameters']:
+                p = p['values']
+                p.update({cmd['input_name']: str(a.id)})
+                p_to_compare = p.copy()
+                for k in cmd['ignore_parameters']:
+                    del p_to_compare[k]
+
+                if p_to_compare not in params:
+                    missing_parameters.append(p)
+            if missing_parameters:
+                artifact_process.append(
+                    {'workflow': None, 'artifact': a,
+                     'missing_parameters': missing_parameters,
+                     'cmd_id': cmd_id + 1})
+
+    # Step 6. Add the workflows and jobs
+    # max_processing is useful for debugging as it allows stopping after
+    # processing a given number of entries of artifact_process
+    max_processing = len(artifact_process)
+
+    for i, artifact in enumerate(artifact_process):
+        if i >= max_processing:
+            break
+
+        if artifact['workflow'] is not None:
+            continue
+
+        a = artifact['artifact']
+        cmd_id = artifact['cmd_id']
+        # create the first job/workflow with the first command and the first
+        # set of parameters
+        cmd = commands[cmd_id]
+        params = artifact['missing_parameters'][0]
+        params.update({cmd['input_name']: str(a.id)})
+        job_params = Parameters.load(cmd['command'], values_dict=params)
+
+        artifact['workflow'] = ProcessingWorkflow.from_scratch(
+            user, job_params)
+
+        # now we can add the rest of the parameters to the workflow for
+        # the first command
+        for params in artifact['missing_parameters'][1:]:
+            job_params = Parameters.load(cmd['command'], values_dict=params)
+            artifact['workflow'].add(
+                job_params, req_params={cmd['input_name']: str(a.id)})
+
+        for cmd in commands[cmd_id + 1:]:
+            # get the jobs in the workflow to which we can connect this command
+            previous_jobs = [j for j in artifact['workflow'].graph.nodes()
+                             if j.command.name == cmd['previous-step']]
+            for job in previous_jobs:
+                for params in cmd['parameters']:
+                    params = params['values']
+                    params.update({cmd['input_name']: '%s%s' % (
+                        job.id, cmd['parent_artifact_name'])})
+                    job_params = Parameters.load(
+                        cmd['command'], values_dict=params)
+
+                    artifact['workflow'].add(job_params, connections={job: {
+                        cmd['parent_artifact_name']: cmd['input_name']}})
+
+    # Step 7. Submit the workflows!
+    _submit_workflows(artifact_process)
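
For reference, the sketch below shows what a new entry in full_pipelines could look like; it only illustrates the structure documented at the top of the script and is not part of the patch. The plugin, command, version and parameter-set names ('qp-example', 'Example v0.1', 'Defaults') are hypothetical placeholders and would have to match plugin definitions registered in the target Qiita deployment for _get_plugin and the command lookup to resolve them. The 'requirements' entries follow the semantics of _check_requirements: 'equal': True keeps a preparation only when the column values match the given list, 'equal': False keeps it only when they differ.

    # Hypothetical example only: an extra pipeline entry following the
    # documented structure; plugin/command/parameter-set names are placeholders
    example_pipeline = {
        'name': 'Example amplicon pipeline',
        'data_type': ['16S'],
        'artifact_type': 'Demultiplexed',
        # only consider artifacts generated by this command
        # (None means uploaded artifacts)
        'previous-step': 'Split libraries FASTQ',
        'requirements': {
            'prep': [
                # keep preparations whose platform is illumina ...
                {'column': 'platform', 'value': ['illumina'], 'equal': True},
                # ... and skip preparations with this specific run_prefix
                {'column': 'run_prefix', 'value': ['some_test_run'],
                 'equal': False}]},
        'steps': [
            {'previous-step': None,
             'plugin': 'qp-example',
             'version': '0.1.0',
             'cmd_name': 'Example v0.1',
             'input_name': 'input_data',
             'ignore_parameters': [],
             'parent_artifact_name': None,
             'parameters_names': ['Defaults']}],
    }

    # it would then be appended to full_pipelines before the main loop runs:
    # full_pipelines.append(example_pipeline)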