scripts/qiita-auto-processing
#!/usr/bin/env python
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from qiita_db.handlers.plugin import _get_plugin
from qiita_db.study import Study
from qiita_db.software import Parameters
from qiita_db.user import User
from qiita_db.processing_job import ProcessingWorkflow


user = User('antoniog@ucsd.edu')

# full_pipelines is a list of dicts of the form: {
#   'name': str,
#   'data_type': list of str,
#   'artifact_type': str,
#   'previous-step': the command of the previous step,
#   'requirements': {'sample': list of requirement dicts, each being
#                              {'column': name of the column,
#                               'value': the expected unique values as a list,
#                               'equal': whether the values should be equal
#                                        (True) or different (False)},
#                    'prep': same structure, applied to the prep template},
#   'steps': list of dict
# }
# These dicts define the different pipelines being automated. Each 'step'
# should have: {
#   'previous-step': the command of the previous step,
#   'plugin': the name of the plugin we want to use,
#   'version': the version of the plugin,
#   'cmd_name': the command we want to run,
#   'input_name': the name of the input parameter of that command,
#   'ignore_parameters': list of parameters to ignore, for example: threads,
#   'parent_artifact_name': name of the parent output, input for this command,
#   'parameters_names': list of the names of the parameter sets we want to run
# }
full_pipelines = [
    {'name': 'Full WGS - Shogun',
     'data_type': ['Metagenomic'],
     'artifact_type': 'per_sample_FASTQ',
     'previous-step': None,
     'requirements': dict(),
     'steps': [
         {'previous-step': None,
          'plugin': 'qp-meta',
          'version': '2021.01',
          'cmd_name': 'Atropos v1.1.24',
          'input_name': 'input',
          'ignore_parameters': ['Number of threads used'],
          'parent_artifact_name': None,
          'parameters_names': ['KAPA HyperPlus with iTru']},
         {'previous-step': 'Atropos v1.1.24',
          'plugin': 'qp-shogun',
          'version': '072020',
          'cmd_name': 'Shogun v1.0.8',
          'input_name': 'input',
          'ignore_parameters': ['Number of threads'],
          'parent_artifact_name': 'Adapter trimmed files',
          'parameters_names': ['wol_bowtie2', 'rep200_bowtie2']}
      ]},
    {'name': 'Target Gene Processing',
     'data_type': ['16S', '18S', 'ITS'],
     'artifact_type': 'Demultiplexed',
     'previous-step': 'Split libraries FASTQ',
     'requirements': {
        'prep': [
            {'column': 'platform', 'value': ['illumina'],
             'equal': True},
            {'column': 'run_prefix', 'value': ['cmi_workshop_lane1_s1_l001'],
             'equal': False}]},
     'steps': [
        {'previous-step': None,
         'plugin': 'QIIMEq2',
         'version': '1.9.1',
         'cmd_name': 'Trimming',
         'input_name': 'input_data',
         'ignore_parameters': [],
         'parent_artifact_name': None,
         'parameters_names': ['90 base pairs',
                              '100 base pairs',
                              '150 base pairs'
                              ]},
        {'previous-step': 'Trimming',
         'plugin': 'QIIMEq2',
         'version': '1.9.1',
         'cmd_name': 'Pick closed-reference OTUs',
         'input_name': 'input_data',
         'ignore_parameters': [],
         'parent_artifact_name': 'Trimmed Demultiplexed',
         'parameters_names': ['Defaults - parallel']},
        {'previous-step': 'Trimming',
         'plugin': 'deblur',
         'version': '1.1.0',
         'cmd_name': 'Deblur',
         'input_name': 'Demultiplexed sequences',
         'ignore_parameters': [],
         'parent_artifact_name': 'Trimmed Demultiplexed',
         'parameters_names': ['Defaults']}
        ]},
]


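# Helper: True when an artifact's processing parameters match the pipeline's
# expected previous step (both being None means a raw, unprocessed artifact).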
def _check_previous_command(prev_step, pparams):
    if (prev_step is None and pparams is None) or (
            pparams is not None and prev_step == pparams.command.name):
        return True
    return False


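# Helper: checks a sample/prep template against a list of requirements. The
# column's unique values (lowercased) must equal req['value'] when
# req['equal'] is True, or differ from it when False; a missing column only
# fails requirements where 'equal' is True.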
def _check_requirements(requirements, template):
    satisfied = True
    for req in requirements:
        if satisfied:
            if req['column'] not in template.categories:
                if req['equal']:
                    satisfied = False
                continue
            template_value = list(map(str.lower, set(
                template.get_category(req['column']).values())))
            if req['equal'] and template_value != req['value']:
                satisfied = False
                continue
            elif not req['equal'] and template_value == req['value']:
                satisfied = False
                continue
    return satisfied


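# Helper: for each job, return its parameter values as strings, dropping the
# parameters this step ignores (e.g. number of threads) so they can be
# compared against the parameter sets we still need to run.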
def _check_parameters(jobs, cmd):
    params = [{k: str(v) for k, v in j.parameters.values.items()
              if k not in cmd['ignore_parameters']} for j in jobs]
    return params


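# Helper: submit every newly built workflow; a first job that is no longer
# 'in_construction' means that workflow was already submitted.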
def _submit_workflows(artifact_process):
    for artifact in artifact_process:
        if artifact['workflow'] is None:
            continue
        # graph.nodes() returns the first job created in position [0]
        first_job = list(artifact['workflow'].graph.nodes())[0]
        if first_job.status == 'in_construction':
            artifact['workflow'].submit()


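# Main flow: for every pipeline, build its commands (Steps 1-2), find the
# candidate artifacts (Steps 3-4), figure out which parameter sets are still
# missing (Step 5), build the workflows (Step 6) and submit them (Step 7).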
# Step 1. Loop over the full_pipelines to process each step
for pipeline in full_pipelines:
    # Step 2. From the steps, generate the list of commands to add to the
    #         workflow
    commands = []
    for step in pipeline['steps']:
        plugin = _get_plugin(step['plugin'], step['version'])
        cmds = [c for c in plugin.commands if c.name == step['cmd_name']]
        if len(cmds) != 1:
            raise ValueError('Expected exactly one command matching this '
                             'definition: %s' % str(step))

        cmd = cmds[0]
        parameters = []
        for dps in cmd.default_parameter_sets:
            if dps.name in step['parameters_names']:
                # note that for simplicity we are converting all values in the
                # parameters to string
                parameters.append({'id': dps.id, 'values': {
                    k: str(v) for k, v in dps.values.items()}})

        commands.append({
            'command': cmd,
            'command-name': cmd.name,
            'previous-step': step['previous-step'],
            'parent_artifact_name': step['parent_artifact_name'],
            'input_name': step['input_name'],
            'ignore_parameters': step['ignore_parameters'],
            'parameters': parameters})

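    # At this point each entry of commands looks like (values below are only
    # illustrative):
    #   {'command': <Command object>, 'command-name': 'Atropos v1.1.24',
    #    'previous-step': None, 'parent_artifact_name': None,
    #    'input_name': 'input', 'ignore_parameters': [...],
    #    'parameters': [{'id': <parameter set id>, 'values': {...}}]}
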
    # Step 2. - for children. Get their commands. We currently only support
    #         processing for 2 levels, like:
    #         -> Trim -> Deblur
    #                 -> Close reference
    #         which should be fine for now as all our pipelines only
    #         have 2 levels
    children_cmds = [c for c in commands[1:]
                     if c['previous-step'] == commands[0]['command-name']]

    # Step 3. Find all preparations/artifacts to which we can add the pipeline
    #         ... as a first pass we will only process study 10317 (AGP) ...
    # artifacts_all = [a for study in Study.iter()
    artifacts_all = [a for study in [Study(10317)]
                     # loop over all artifacts of artifact_type in the study
                     for a in study.artifacts(
                         artifact_type=pipeline['artifact_type'])
                     if _check_previous_command(
                         pipeline['previous-step'], a.processing_parameters)]

    # Step 4. Limit artifacts_all to those within the restrictions
    artifacts_compliant = []
    for a in artifacts_all:
        st = a.study.sample_template
        pts = a.prep_templates
        if not pts:
            continue
        pt = pts[0]

        # {'sandbox', 'awaiting_approval', 'private', 'public'}
        if a.visibility in ('sandbox', 'awaiting_approval'):
            continue

        if pt.data_type() not in pipeline['data_type']:
            continue

        reqs = pipeline['requirements']
        if 'sample' in reqs and not _check_requirements(reqs['sample'], st):
            continue
        if 'prep' in reqs and not _check_requirements(reqs['prep'], pt):
            continue

        artifacts_compliant.append(a)

    # Step 5a. Limit artifacts_compliant to those artifacts missing the
    #          command and parameters of this pipeline. Note that this could
    #          be part of Step 4 but for debugging it makes sense to keep it
    #          separate
    artifact_process = []
    children_compliant = []
    cmd = commands[0]
    for a in artifacts_compliant:
        # getting all jobs, including hidden ones, in case the job failed
        jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
        params = _check_parameters(jobs, cmd)

        # checking that all required parameters of this command exist
        missing_parameters = []
        for p in cmd['parameters']:
            p = p['values']
            p.update({cmd['input_name']: str(a.id)})
            p_to_compare = p.copy()
            for k in cmd['ignore_parameters']:
                del p_to_compare[k]
            if p_to_compare not in params:
                missing_parameters.append(p)
            else:
                for c in a.children:
                    cpp = c.processing_parameters
                    if cpp.command.name == cmd['command-name']:
                        cparams = _check_parameters([cpp], cmd)
                        if p_to_compare in cparams:
                            children_compliant.append(c)
        if missing_parameters:
            # note that we are building a dict for each artifact so we can
            # save the workflow id, useful for when we run this in a terminal
            # and we want to follow up on those workflows
            artifact_process.append({'workflow': None, 'artifact': a,
                                     'missing_parameters': missing_parameters,
                                     'cmd_id': 0})

    # Step 5b. Add workflow/commands for children
    for a in children_compliant:
        for cmd_id, cmd in enumerate(children_cmds):
            # getting all jobs, including hidden ones, in case the job failed
            jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
            params = _check_parameters(jobs, cmd)

            # checking that all required parameters of this command exist
            missing_parameters = []
            for p in cmd['parameters']:
                p = p['values']
                p.update({cmd['input_name']: str(a.id)})
                p_to_compare = p.copy()
                for k in cmd['ignore_parameters']:
                    del p_to_compare[k]

                if p_to_compare not in params:
                    missing_parameters.append(p)
            if missing_parameters:
                artifact_process.append(
                    {'workflow': None, 'artifact': a,
                     'missing_parameters': missing_parameters,
                     'cmd_id': cmd_id + 1})

    # Step 6: add workflows and jobs
    # max_processing is useful for debugging as it allows stopping after any
    # number of entries in artifact_process
    max_processing = len(artifact_process)

    for i, artifact in enumerate(artifact_process):
        if i >= max_processing:
            break

        if artifact['workflow'] is not None:
            continue

        a = artifact['artifact']
        cmd_id = artifact['cmd_id']
        # create the first-job/workflow with the first command and the first
        # set of parameters
        cmd = commands[cmd_id]
        params = artifact['missing_parameters'][0]
        params.update({cmd['input_name']: str(a.id)})
        job_params = Parameters.load(cmd['command'], values_dict=params)

        artifact['workflow'] = ProcessingWorkflow.from_scratch(
            user, job_params)

        # now we can add the rest of the parameters to the workflow for
        # the first command
        for params in artifact['missing_parameters'][1:]:
            job_params = Parameters.load(cmd['command'], values_dict=params)
            artifact['workflow'].add(
                job_params, req_params={cmd['input_name']: str(a.id)})

        for cmd in commands[cmd_id + 1:]:
            # get jobs from the workflow to which we can add this new command
            previous_jobs = [j for j in artifact['workflow'].graph.nodes()
                             if j.command.name == cmd['previous-step']]
            for job in previous_jobs:
                for params in cmd['parameters']:
                    params = params['values']
                    params.update({cmd['input_name']: '%s%s' % (
                        job.id, cmd['parent_artifact_name'])})
                    job_params = Parameters.load(
                        cmd['command'], values_dict=params)

                    artifact['workflow'].add(job_params, connections={job: {
                        cmd['parent_artifact_name']: cmd['input_name']}})

    # Step 7. submit the workflows!
    _submit_workflows(artifact_process)