[973924]: / qiita_db / handlers / processing_job.py

Download this file

177 lines (138 with data), 5.1 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from json import loads
from tornado.web import HTTPError
import qiita_db as qdb
from .oauth2 import OauthBaseHandler, authenticate_oauth
def _get_job(job_id):
    """Fetch the processing job identified by `job_id`

    Parameters
    ----------
    job_id : str
        The id of the job to retrieve

    Returns
    -------
    qiita_db.processing_job.ProcessingJob
        The job object for the given id

    Raises
    ------
    HTTPError
        With error code 404 if no job with the given id exists
        With error code 500 if the job object cannot be instantiated; the
        underlying error message is included in the reason
    """
    job_cls = qdb.processing_job.ProcessingJob

    # Unknown ids are a client error, report them before trying to build
    # the object
    if not job_cls.exists(job_id):
        raise HTTPError(404)

    try:
        return job_cls(job_id)
    except Exception as err:
        # Any failure while building the object is reported as a server error
        reason = 'Error instantiating the job: %s' % str(err)
        raise HTTPError(500, reason=reason)
class JobHandler(OauthBaseHandler):
    @authenticate_oauth
    def get(self, job_id):
        """Retrieve the information of a job

        Parameters
        ----------
        job_id : str
            The job id

        Returns
        -------
        dict
            {'command': str,
             'parameters': dict of {str, obj},
             'status': str,
             'msg': str}
             - command: the name of the command that the job executes
             - parameters: the parameters of the command, keyed by parameter
             name
             - status: the status of the job
             - msg: the message of the job's log if the status is 'error',
             the empty string otherwise
        """
        with qdb.sql_connection.TRN:
            job = _get_job(job_id)
            info = {'command': job.command.name,
                    'parameters': job.parameters.values,
                    'status': job.status}

            # The log is only consulted for jobs that ended in error
            if info['status'] == 'error':
                info['msg'] = job.log.msg
            else:
                info['msg'] = ''

        self.write(info)
class HeartbeatHandler(OauthBaseHandler):
    @authenticate_oauth
    def post(self, job_id):
        """Refresh the heartbeat of the given job

        Parameters
        ----------
        job_id : str
            The job id

        Raises
        ------
        HTTPError
            With error code 403 if the job rejects the heartbeat update
            (QiitaDBOperationNotPermittedError); the reason carries the
            error message
        """
        with qdb.sql_connection.TRN:
            target = _get_job(job_id)

            try:
                target.update_heartbeat_state()
            except qdb.exceptions.QiitaDBOperationNotPermittedError as err:
                reason = str(err)
                raise HTTPError(403, reason=reason)

        # Nothing to send back: close the response with an empty body
        self.finish()
class ActiveStepHandler(OauthBaseHandler):
    @authenticate_oauth
    def post(self, job_id):
        """Changes the current execution step of the given job

        The request body must be a JSON object holding the new step under
        the 'step' key.

        Parameters
        ----------
        job_id : str
            The job id

        Raises
        ------
        HTTPError
            If the job does not exist, with error code 404
            If the request body is not a JSON object with a 'step' key, with
            error code 400
            If the job does not permit changing its step
            (QiitaDBOperationNotPermittedError), with error code 403
        """
        with qdb.sql_connection.TRN:
            job = _get_job(job_id)

            # The body comes from the client: malformed JSON (ValueError),
            # a JSON value that is not an object (TypeError) or an object
            # without 'step' (KeyError) are client errors, not server errors
            try:
                step = loads(self.request.body)['step']
            except (ValueError, KeyError, TypeError):
                raise HTTPError(
                    400, reason="The request body must be a JSON object "
                                "with a 'step' key")

            try:
                job.step = step
            except qdb.exceptions.QiitaDBOperationNotPermittedError as e:
                raise HTTPError(403, reason=str(e))

        self.finish()
class CompleteHandler(OauthBaseHandler):
    @authenticate_oauth
    def post(self, job_id):
        """Updates the job to one of the completed statuses: 'success', 'error'

        The handler does not complete the job itself: it creates and submits
        a new 'complete_job' job of the 'Qiita' 'alpha' plugin, passing it
        the id of the job to complete and the request body as payload.

        Parameters
        ----------
        job_id : str
            The job to complete

        Raises
        ------
        HTTPError
            If the job does not exist, with error code 404
            If the job is not in the 'running' status, with error code 403
        """
        with qdb.sql_connection.TRN:
            job = _get_job(job_id)

            if job.status != 'running':
                # The message must be passed as `reason`: the second
                # positional argument of HTTPError is only a log message and
                # is never sent to the client
                raise HTTPError(
                    403, reason="Can't complete job: not in a running state")

            qiita_plugin = qdb.software.Software.from_name_and_version(
                'Qiita', 'alpha')
            cmd = qiita_plugin.get_command('complete_job')
            # The body is forwarded as text; a body that is not pure ASCII
            # raises UnicodeDecodeError here
            params = qdb.software.Parameters.load(
                cmd, values_dict={'job_id': job_id,
                                  'payload': self.request.body.decode(
                                      'ascii')})
            # complete_job jobs are unique, so it is fine to force their
            # creation
            job = qdb.processing_job.ProcessingJob.create(
                job.user, params, force=True)
            job.submit()

        self.finish()
class ProcessingJobAPItestHandler(OauthBaseHandler):
    @authenticate_oauth
    def post(self):
        """Creates a new processing job from the request arguments

        NOTE(review): judging by its name this endpoint is presumably meant
        for API tests only - confirm before exposing it.

        The following request arguments are read:
        - user: the id of the user that owns the job, defaults to
        'test@foo.bar'
        - command: JSON-encoded sequence of three elements: software name,
        software version and command name
        - parameters: the parameters of the job, handed to Parameters.load
        as `json_str`
        - status: optional; if given and not empty, it is set as the job
        status

        Returns
        -------
        dict
            {'job': the id of the newly created job}
        """
        owner_id = self.get_argument('user', 'test@foo.bar')
        software_name, software_version, command_name = loads(
            self.get_argument('command'))
        params_json = self.get_argument('parameters')
        initial_status = self.get_argument('status', None)

        software = qdb.software.Software.from_name_and_version(
            software_name, software_version)
        command = software.get_command(command_name)
        parameters = qdb.software.Parameters.load(
            command, json_str=params_json)

        owner = qdb.user.User(owner_id)
        new_job = qdb.processing_job.ProcessingJob.create(
            owner, parameters, True)

        if initial_status:
            new_job._set_status(initial_status)

        self.write({'job': new_job.id})