#!/usr/bin/env python
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from qiita_db.handlers.plugin import _get_plugin
from qiita_db.study import Study
from qiita_db.software import Parameters
from qiita_db.user import User
from qiita_db.processing_job import ProcessingWorkflow


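# the user the workflows/jobs created below will belong to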
user = User('antoniog@ucsd.edu')

# full_pipelines is a list of dicts, one per pipeline being automated, with
# the form: {
# 'name': str,
# 'data_type': list of str,
# 'artifact_type': str,
# 'previous-step': the command of the previous step,
# 'requirements': {'sample': list of {'column': name of the column,
#                                     'value': the expected unique values
#                                              as a list,
#                                     'equal': True to require a match,
#                                              False to require a mismatch},
#                  'prep': same format, checked against the prep template},
# 'steps': list of dicts
# }
# Each 'step' should have: {
# 'previous-step': the command of the previous step,
# 'plugin': the name of the plugin we want to use,
# 'version': the version of the plugin,
# 'cmd_name': the command we want to run,
# 'input_name': the name of the input parameter of that command,
# 'ignore_parameters': list of parameters to ignore, for example: threads,
# 'parent_artifact_name': name of the parent output, input for this command,
# 'parameters_names': list of the names of the parameter sets we want to run
# }
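# To automate a new pipeline, append a dict with the structure above; the
# processing loop below (Steps 1-7) is generic and will pick it up.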
full_pipelines = [
    {'name': 'Full WGS - Shogun',
     'data_type': ['Metagenomic'],
     'artifact_type': 'per_sample_FASTQ',
     'previous-step': None,
     'requirements': dict(),
     'steps': [
         {'previous-step': None,
          'plugin': 'qp-meta',
          'version': '2021.01',
          'cmd_name': 'Atropos v1.1.24',
          'input_name': 'input',
          'ignore_parameters': ['Number of threads used'],
          'parent_artifact_name': None,
          'parameters_names': ['KAPA HyperPlus with iTru']},
         {'previous-step': 'Atropos v1.1.24',
          'plugin': 'qp-shogun',
          'version': '072020',
          'cmd_name': 'Shogun v1.0.8',
          'input_name': 'input',
          'ignore_parameters': ['Number of threads'],
          'parent_artifact_name': 'Adapter trimmed files',
          'parameters_names': ['wol_bowtie2', 'rep200_bowtie2']}
     ]},
    {'name': 'Target Gene Processing',
     'data_type': ['16S', '18S', 'ITS'],
     'artifact_type': 'Demultiplexed',
     'previous-step': 'Split libraries FASTQ',
     'requirements': {
         'prep': [
             {'column': 'platform', 'value': ['illumina'],
              'equal': True},
             {'column': 'run_prefix', 'value': ['cmi_workshop_lane1_s1_l001'],
              'equal': False}]},
     'steps': [
         {'previous-step': None,
          'plugin': 'QIIMEq2',
          'version': '1.9.1',
          'cmd_name': 'Trimming',
          'input_name': 'input_data',
          'ignore_parameters': [],
          'parent_artifact_name': None,
          'parameters_names': ['90 base pairs',
                               '100 base pairs',
                               '150 base pairs']},
         {'previous-step': 'Trimming',
          'plugin': 'QIIMEq2',
          'version': '1.9.1',
          'cmd_name': 'Pick closed-reference OTUs',
          'input_name': 'input_data',
          'ignore_parameters': [],
          'parent_artifact_name': 'Trimmed Demultiplexed',
          'parameters_names': ['Defaults - parallel']},
         {'previous-step': 'Trimming',
          'plugin': 'deblur',
          'version': '1.1.0',
          'cmd_name': 'Deblur',
          'input_name': 'Demultiplexed sequences',
          'ignore_parameters': [],
          'parent_artifact_name': 'Trimmed Demultiplexed',
          'parameters_names': ['Defaults']}
     ]},
]


def _check_previous_command(prev_step, pparams):
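    """Return True if the artifact's processing parameters (pparams) come
    from the expected previous command; a prev_step of None matches
    artifacts with no processing parameters (e.g. uploaded artifacts)."""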
    if (prev_step is None and pparams is None) or (
            pparams is not None and prev_step == pparams.command.name):
        return True
    return False


def _check_requirements(requirements, template):
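    """Return True if the template satisfies every requirement, where each
    requirement is a dict with 'column', 'value' (expected unique values as
    a list) and 'equal' (True to require a match, False a mismatch)."""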
    satisfied = True
    for req in requirements:
        if satisfied:
            if req['column'] not in template.categories:
                if req['equal']:
                    satisfied = False
                continue
            template_value = list(map(str.lower, set(
                template.get_category(req['column']).values())))
            if req['equal'] and template_value != req['value']:
                satisfied = False
                continue
            elif not req['equal'] and template_value == req['value']:
                satisfied = False
                continue
    return satisfied


def _check_parameters(jobs, cmd):
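    """Return the parameters of each job as a dict of str values, dropping
    the parameters listed in cmd['ignore_parameters'] so they do not affect
    comparisons."""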
    params = [{k: str(v) for k, v in j.parameters.values.items()
               if k not in cmd['ignore_parameters']} for j in jobs]
    return params


def _submit_workflows(artifact_process):
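    """Submit every workflow in artifact_process whose first job is still
    'in_construction'; entries without a workflow are skipped."""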
    for artifact in artifact_process:
        if artifact['workflow'] is None:
            continue
        # graph.nodes() returns the first job created in position [0]
        first_job = list(artifact['workflow'].graph.nodes())[0]
        if first_job.status == 'in_construction':
            artifact['workflow'].submit()


# Step 1. Loop over the full_pipelines to process each step
for pipeline in full_pipelines:
    # Step 2. From the steps, generate the list of commands to add to the
    # workflow
    commands = []
    for step in pipeline['steps']:
        plugin = _get_plugin(step['plugin'], step['version'])
        cmds = [c for c in plugin.commands if c.name == step['cmd_name']]
        if len(cmds) != 1:
            raise ValueError(
                'Expected exactly one command matching this definition %s, '
                'found %d' % (str(step), len(cmds)))

        cmd = cmds[0]
        parameters = []
        for dps in cmd.default_parameter_sets:
            if dps.name in step['parameters_names']:
                # note that for simplicity we are converting all values in
                # the parameters to string
                parameters.append({'id': dps.id, 'values': {
                    k: str(v) for k, v in dps.values.items()}})

        commands.append({
            'command': cmd,
            'command-name': cmd.name,
            'previous-step': step['previous-step'],
            'parent_artifact_name': step['parent_artifact_name'],
            'input_name': step['input_name'],
            'ignore_parameters': step['ignore_parameters'],
            'parameters': parameters})

    # Step 2, for the children: get their commands. We currently only support
    # processing 2 levels deep, for example:
    #   Trim -> Deblur
    #        -> Pick closed-reference OTUs
    # which is fine for now as all our pipelines only have 2 levels
    children_cmds = [c for c in commands[1:]
                     if c['previous-step'] == commands[0]['command-name']]

    # Step 3. Find all preparations/artifacts to which we can add the
    # pipeline ... as a first pass we will only process study 10317 (AGP) ...
    # artifacts_all = [a for study in Study.iter()
    artifacts_all = [a for study in [Study(10317)]
                     # loop over all artifacts of artifact_type within study
                     for a in study.artifacts(
                         artifact_type=pipeline['artifact_type'])
                     if _check_previous_command(
                         pipeline['previous-step'], a.processing_parameters)]

    # Step 4. Limit artifacts_all to those within the restrictions
    artifacts_compliant = []
    for a in artifacts_all:
        st = a.study.sample_template
        pts = a.prep_templates
        if not pts:
            continue
        pt = pts[0]

        # {'sandbox', 'awaiting_approval', 'private', 'public'}
        if a.visibility in ('sandbox', 'awaiting_approval'):
            continue

        if pt.data_type() not in pipeline['data_type']:
            continue

        reqs = pipeline['requirements']
        if 'sample' in reqs and not _check_requirements(reqs['sample'], st):
            continue
        if 'prep' in reqs and not _check_requirements(reqs['prep'], pt):
            continue

        artifacts_compliant.append(a)

    # Step 5a. Limit artifacts_compliant to those artifacts missing the
    # command and parameters of this pipeline. Note that this could be part
    # of Step 4 but for debugging it makes sense to keep it separate
    artifact_process = []
    children_compliant = []
    cmd = commands[0]
    for a in artifacts_compliant:
        # getting all jobs, including hidden ones, in case the job failed
        jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
        params = _check_parameters(jobs, cmd)

        # checking that all required parameters of this command exist
        missing_parameters = []
        for p in cmd['parameters']:
            p = p['values']
            p.update({cmd['input_name']: str(a.id)})
            p_to_compare = p.copy()
            for k in cmd['ignore_parameters']:
                del p_to_compare[k]
            if p_to_compare not in params:
                missing_parameters.append(p)
            else:
                for c in a.children:
                    cpp = c.processing_parameters
                    if cpp.command.name == cmd['command-name']:
                        cparams = _check_parameters([cpp], cmd)
                        if p_to_compare in cparams:
                            children_compliant.append(c)
        if missing_parameters:
            # note that we are building a dict for each artifact so we can
            # save the workflow id, useful for when we run this in a terminal
            # and we want to follow up on those workflows
            artifact_process.append({'workflow': None, 'artifact': a,
                                     'missing_parameters': missing_parameters,
                                     'cmd_id': 0})

    # Step 5b. Add workflow/commands for children
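    # children_compliant holds artifacts that the first command has already
    # produced with the expected parameters, so here we only check and add
    # the second-level commands (children_cmds) on top of them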
    for a in children_compliant:
        for cmd_id, cmd in enumerate(children_cmds):
            # getting all jobs, including hidden ones, in case the job failed
            jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
            params = _check_parameters(jobs, cmd)

            # checking that all required parameters of this command exist
            missing_parameters = []
            for p in cmd['parameters']:
                p = p['values']
                p.update({cmd['input_name']: str(a.id)})
                p_to_compare = p.copy()
                for k in cmd['ignore_parameters']:
                    del p_to_compare[k]

                if p_to_compare not in params:
                    missing_parameters.append(p)
            if missing_parameters:
                artifact_process.append(
                    {'workflow': None, 'artifact': a,
                     'missing_parameters': missing_parameters,
                     'cmd_id': cmd_id + 1})

    # Step 6: add workflows and jobs
    # max_processing is useful for debugging as it allows stopping after any
    # number of artifact_process entries
    max_processing = len(artifact_process)

    for i, artifact in enumerate(artifact_process):
        if i >= max_processing:
            break

        if artifact['workflow'] is not None:
            continue

        a = artifact['artifact']
        cmd_id = artifact['cmd_id']
        # create the first job/workflow with the first command and the first
        # set of parameters
        cmd = commands[cmd_id]
        params = artifact['missing_parameters'][0]
        params.update({cmd['input_name']: str(a.id)})
        job_params = Parameters.load(cmd['command'], values_dict=params)

        artifact['workflow'] = ProcessingWorkflow.from_scratch(
            user, job_params)

        # now we can add the rest of the parameters to the workflow for
        # the first command
        for params in artifact['missing_parameters'][1:]:
            job_params = Parameters.load(cmd['command'], values_dict=params)
            artifact['workflow'].add(
                job_params, req_params={cmd['input_name']: str(a.id)})

        for cmd in commands[cmd_id + 1:]:
            # get jobs from the workflow to which we can add this new command
            previous_jobs = [j for j in artifact['workflow'].graph.nodes()
                             if j.command.name == cmd['previous-step']]
            for job in previous_jobs:
                for params in cmd['parameters']:
                    params = params['values']
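                    # the parent artifact does not exist yet, so the input
                    # value is written as '<job id><output name>', i.e. a
                    # reference to the pending job's output; the connections
                    # mapping below presumably lets Qiita resolve it once
                    # the parent job finishes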
                    params.update({cmd['input_name']: '%s%s' % (
                        job.id, cmd['parent_artifact_name'])})
                    job_params = Parameters.load(
                        cmd['command'], values_dict=params)

                    artifact['workflow'].add(job_params, connections={job: {
                        cmd['parent_artifact_name']: cmd['input_name']}})

    # Step 7. submit the workflows!
    _submit_workflows(artifact_process)