|
a |
|
b/scripts/qiita-recover-jobs |
|
|
1 |
#!/usr/bin/env python |
|
|
2 |
|
|
|
3 |
# ----------------------------------------------------------------------------- |
|
|
4 |
# Copyright (c) 2014--, The Qiita Development Team. |
|
|
5 |
# |
|
|
6 |
# Distributed under the terms of the BSD 3-clause License. |
|
|
7 |
# |
|
|
8 |
# The full license is in the file LICENSE, distributed with this software. |
|
|
9 |
# ----------------------------------------------------------------------------- |
|
|
10 |
from subprocess import check_output |
|
|
11 |
from qiita_db.sql_connection import TRN |
|
|
12 |
from qiita_db.processing_job import ProcessingJob |
|
|
13 |
import pandas as pd |
|
|
14 |
from time import sleep |
|
|
15 |
from math import ceil |
|
|
16 |
from io import StringIO |
|
|
17 |
|
|
|
18 |
|
|
|
19 |
# Seconds to wait after submitting a job so the queue can settle.
SLEEP_TIME = 6
# Each SLEEP_TIME pause is split into this many interruptible chunks.
CHANCES = 3
# All processing job ids currently in a given status (parameterized).
SQL = """SELECT processing_job_id
         FROM qiita.processing_job
         JOIN qiita.processing_job_status
            USING (processing_job_status_id)
         WHERE processing_job_status = %s"""
|
|
26 |
|
|
|
27 |
|
|
|
28 |
def _submit_jobs(jids_to_recover, recover_type):
    """Resubmit each given job, pausing between submissions.

    Parameters
    ----------
    jids_to_recover : collection of str
        Processing job ids to set to in_construction and resubmit.
    recover_type : str
        Label for the recovery step; used only in progress messages.
    """
    # we are going to split the SLEEP_TIME by CHANCES so we can ctrl-c
    # ... just in case
    st = int(ceil(SLEEP_TIME / CHANCES))
    len_jids_to_recover = len(jids_to_recover)
    for i, j in enumerate(jids_to_recover):
        print(f'recovering {j} {recover_type}: {len_jids_to_recover}/{i}')
        job = ProcessingJob(j)
        job._set_status('in_construction')
        job.submit()
        # use a distinct loop variable so the wait loop does not shadow
        # the progress counter `i` above
        for chance in range(CHANCES):
            print('You can ctrl-c now, iteration %d' % chance)
            sleep(st)
|
|
41 |
|
|
|
42 |
|
|
|
43 |
def _retrieve_queue_jobs():
    """Return the set of Qiita job ids currently present in the Slurm queue."""
    # getting all the jobs in the queues
    squeue_text = check_output(['squeue', '-o', '%all']).decode('ascii')
    all_jobs = pd.read_csv(StringIO(squeue_text), sep='|')

    # just keeping the qiita jobs
    qiita_rows = all_jobs[all_jobs.GROUP == 'qiita']

    # ignore the merge-jobs and get unique values
    names = qiita_rows.NAME.str.replace('merge-', '').unique()

    # strip the remaining job-name decorations to recover the Qiita job id
    return {name.replace('finish-', '').replace('.txt', '') for name in names}
|
|
57 |
|
|
|
58 |
|
|
|
59 |
def _get_jids_to_recover(recover_type):
    """Return job ids in the given status that are not already queued in Slurm.

    Parameters
    ----------
    recover_type : str
        A processing job status name, e.g. 'queued' or 'running'.
    """
    with TRN:
        TRN.add(SQL, [recover_type])
        in_status = set(TRN.execute_fetchflatten())
        # drop anything that is still sitting in the Slurm queues
        jids_to_recover = list(in_status - _retrieve_queue_jobs())
        print('Total %s: %d' % (recover_type, len(jids_to_recover)))
        return jids_to_recover
|
|
66 |
|
|
|
67 |
|
|
|
68 |
def _qiita_queue_log_parse(jids_to_recover): |
|
|
69 |
results = [] |
|
|
70 |
for jid in jids_to_recover: |
|
|
71 |
job = ProcessingJob(jid) |
|
|
72 |
if job.external_id: |
|
|
73 |
bvals = pd.read_csv(StringIO(check_output([ |
|
|
74 |
'sacct', '-p', |
|
|
75 |
'--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw', |
|
|
76 |
'-j', f'{job.external_id}.batch']).decode( |
|
|
77 |
'ascii')), sep='|').iloc[0].to_dict() |
|
|
78 |
vals = pd.read_csv(StringIO(check_output([ |
|
|
79 |
'sacct', '-p', |
|
|
80 |
'--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw', |
|
|
81 |
'-j', f'{job.external_id}']).decode( |
|
|
82 |
'ascii')), sep='|').iloc[0].to_dict() |
|
|
83 |
data = { |
|
|
84 |
'exit-code': bvals['ExitCode'], |
|
|
85 |
'mem-requested': bvals['ReqMem'], |
|
|
86 |
'time-requested': vals['TimelimitRaw'], |
|
|
87 |
'mem-used': bvals['MaxRSS'], |
|
|
88 |
'time-used': bvals['CPUTimeRAW']} |
|
|
89 |
else: |
|
|
90 |
data = { |
|
|
91 |
'exit-code': None, |
|
|
92 |
'mem-requested': None, |
|
|
93 |
'time-requested': None, |
|
|
94 |
'mem-used': None, |
|
|
95 |
'time-used': None} |
|
|
96 |
results.append(job, data) |
|
|
97 |
|
|
|
98 |
return results |
|
|
99 |
|
|
|
100 |
|
|
|
101 |
def _flush_queues(recover_type):
    """Recover jobs of a given status in dependency order.

    README 1: in theory we should be able to submit all recover_type jobs
    one after the other but in reality that's not possible. The issue
    is that a job is going to stay as running/waiting until is completed.
    Thus, we need to run complete_job first, wait for everything to finish,
    then continue with validate, then release_validators, and
    finally everything else. Note that is suggested to wait for the
    full recovery type to finish before moving to the next one
    README 2: we now have a logging file for all submitted jobs, so let's
    start checking for those that failed for system crashes or cause the
    workers were busy, error-codes: 1-2
    """
    def _jids_with_exit_code(code):
        # refresh the recoverable set and keep jobs whose Slurm accounting
        # reports the given exit code.
        # BUG FIX: _qiita_queue_log_parse pairs each job with a single
        # accounting dict; the original iterated the dict's keys
        # (`for rr in r`) and indexed the key strings, raising TypeError
        review_jobs = _qiita_queue_log_parse(_get_jids_to_recover(recover_type))
        return [j.id for j, r in review_jobs if r['exit-code'] == code]

    def _jids_with_command(cmd_name):
        # refresh the recoverable set and keep jobs of the given command
        jids = _get_jids_to_recover(recover_type)
        return [j for j in jids if ProcessingJob(j).command.name == cmd_name]

    # jobs that failed because of system crashes or busy workers
    _submit_jobs(_jids_with_exit_code('1'), recover_type + '/queue_log/1')
    _submit_jobs(_jids_with_exit_code('0'), recover_type + '/queue_log/0')

    # first start with completing jobs that are not running
    _submit_jobs(_jids_with_command('complete_job'),
                 recover_type + '/complete_job')

    # first start validators that are not running
    _submit_jobs(_jids_with_command('Validate'), recover_type + '/validate')

    # then the release validator
    _submit_jobs(_jids_with_command('release_validators'),
                 recover_type + '/release_validators')
|
|
143 |
|
|
|
144 |
|
|
|
145 |
def qiita_recover_jobs():
    """Recover queued and running Qiita jobs after a system interruption.

    general full processing pipeline, as an example a deblur job as it yields
    two artifacts, each new line represents a new job, each indented block a
    waiting job
    -> deblur
       -> complete_job -> release_validator
           -> validate biom 1
               -> release_validator
                   -> complete_job -> create artifact
           -> validate biom 2
               -> release_validator
                   -> complete_job -> create artifact
    """
    # Step 1: recover jobs that are in queue status
    recover_type = 'queued'
    _flush_queues(recover_type)

    # then we recover what's left
    jids_to_recover = _get_jids_to_recover(recover_type)
    _submit_jobs(jids_to_recover, recover_type)

    # Step 2: recover jobs that are running, note that there are several steps
    # to recover this group: 2.1. check if they have validators,
    # 2.2. if so, recover validators, 2. recover failed jobs
    with TRN:
        recover_type = 'running'
        _flush_queues(recover_type)
        jids_to_recover = _get_jids_to_recover(recover_type)

        # 2.1, and 2.2: checking which jobs have validators, and recover them
        jobs_with_validators = []
        for j in jids_to_recover:
            job = ProcessingJob(j)
            validators = list(job.validator_jobs)
            if not validators:
                jobs_with_validators.append(j)
                continue
            # adding validators to jobs_with_validators to ignore them
            # in the next code of block
            for vj in validators:
                jobs_with_validators.append(vj.id)
            status = set([v.status for v in validators
                          if v.id not in _retrieve_queue_jobs()])
            # if there are no status, that means that the validators weren't
            # created and we should rerun from scratch (Step 4)
            if not status:
                continue
            # if multiple status in the validators, it's a complex behavior
            # and needs a case by case solution
            if len(status) != 1:
                print("Job '%s' has too many validators status (%d), check "
                      "them by hand" % (j, len(status)))
                continue
            status = list(status)[0]

            if status == 'waiting':
                print("releasing job validators: %s" % j)
                try:
                    job.release_validators()
                except Exception:
                    print("ERROR, releasing %s validators" % j)
                sleep(SLEEP_TIME)
            elif status == 'running':
                # BUG FIX: _submit_jobs expects job ids (it calls
                # ProcessingJob(j) on each element), not ProcessingJob objects
                _submit_jobs([v.id for v in validators],
                             recover_type + ' validator, running')
            elif status == 'error':
                # in this case is the same process than before but we need
                # to split the set in_construction and submit in 2 steps,
                # however, we can still submit via _submit_jobs
                for v in validators:
                    # BUG FIX: v is already a ProcessingJob; re-wrapping it
                    # with ProcessingJob(v) would fail
                    v._set_status('in_construction')
                _submit_jobs([v.id for v in validators],
                             recover_type + ' validator, error')
            else:
                print("Check the status of this job %s : %s and validators"
                      "%s." % (j, status, validators))

        jids_to_recover = set(jids_to_recover) - set(jobs_with_validators)

        # Step 3: Finally, we recover all the leftover jobs
        for j in jids_to_recover:
            job = ProcessingJob(j)
            status = job.status

            if status == 'waiting':
                print("releasing job validators: %s" % j)
                job.release_validators()
                sleep(SLEEP_TIME)
            elif 'running' == status:
                _submit_jobs([j], 'main_job, running')
|
|
235 |
|
|
|
236 |
|
|
|
237 |
# This script is intentionally not runnable: it exists as a documented,
# reviewed reference for the manual job-recovery procedure. Import and call
# qiita_recover_jobs (or run its steps by hand) instead.
if __name__ == '__main__':
    raise ValueError('This script should never be called directly but should '
                     'be used as a reference if we need to recover jobs, '
                     'see: qiita_recover_jobs')