#!/usr/bin/env python
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
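# This script automates the processing pipelines defined in full_pipelines
# below: for each pipeline it finds the artifacts that satisfy the pipeline
# requirements, determines which commands/parameter sets are still missing,
# builds the corresponding processing workflows and submits them.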
from qiita_db.handlers.plugin import _get_plugin
from qiita_db.study import Study
from qiita_db.software import Parameters
from qiita_db.user import User
from qiita_db.processing_job import ProcessingWorkflow
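# the user that will own the workflows/jobs created by this script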
user = User('antoniog@ucsd.edu')
# full_pipelines is a list of dicts, each defining a pipeline to automate:
# {
#  'name': str,
#  'data_type': list of str,
#  'artifact_type': str,
#  'previous-step': name of the command that generated the starting artifact,
#                   or None if the pipeline starts from a raw artifact,
#  'requirements': {'sample': list of requirements on the sample template,
#                   'prep': list of requirements on the prep template},
#                  where each requirement is a dict:
#                  {'column': name of the column,
#                   'value': the expected unique values (lowercase) as a list,
#                   'equal': True if the values must match, False if they
#                            must differ},
#  'steps': list of dicts
# }
# Each 'step' should have: {
#  'previous-step': the command name of the previous step, None for the first,
#  'plugin': the name of the plugin we want to use,
#  'version': the version of the plugin,
#  'cmd_name': the command we want to run,
#  'input_name': the name of the input parameter of that command,
#  'ignore_parameters': list of parameters to ignore, for example: threads,
#  'parent_artifact_name': name of the parent output, input for this command,
#  'parameters_names': list of the names of the parameter sets we want to run
# }
full_pipelines = [
{'name': 'Full WGS - Shogun',
'data_type': ['Metagenomic'],
'artifact_type': 'per_sample_FASTQ',
'previous-step': None,
'requirements': dict(),
'steps': [
{'previous-step': None,
'plugin': 'qp-meta',
'version': '2021.01',
'cmd_name': 'Atropos v1.1.24',
'input_name': 'input',
'ignore_parameters': ['Number of threads used'],
'parent_artifact_name': None,
'parameters_names': ['KAPA HyperPlus with iTru']},
{'previous-step': 'Atropos v1.1.24',
'plugin': 'qp-shogun',
'version': '072020',
'cmd_name': 'Shogun v1.0.8',
'input_name': 'input',
'ignore_parameters': ['Number of threads'],
'parent_artifact_name': 'Adapter trimmed files',
'parameters_names': ['wol_bowtie2', 'rep200_bowtie2']}
]},
{'name': 'Target Gene Processing',
'data_type': ['16S', '18S', 'ITS'],
'artifact_type': 'Demultiplexed',
'previous-step': 'Split libraries FASTQ',
'requirements': {
'prep': [
{'column': 'platform', 'value': ['illumina'],
'equal': True},
{'column': 'run_prefix', 'value': ['cmi_workshop_lane1_s1_l001'],
'equal': False}]},
'steps': [
{'previous-step': None,
'plugin': 'QIIMEq2',
'version': '1.9.1',
'cmd_name': 'Trimming',
'input_name': 'input_data',
'ignore_parameters': [],
'parent_artifact_name': None,
'parameters_names': ['90 base pairs',
'100 base pairs',
'150 base pairs'
]},
{'previous-step': 'Trimming',
'plugin': 'QIIMEq2',
'version': '1.9.1',
'cmd_name': 'Pick closed-reference OTUs',
'input_name': 'input_data',
'ignore_parameters': [],
'parent_artifact_name': 'Trimmed Demultiplexed',
'parameters_names': ['Defaults - parallel']},
{'previous-step': 'Trimming',
'plugin': 'deblur',
'version': '1.1.0',
'cmd_name': 'Deblur',
'input_name': 'Demultiplexed sequences',
'ignore_parameters': [],
'parent_artifact_name': 'Trimmed Demultiplexed',
'parameters_names': ['Defaults']}
]},
]
def _check_previous_command(prev_step, pparams):
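    """Check that the artifact matches the pipeline's expected previous step.

    True when both prev_step and pparams are None (a raw, unprocessed
    artifact is expected) or when the artifact was generated by a command
    named prev_step.
    """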
    return (prev_step is None and pparams is None) or (
        pparams is not None and prev_step == pparams.command.name)
def _check_requirements(requirements, template):
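    """Check that the metadata template satisfies all the requirements.

    Each requirement compares the unique, lowercased values of a template
    column against the expected values; 'equal' selects whether they must
    match (True) or differ (False).
    """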
satisfied = True
for req in requirements:
if satisfied:
if req['column'] not in template.categories:
if req['equal']:
satisfied = False
continue
            # compare the unique, lowercased values of the column against the
            # expected values; sort both sides so order does not matter
            template_value = sorted({
                v.lower()
                for v in template.get_category(req['column']).values()})
            if req['equal'] and template_value != sorted(req['value']):
                satisfied = False
                continue
            elif not req['equal'] and template_value == sorted(req['value']):
                satisfied = False
                continue
return satisfied
def _check_parameters(jobs, cmd):
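    """Return the parameter dicts of the given jobs for comparison.

    Values are cast to str and the parameters listed in
    cmd['ignore_parameters'] are dropped so they do not affect comparisons.
    """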
params = [{k: str(v) for k, v in j.parameters.values.items()
if k not in cmd['ignore_parameters']} for j in jobs]
return params
def _submit_workflows(artifact_process):
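    """Submit every workflow whose first job is still in construction.

    Entries without a workflow are skipped.
    """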
for artifact in artifact_process:
if artifact['workflow'] is None:
continue
        # the workflow graph returns its nodes in insertion order, so
        # position [0] is the first job created
first_job = list(artifact['workflow'].graph.nodes())[0]
if first_job.status == 'in_construction':
artifact['workflow'].submit()
# Step 1. Loop over full_pipelines to process each pipeline
for pipeline in full_pipelines:
# Step 2. From the steps generate the list of commands to add to the
# workflow
commands = []
for step in pipeline['steps']:
plugin = _get_plugin(step['plugin'], step['version'])
cmds = [c for c in plugin.commands if c.name == step['cmd_name']]
        if len(cmds) != 1:
            raise ValueError(
                'Expected exactly one command matching this step definition, '
                'found %d: %s' % (len(cmds), str(step)))
cmd = cmds[0]
parameters = []
for dps in cmd.default_parameter_sets:
if dps.name in step['parameters_names']:
# note that for simplicity we are converting all values in the
# parameters to string
parameters.append({'id': dps.id, 'values': {
k: str(v) for k, v in dps.values.items()}})
commands.append({
'command': cmd,
'command-name': cmd.name,
'previous-step': step['previous-step'],
'parent_artifact_name': step['parent_artifact_name'],
'input_name': step['input_name'],
'ignore_parameters': step['ignore_parameters'],
'parameters': parameters})
    # Step 2 (children). Get the commands of the children steps. We currently
    # only support two levels of processing, for example:
    #   Trim -> Deblur
    #        -> Pick closed-reference OTUs
    # which is fine for now as all our pipelines have at most two levels
children_cmds = [c for c in commands[1:]
if c['previous-step'] == commands[0]['command-name']]
# Step 3. Find all preparations/artifacts that we can add the pipeline
# ... as a first pass we will only process study 10317 (AGP) ...
# artifacts_all = [a for study in Study.iter()
artifacts_all = [a for study in [Study(10317)]
# loop over all artifacts of artifact_type with in study
for a in study.artifacts(
artifact_type=pipeline['artifact_type'])
if _check_previous_command(
pipeline['previous-step'], a.processing_parameters)]
# Step 4. Limit all_artifacts to those within restrictions
artifacts_compliant = []
for a in artifacts_all:
st = a.study.sample_template
pts = a.prep_templates
if not pts:
continue
pt = pts[0]
        # skip artifacts that are not yet private/public; possible values are
        # {'sandbox', 'awaiting_approval', 'private', 'public'}
if a.visibility in ('sandbox', 'awaiting_approval'):
continue
if pt.data_type() not in pipeline['data_type']:
continue
reqs = pipeline['requirements']
if 'sample' in reqs and not _check_requirements(reqs['sample'], st):
continue
if 'prep' in reqs and not _check_requirements(reqs['prep'], pt):
continue
artifacts_compliant.append(a)
    # Step 5a. Limit artifacts_compliant to the artifacts that are missing the
    #          first command/parameters of this pipeline. Note that this could
    #          be part of Step 4 but it is easier to debug as a separate step
artifact_process = []
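    # children artifacts already generated by the first command with the
    # requested parameters; the remaining steps will be checked against them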
children_compliant = []
cmd = commands[0]
for a in artifacts_compliant:
        # get all jobs, including hidden ones, in case the job failed
jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
params = _check_parameters(jobs, cmd)
# checking that all required parameters of this command exist
missing_parameters = []
for p in cmd['parameters']:
p = p['values']
p.update({cmd['input_name']: str(a.id)})
p_to_compare = p.copy()
for k in cmd['ignore_parameters']:
del p_to_compare[k]
if p_to_compare not in params:
missing_parameters.append(p)
            else:
                # this parameter set already ran on this artifact; collect the
                # children generated by this command with these parameters so
                # the next steps of the pipeline can be checked on them
                for c in a.children:
                    cpp = c.processing_parameters
                    if cpp.command.name == cmd['command-name']:
                        cparams = _check_parameters([cpp], cmd)
                        # _check_parameters returns a list of dicts, so check
                        # membership rather than equality
                        if p_to_compare in cparams:
                            children_compliant.append(c)
if missing_parameters:
            # we build a dict for each artifact so we can keep the workflow
            # object, which is useful when running this in a terminal and we
            # want to follow up on those workflows
artifact_process.append({'workflow': None, 'artifact': a,
'missing_parameters': missing_parameters,
'cmd_id': 0})
# Step 5b. Add workflow/commands for children
for a in children_compliant:
for cmd_id, cmd in enumerate(children_cmds):
            # get all jobs, including hidden ones, in case the job failed
jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
params = _check_parameters(jobs, cmd)
# checking that all required parameters of this command exist
missing_parameters = []
for p in cmd['parameters']:
p = p['values']
p.update({cmd['input_name']: str(a.id)})
p_to_compare = p.copy()
for k in cmd['ignore_parameters']:
del p_to_compare[k]
if p_to_compare not in params:
missing_parameters.append(p)
if missing_parameters:
artifact_process.append(
{'workflow': None, 'artifact': a,
'missing_parameters': missing_parameters,
'cmd_id': cmd_id + 1})
# Step 6: add workflows and jobs
    # max_processing is useful for debugging as it allows stopping after a
    # given number of entries in artifact_process
max_processing = len(artifact_process)
for i, artifact in enumerate(artifact_process):
if i >= max_processing:
break
if artifact['workflow'] is not None:
continue
a = artifact['artifact']
cmd_id = artifact['cmd_id']
# create the first-job/workflow with the first command and the first
# set of parameters
cmd = commands[cmd_id]
params = artifact['missing_parameters'][0]
params.update({cmd['input_name']: str(a.id)})
job_params = Parameters.load(cmd['command'], values_dict=params)
artifact['workflow'] = ProcessingWorkflow.from_scratch(
user, job_params)
# now we can add the rest of the parameters to the workflow for
# the first command
for params in artifact['missing_parameters'][1:]:
job_params = Parameters.load(cmd['command'], values_dict=params)
artifact['workflow'].add(
job_params, req_params={cmd['input_name']: str(a.id)})
for cmd in commands[cmd_id + 1:]:
# get jobs from the workflow to which we can add this new command
previous_jobs = [j for j in artifact['workflow'].graph.nodes()
if j.command.name == cmd['previous-step']]
for job in previous_jobs:
for params in cmd['parameters']:
params = params['values']
params.update({cmd['input_name']: '%s%s' % (
job.id, cmd['parent_artifact_name'])})
job_params = Parameters.load(
cmd['command'], values_dict=params)
artifact['workflow'].add(job_params, connections={job: {
cmd['parent_artifact_name']: cmd['input_name']}})
# Step 7. submit the workflows!
_submit_workflows(artifact_process)