#!/usr/bin/env python
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from subprocess import check_output
from qiita_db.sql_connection import TRN
from qiita_db.processing_job import ProcessingJob
import pandas as pd
from time import sleep
from math import ceil
from io import StringIO
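
# how long to wait (seconds) after each submission; _submit_jobs splits the
# wait into CHANCES slices so it can be interrupted with ctrl-c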
SLEEP_TIME = 6
CHANCES = 3
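
# the ids of all processing jobs currently in a given status
# (e.g. 'queued' or 'running')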
SQL = """SELECT processing_job_id
FROM qiita.processing_job
JOIN qiita.processing_job_status
USING (processing_job_status_id)
WHERE processing_job_status = %s"""


def _submit_jobs(jids_to_recover, recover_type):
    # we are going to split SLEEP_TIME by CHANCES so we can ctrl-c
    # ... just in case
    st = int(ceil(SLEEP_TIME / CHANCES))
    len_jids_to_recover = len(jids_to_recover)
    for i, j in enumerate(jids_to_recover):
        print(f'recovering {j} {recover_type}: {i + 1}/{len_jids_to_recover}')
        job = ProcessingJob(j)
        job._set_status('in_construction')
        job.submit()
        # pause between submissions in CHANCES slices so the wait stays
        # interruptible
        for chance in range(CHANCES):
            print('You can ctrl-c now, iteration %d' % chance)
            sleep(st)


def _retrieve_queue_jobs():
# getting all the jobs in the queues
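    # `squeue -o %all` emits one pipe-separated row per job; of its many
    # columns we only rely on GROUP and NAME below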
all_jobs = pd.read_csv(StringIO(
check_output(['squeue', '-o', '%all']).decode('ascii')), sep='|')
# just keeping the qiita jobs
jobs = all_jobs[all_jobs.GROUP == 'qiita']
# ignore the merge-jobs and get unique values
qiita_jids = jobs.NAME.str.replace('merge-', '').unique()
qiita_jids = [x.replace(
'finish-', '').replace('.txt', '') for x in qiita_jids]
return set(qiita_jids)


def _get_jids_to_recover(recover_type):
with TRN:
TRN.add(SQL, [recover_type])
jids = set(TRN.execute_fetchflatten())
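        # anything the database reports as recover_type but the scheduler
        # doesn't know about needs to be recovered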
jids_to_recover = list(jids - _retrieve_queue_jobs())
print('Total %s: %d' % (recover_type, len(jids_to_recover)))
return jids_to_recover


def _qiita_queue_log_parse(jids_to_recover):
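    # returns a list of (ProcessingJob, dict) pairs; the dict holds the
    # sacct accounting values, or Nones when the job never made it to the
    # scheduler (i.e. it has no external_id)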
results = []
for jid in jids_to_recover:
job = ProcessingJob(jid)
if job.external_id:
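            # sacct reports the allocation and its `.batch` step
            # separately: memory usage (MaxRSS) comes from the step, the
            # time limit from the allocation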
bvals = pd.read_csv(StringIO(check_output([
'sacct', '-p',
'--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
'-j', f'{job.external_id}.batch']).decode(
'ascii')), sep='|').iloc[0].to_dict()
vals = pd.read_csv(StringIO(check_output([
'sacct', '-p',
'--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
'-j', f'{job.external_id}']).decode(
'ascii')), sep='|').iloc[0].to_dict()
data = {
'exit-code': bvals['ExitCode'],
'mem-requested': bvals['ReqMem'],
'time-requested': vals['TimelimitRaw'],
'mem-used': bvals['MaxRSS'],
'time-used': bvals['CPUTimeRAW']}
else:
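            # the job never reached the scheduler, so there is nothing
            # to parse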
data = {
'exit-code': None,
'mem-requested': None,
'time-requested': None,
'mem-used': None,
'time-used': None}
        results.append((job, data))
return results


def _flush_queues(recover_type):
    # README 1: in theory we should be able to submit all recover_type jobs
    # one after the other but in reality that's not possible. The issue
    # is that a job is going to stay as running/waiting until it is
    # completed. Thus, we need to run complete_job first, wait for
    # everything to finish, then continue with validate, then
    # release_validators, and finally everything else. Note that it is
    # suggested to wait for the full recovery type to finish before moving
    # to the next one.
    # README 2: we now have a logging file for all submitted jobs, so let's
    # start by checking for those that failed because of system crashes or
    # because the workers were busy, error-codes: 1-2

    # review the queue log: resubmit jobs whose recorded exit code is 1,
    # then those whose exit code is 0
    jids_to_recover = _get_jids_to_recover(recover_type)
    review_jobs = _qiita_queue_log_parse(jids_to_recover)
    jids_review_jobs = [j.id for j, r in review_jobs
                        if r['exit-code'] == '1']
    _submit_jobs(jids_review_jobs, recover_type + '/queue_log/1')

    jids_to_recover = _get_jids_to_recover(recover_type)
    review_jobs = _qiita_queue_log_parse(jids_to_recover)
    jids_review_jobs = [j.id for j, r in review_jobs
                        if r['exit-code'] == '0']
    _submit_jobs(jids_review_jobs, recover_type + '/queue_log/0')

    # first start with completing jobs that are not running
    jids_to_recover = _get_jids_to_recover(recover_type)
    complete_job = [j for j in jids_to_recover
                    if ProcessingJob(j).command.name == 'complete_job']
    _submit_jobs(complete_job, recover_type + '/complete_job')

    # then start validators that are not running
jids_to_recover = _get_jids_to_recover(recover_type)
validate = [j for j in jids_to_recover
if ProcessingJob(j).command.name == 'Validate']
_submit_jobs(validate, recover_type + '/validate')

    # then the release validators
jids_to_recover = _get_jids_to_recover(recover_type)
release_validators = [
j for j in jids_to_recover
if ProcessingJob(j).command.name == 'release_validators']
_submit_jobs(release_validators, recover_type + '/release_validators')


def qiita_recover_jobs():
    # general full processing pipeline; as an example, a deblur job, as it
    # yields two artifacts. Each new line represents a new job, each
    # indented block a waiting job:
# -> deblur
# -> complete_job -> release_validator
# -> validate biom 1
# -> release_validator
# -> complete_job -> create artifact
# -> validate biom 2
# -> release_validator
# -> complete_job -> create artifact

    # Step 1: recover jobs that are in 'queued' status
recover_type = 'queued'
_flush_queues(recover_type)
# then we recover what's left
jids_to_recover = _get_jids_to_recover(recover_type)
_submit_jobs(jids_to_recover, recover_type)

    # Step 2: recover jobs that are running; note that there are several
    # steps to recover this group: 2.1. check if they have validators,
    # 2.2. if so, recover the validators, 2.3. recover failed jobs
with TRN:
recover_type = 'running'
_flush_queues(recover_type)
jids_to_recover = _get_jids_to_recover(recover_type)
        # 2.1 and 2.2: checking which jobs have validators, and
        # recovering them
jobs_with_validators = []
for j in jids_to_recover:
job = ProcessingJob(j)
validators = list(job.validator_jobs)
if not validators:
jobs_with_validators.append(j)
continue
            else:
                # adding the validators to jobs_with_validators so they are
                # ignored in the next block of code
for vj in validators:
jobs_with_validators.append(vj.id)
            status = set([v.status for v in validators
                          if v.id not in _retrieve_queue_jobs()])
            # if there are no statuses, the validators were never created
            # and we should rerun the job from scratch (Step 3)
            if not status:
                continue
            # if the validators have multiple statuses, it's complex
            # behavior and needs a case by case solution
            if len(status) != 1:
                print("Job '%s' has multiple validator statuses (%d), "
                      "check them by hand" % (j, len(status)))
                continue
status = list(status)[0]
if status == 'waiting':
print("releasing job validators: %s" % j)
try:
job.release_validators()
except Exception:
print("ERROR, releasing %s validators" % j)
sleep(SLEEP_TIME)
            elif status == 'running':
                _submit_jobs([v.id for v in validators],
                             recover_type + ' validator, running')
            elif status == 'error':
                # same process as before, but we need to set
                # in_construction and submit in 2 separate steps; we can
                # still submit via _submit_jobs
                for v in validators:
                    v._set_status('in_construction')
                _submit_jobs([v.id for v in validators],
                             recover_type + ' validator, error')
            else:
                print("Check the status of job %s : %s and its validators "
                      "%s." % (j, status, validators))
jids_to_recover = set(jids_to_recover) - set(jobs_with_validators)

        # Step 3: finally, we recover all the leftover jobs
for i, j in enumerate(jids_to_recover):
job = ProcessingJob(j)
status = job.status
if status == 'waiting':
print("releasing job validators: %s" % j)
job.release_validators()
sleep(SLEEP_TIME)
        elif status == 'running':
_submit_jobs([j], 'main_job, running')
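

# NOTE: this file is meant as a reference; to recover jobs, import
# qiita_recover_jobs in an interactive session and run it by hand, hence
# the guard below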
if __name__ == '__main__':
raise ValueError('This script should never be called directly but should '
'be used as a reference if we need to recover jobs, '
'see: qiita_recover_jobs')