Switch to unified view

a b/qiita_ware/private_plugin.py
1
# -----------------------------------------------------------------------------
2
# Copyright (c) 2014--, The Qiita Development Team.
3
#
4
# Distributed under the terms of the BSD 3-clause License.
5
#
6
# The full license is in the file LICENSE, distributed with this software.
7
# -----------------------------------------------------------------------------
8
9
from json import dumps, loads
10
from sys import exc_info
11
from time import sleep
12
from os import remove
13
from os.path import join
14
import traceback
15
import warnings
16
17
import qiita_db as qdb
18
from qiita_core.qiita_settings import r_client, qiita_config
19
from qiita_ware.commands import (download_remote, list_remote,
20
                                 submit_VAMPS, submit_EBI)
21
from qiita_ware.metadata_pipeline import (
22
    create_templates_from_qiime_mapping_file)
23
from qiita_ware.exceptions import EBISubmissionError
24
25
26
def build_analysis_files(job):
27
    """Builds the files for an analysis
28
29
    Parameters
30
    ----------
31
    job : qiita_db.processing_job.ProcessingJob
32
        The processing job with the information for building the files
33
    """
34
    with qdb.sql_connection.TRN:
35
        params = job.parameters.values
36
        analysis_id = params['analysis']
37
        categories = params['categories']
38
        merge_duplicated_sample_ids = params['merge_dup_sample_ids']
39
        analysis = qdb.analysis.Analysis(analysis_id)
40
        biom_files = analysis.build_files(
41
            merge_duplicated_sample_ids, categories=categories)
42
43
        cmd = qdb.software.Command.get_validator('BIOM')
44
        val_jobs = []
45
        for dtype, biom_fp, archive_artifact_fp in biom_files:
46
            if archive_artifact_fp is not None:
47
                files = dumps({'biom': [biom_fp],
48
                               'plain_text': [archive_artifact_fp]})
49
            else:
50
                files = dumps({'biom': [biom_fp]})
51
            validate_params = qdb.software.Parameters.load(
52
                cmd, values_dict={'files': files,
53
                                  'artifact_type': 'BIOM',
54
                                  'provenance': dumps({'job': job.id,
55
                                                       'data_type': dtype}),
56
                                  'analysis': analysis_id,
57
                                  'template': None})
58
            val_jobs.append(qdb.processing_job.ProcessingJob.create(
59
                analysis.owner, validate_params, True))
60
61
        job._set_validator_jobs(val_jobs)
62
63
        for j in val_jobs:
64
            j.submit()
65
            sleep(1)
66
67
    # The validator jobs no longer finish the job automatically so we need
68
    # to release the validators here
69
    job.release_validators()
70
71
72
def release_validators(job):
73
    """Waits until all the validators of a job are completed
74
75
    Parameters
76
    ----------
77
    job : qiita_db.processing_job.ProcessingJob
78
        The processing job with the information of the parent job
79
    """
80
    qdb.processing_job.ProcessingJob(
81
        job.parameters.values['job']).release_validators()
82
    job._set_status('success')
83
84
85
def submit_to_VAMPS(job):
86
    """Submits an artifact to VAMPS
87
88
    Parameters
89
    ----------
90
    job : qiita_db.processing_job.ProcessingJob
91
        The processing job performing the task
92
    """
93
    with qdb.sql_connection.TRN:
94
        submit_VAMPS(job.parameters.values['artifact'])
95
        job._set_status('success')
96
97
98
def submit_to_EBI(job):
99
    """Submit a study to EBI
100
101
    Parameters
102
    ----------
103
    job : qiita_db.processing_job.ProcessingJob
104
        The processing job performing the task
105
    """
106
    with qdb.sql_connection.TRN:
107
        param_vals = job.parameters.values
108
        artifact_id = int(param_vals['artifact'])
109
        submission_type = param_vals['submission_type']
110
        artifact = qdb.artifact.Artifact(artifact_id)
111
112
        for info in artifact.study._ebi_submission_jobs():
113
            jid, aid, js, cbste, era = info
114
            if js in ('running', 'queued') and jid != job.id:
115
                error_msg = ("Cannot perform parallel EBI submission for "
116
                             "the same study. Current job running: %s" % js)
117
                raise EBISubmissionError(error_msg)
118
        submit_EBI(artifact_id, submission_type, True)
119
        job._set_status('success')
120
121
122
def copy_artifact(job):
123
    """Creates a copy of an artifact
124
125
    Parameters
126
    ----------
127
    job : qiita_db.processing_job.ProcessingJob
128
        The processing job performing the task
129
    """
130
    with qdb.sql_connection.TRN:
131
        param_vals = job.parameters.values
132
        orig_artifact = qdb.artifact.Artifact(param_vals['artifact'])
133
        prep_template = qdb.metadata_template.prep_template.PrepTemplate(
134
            param_vals['prep_template'])
135
        qdb.artifact.Artifact.copy(orig_artifact, prep_template)
136
        job._set_status('success')
137
138
139
def delete_artifact(job):
140
    """Deletes an artifact from the system
141
142
    Parameters
143
    ----------
144
    job : qiita_db.processing_job.ProcessingJob
145
        The processing job performing the task
146
    """
147
    with qdb.sql_connection.TRN:
148
        artifact_id = job.parameters.values['artifact']
149
        qdb.artifact.Artifact.delete(artifact_id)
150
        job._set_status('success')
151
152
153
def create_sample_template(job):
154
    """Creates a sample template
155
156
    Parameters
157
    ----------
158
    job : qiita_db.processing_job.ProcessingJob
159
        The processing job performing the task
160
    """
161
    with qdb.sql_connection.TRN:
162
        params = job.parameters.values
163
        fp = params['fp']
164
        study = qdb.study.Study(int(params['study_id']))
165
        is_mapping_file = params['is_mapping_file']
166
        data_type = params['data_type']
167
168
        with warnings.catch_warnings(record=True) as warns:
169
            if is_mapping_file:
170
                create_templates_from_qiime_mapping_file(fp, study, data_type)
171
            else:
172
                qdb.metadata_template.sample_template.SampleTemplate.create(
173
                    qdb.metadata_template.util.load_template_to_dataframe(fp),
174
                    study)
175
            remove(fp)
176
177
            if warns:
178
                msg = '\n'.join(set(str(w.message) for w in warns))
179
                r_client.set("sample_template_%s" % study.id,
180
                             dumps({'job_id': job.id, 'alert_type': 'warning',
181
                                    'alert_msg': msg}))
182
183
        job._set_status('success')
184
185
186
def update_sample_template(job):
187
    """Updates a sample template
188
189
    Parameters
190
    ----------
191
    job : qiita_db.processing_job.ProcessingJob
192
        The processing job performing the task
193
    """
194
    with qdb.sql_connection.TRN:
195
        param_vals = job.parameters.values
196
        study_id = param_vals['study']
197
        fp = param_vals['template_fp']
198
        with warnings.catch_warnings(record=True) as warns:
199
            st = qdb.metadata_template.sample_template.SampleTemplate(study_id)
200
            df = qdb.metadata_template.util.load_template_to_dataframe(fp)
201
            st.extend_and_update(df)
202
            remove(fp)
203
204
            # Join all the warning messages into one. Note that this info
205
            # will be ignored if an exception is raised
206
            if warns:
207
                msg = '\n'.join(set(str(w.message) for w in warns))
208
                r_client.set("sample_template_%s" % study_id,
209
                             dumps({'job_id': job.id, 'alert_type': 'warning',
210
                                    'alert_msg': msg}))
211
212
        job._set_status('success')
213
214
215
def delete_sample_template(job):
216
    """Deletes a sample template
217
218
    Parameters
219
    ----------
220
    job : qiita_db.processing_job.ProcessingJob
221
        The processing job performing the task
222
    """
223
    with qdb.sql_connection.TRN:
224
        qdb.metadata_template.sample_template.SampleTemplate.delete(
225
            job.parameters.values['study'])
226
        job._set_status('success')
227
228
229
def update_prep_template(job):
230
    """Updates a prep template
231
232
    Parameters
233
    ----------
234
    job : qiita_db.processing_job.ProcessingJob
235
        The processing job performing the task
236
    """
237
    with qdb.sql_connection.TRN:
238
        param_vals = job.parameters.values
239
        prep_id = param_vals['prep_template']
240
        fp = param_vals['template_fp']
241
242
        prep = qdb.metadata_template.prep_template.PrepTemplate(prep_id)
243
        with warnings.catch_warnings(record=True) as warns:
244
            df = qdb.metadata_template.util.load_template_to_dataframe(fp)
245
            prep.extend_and_update(df)
246
            remove(fp)
247
248
            # Join all the warning messages into one. Note that this info
249
            # will be ignored if an exception is raised
250
            if warns:
251
                msg = '\n'.join(set(str(w.message) for w in warns))
252
                r_client.set("prep_template_%s" % prep_id,
253
                             dumps({'job_id': job.id, 'alert_type': 'warning',
254
                                    'alert_msg': msg}))
255
256
        job._set_status('success')
257
258
259
def delete_sample_or_column(job):
260
    """Deletes a sample or a column from the metadata
261
262
    Parameters
263
    ----------
264
    job : qiita_db.processing_job.ProcessingJob
265
        The processing job performing the task
266
    """
267
    with qdb.sql_connection.TRN:
268
        param_vals = job.parameters.values
269
        obj_class = param_vals['obj_class']
270
        obj_id = param_vals['obj_id']
271
        sample_or_col = param_vals['sample_or_col']
272
        name = param_vals['name'].split(',')
273
274
        if obj_class == 'SampleTemplate':
275
            constructor = qdb.metadata_template.sample_template.SampleTemplate
276
        elif obj_class == 'PrepTemplate':
277
            constructor = qdb.metadata_template.prep_template.PrepTemplate
278
        else:
279
            raise ValueError('Unknown value "%s". Choose between '
280
                             '"SampleTemplate" and "PrepTemplate"' % obj_class)
281
282
        if sample_or_col == 'columns':
283
            del_func = constructor(obj_id).delete_column
284
            name = name[0]
285
        elif sample_or_col == 'samples':
286
            del_func = constructor(obj_id).delete_samples
287
        else:
288
            raise ValueError('Unknown value "%s". Choose between "samples" '
289
                             'and "columns"' % sample_or_col)
290
291
        del_func(name)
292
        job._set_status('success')
293
294
295
def delete_study(job):
296
    """Deletes a full study
297
298
    Parameters
299
    ----------
300
    job : qiita_db.processing_job.ProcessingJob
301
        The processing job performing the task
302
    """
303
    MT = qdb.metadata_template
304
    with qdb.sql_connection.TRN:
305
        study_id = job.parameters.values['study']
306
        study = qdb.study.Study(study_id)
307
308
        # deleting analyses
309
        for analysis in study.analyses():
310
            qdb.analysis.Analysis.delete_analysis_artifacts(analysis.id)
311
312
        for pt in study.prep_templates():
313
            if pt.artifact is not None:
314
                # Artifact.delete will delete descendants so just delete
315
                # the root
316
                qdb.artifact.Artifact.delete(pt.artifact.id)
317
            MT.prep_template.PrepTemplate.delete(pt.id)
318
319
        if MT.sample_template.SampleTemplate.exists(study_id):
320
            MT.sample_template.SampleTemplate.delete(study_id)
321
322
        qdb.study.Study.delete(study_id)
323
324
        job._set_status('success')
325
326
327
def complete_job(job):
328
    """Completes a job
329
330
    Parameters
331
    ----------
332
    job : qiita_db.processing_job.ProcessingJob
333
        The processing job performing the task
334
    """
335
    with qdb.sql_connection.TRN:
336
        param_vals = job.parameters.values
337
        payload = loads(param_vals['payload'])
338
        if payload['success']:
339
            artifacts = payload['artifacts']
340
            error = None
341
        else:
342
            artifacts = None
343
            error = payload['error']
344
        c_job = qdb.processing_job.ProcessingJob(param_vals['job_id'])
345
        c_job.step = 'Completing via %s [%s]' % (job.id, job.external_id)
346
        try:
347
            c_job.complete(payload['success'], artifacts, error)
348
        except Exception:
349
            c_job._set_error(traceback.format_exception(*exc_info()))
350
351
        job._set_status('success')
352
353
    if 'archive' in payload:
354
        pass
355
        # ToDo: Archive
356
        # features = payload['archive']
357
        # here we should call the method from the command to archive
358
359
360
def delete_analysis(job):
361
    """Deletes a full analysis
362
363
    Parameters
364
    ----------
365
    job : qiita_db.processing_job.ProcessingJob
366
        The processing job performing the task
367
    """
368
    with qdb.sql_connection.TRN:
369
        analysis_id = job.parameters.values['analysis_id']
370
        qdb.analysis.Analysis.delete_analysis_artifacts(analysis_id)
371
372
        r_client.delete('analysis_delete_%d' % analysis_id)
373
374
        job._set_status('success')
375
376
377
def list_remote_files(job):
378
    """Lists valid study files on a remote server
379
380
    Parameters
381
    ----------
382
    job : qiita_db.processing_job.ProcessingJob
383
        The processing job performing the task
384
    """
385
    with qdb.sql_connection.TRN:
386
        url = job.parameters.values['url']
387
        private_key = job.parameters.values['private_key']
388
        study_id = job.parameters.values['study_id']
389
        try:
390
            files = list_remote(url, private_key)
391
            r_client.set("upload_study_%s" % study_id,
392
                         dumps({'job_id': job.id, 'url': url, 'files': files}))
393
        except Exception:
394
            job._set_error(traceback.format_exception(*exc_info()))
395
        else:
396
            job._set_status('success')
397
398
399
def download_remote_files(job):
400
    """Downloads valid study files from a remote server
401
402
    Parameters
403
    ----------
404
    job : qiita_db.processing_job.ProcessingJob
405
        The processing job performing the task
406
    """
407
    with qdb.sql_connection.TRN:
408
        url = job.parameters.values['url']
409
        destination = job.parameters.values['destination']
410
        private_key = job.parameters.values['private_key']
411
        try:
412
            download_remote(url, private_key, destination)
413
        except Exception:
414
            job._set_error(traceback.format_exception(*exc_info()))
415
        else:
416
            job._set_status('success')
417
418
419
def INSDC_download(job):
420
    """Download an accession from INSDC
421
422
    Parameters
423
    ----------
424
    job : qiita_db.processing_job.ProcessingJob
425
        The processing job performing the task
426
    """
427
    with qdb.sql_connection.TRN:
428
        param_vals = job.parameters.values
429
        download_source = param_vals['download_source']
430
        accession = param_vals['accession']
431
432
        if job.user.level != 'admin':
433
            job._set_error('INSDC_download is only for administrators')
434
435
        job_dir = join(qiita_config.working_dir, job.id)
436
        qdb.util.create_nested_path(job_dir)
437
438
        # code doing something
439
        print(download_source, accession)
440
441
        job._set_status('success')
442
443
444
TASK_DICT = {'build_analysis_files': build_analysis_files,
445
             'release_validators': release_validators,
446
             'submit_to_VAMPS': submit_to_VAMPS,
447
             'submit_to_EBI': submit_to_EBI,
448
             'copy_artifact': copy_artifact,
449
             'delete_artifact': delete_artifact,
450
             'create_sample_template': create_sample_template,
451
             'update_sample_template': update_sample_template,
452
             'delete_sample_template': delete_sample_template,
453
             'update_prep_template': update_prep_template,
454
             'delete_sample_or_column': delete_sample_or_column,
455
             'delete_study': delete_study,
456
             'complete_job': complete_job,
457
             'delete_analysis': delete_analysis,
458
             'list_remote_files': list_remote_files,
459
             'download_remote_files': download_remote_files,
460
             'INSDC_download': INSDC_download}
461
462
463
def private_task(job_id):
464
    """Completes a Qiita private task
465
466
    Parameters
467
    ----------
468
    job_id : str
469
        The job id
470
    """
471
    if job_id == 'register':
472
        # We don't need to do anything here if Qiita is registering plugins
473
        return
474
475
    job = qdb.processing_job.ProcessingJob(job_id)
476
    job.update_heartbeat_state()
477
    task_name = job.command.name
478
479
    try:
480
        TASK_DICT[task_name](job)
481
    except Exception as e:
482
        log_msg = "Error on job %s: %s" % (
483
            job.id, ''.join(traceback.format_exception(*exc_info())))
484
        le = qdb.logger.LogEntry.create('Runtime', log_msg)
485
        job.complete(False, error="Error (log id: %d): %s" % (le.id, e))