Switch to unified view

a b/qiita_db/handlers/processing_job.py
1
# -----------------------------------------------------------------------------
2
# Copyright (c) 2014--, The Qiita Development Team.
3
#
4
# Distributed under the terms of the BSD 3-clause License.
5
#
6
# The full license is in the file LICENSE, distributed with this software.
7
# -----------------------------------------------------------------------------
8
9
from json import loads
10
11
from tornado.web import HTTPError
12
13
import qiita_db as qdb
14
from .oauth2 import OauthBaseHandler, authenticate_oauth
15
16
17
def _get_job(job_id):
18
    """Returns the job with the given id if it exists
19
20
    Parameters
21
    ----------
22
    job_id : str
23
        The job id to check
24
25
    Returns
26
    -------
27
    qiita_db.processing_job.ProcessingJob
28
        The requested job
29
30
    Raises
31
    ------
32
    HTTPError
33
        If the job does not exist, with error code 404
34
        If there is a problem instantiating the processing job, with error
35
        code 500
36
    """
37
    if not qdb.processing_job.ProcessingJob.exists(job_id):
38
        raise HTTPError(404)
39
40
    try:
41
        job = qdb.processing_job.ProcessingJob(job_id)
42
    except Exception as e:
43
        raise HTTPError(500, reason='Error instantiating the job: %s' % str(e))
44
45
    return job
46
47
48
class JobHandler(OauthBaseHandler):
49
    @authenticate_oauth
50
    def get(self, job_id):
51
        """Get the job information
52
53
        Parameters
54
        ----------
55
        job_id : str
56
            The job id
57
58
        Returns
59
        -------
60
        dict
61
            {'command': str,
62
             'parameters': dict of {str, obj},
63
             'status': str}
64
             - command: the name of the command that the job executes
65
             - parameters: the parameters of the command, keyed by parameter
66
             name
67
             - status: the status of the job
68
        """
69
        with qdb.sql_connection.TRN:
70
            job = _get_job(job_id)
71
            cmd = job.command.name
72
            params = job.parameters.values
73
            status = job.status
74
            msg = '' if status != 'error' else job.log.msg
75
76
        response = {'command': cmd, 'parameters': params,
77
                    'status': status, 'msg': msg}
78
79
        self.write(response)
80
81
82
class HeartbeatHandler(OauthBaseHandler):
83
    @authenticate_oauth
84
    def post(self, job_id):
85
        """Update the heartbeat timestamp of the job
86
87
        Parameters
88
        ----------
89
        job_id : str
90
            The job id
91
        """
92
        with qdb.sql_connection.TRN:
93
            job = _get_job(job_id)
94
95
            try:
96
                job.update_heartbeat_state()
97
            except qdb.exceptions.QiitaDBOperationNotPermittedError as e:
98
                raise HTTPError(403, reason=str(e))
99
100
        self.finish()
101
102
103
class ActiveStepHandler(OauthBaseHandler):
104
    @authenticate_oauth
105
    def post(self, job_id):
106
        """Changes the current execution step of the given job
107
108
        Parameters
109
        ----------
110
        job_id : str
111
            The job id
112
        """
113
        with qdb.sql_connection.TRN:
114
            job = _get_job(job_id)
115
            payload = loads(self.request.body)
116
            step = payload['step']
117
            try:
118
                job.step = step
119
            except qdb.exceptions.QiitaDBOperationNotPermittedError as e:
120
                raise HTTPError(403, reason=str(e))
121
122
        self.finish()
123
124
125
class CompleteHandler(OauthBaseHandler):
126
    @authenticate_oauth
127
    def post(self, job_id):
128
        """Updates the job to one of the completed statuses: 'success', 'error'
129
130
        Parameters
131
        ----------
132
        job_id : str
133
            The job to complete
134
        """
135
        with qdb.sql_connection.TRN:
136
            job = _get_job(job_id)
137
138
            if job.status != 'running':
139
                raise HTTPError(
140
                    403, "Can't complete job: not in a running state")
141
142
            qiita_plugin = qdb.software.Software.from_name_and_version(
143
                'Qiita', 'alpha')
144
            cmd = qiita_plugin.get_command('complete_job')
145
            params = qdb.software.Parameters.load(
146
                cmd, values_dict={'job_id': job_id,
147
                                  'payload': self.request.body.decode(
148
                                      'ascii')})
149
            # complete_job are unique so it is fine to force them to be created
150
            job = qdb.processing_job.ProcessingJob.create(
151
                job.user, params, force=True)
152
            job.submit()
153
154
        self.finish()
155
156
157
class ProcessingJobAPItestHandler(OauthBaseHandler):
158
    @authenticate_oauth
159
    def post(self):
160
        user = self.get_argument('user', 'test@foo.bar')
161
        s_name, s_version, cmd_name = loads(self.get_argument('command'))
162
        params_dict = self.get_argument('parameters')
163
        status = self.get_argument('status', None)
164
165
        cmd = qdb.software.Software.from_name_and_version(
166
            s_name, s_version).get_command(cmd_name)
167
168
        params = qdb.software.Parameters.load(cmd, json_str=params_dict)
169
170
        job = qdb.processing_job.ProcessingJob.create(
171
            qdb.user.User(user), params, True)
172
173
        if status:
174
            job._set_status(status)
175
176
        self.write({'job': job.id})