--- a
+++ b/scripts/qiita-recover-jobs
@@ -0,0 +1,240 @@
+#!/usr/bin/env python
+
+# -----------------------------------------------------------------------------
+# Copyright (c) 2014--, The Qiita Development Team.
+#
+# Distributed under the terms of the BSD 3-clause License.
+#
+# The full license is in the file LICENSE, distributed with this software.
+# -----------------------------------------------------------------------------
+from subprocess import check_output
+from qiita_db.sql_connection import TRN
+from qiita_db.processing_job import ProcessingJob
+import pandas as pd
+from time import sleep
+from math import ceil
+from io import StringIO
+
+
+SLEEP_TIME = 6
+CHANCES = 3
+SQL = """SELECT processing_job_id
+         FROM qiita.processing_job
+         JOIN qiita.processing_job_status
+             USING (processing_job_status_id)
+         WHERE processing_job_status = %s"""
+
+
+def _submit_jobs(jids_to_recover, recover_type):
+    # we are going to split the SLEEP_TIME by CHANCES so we can ctrl-c
+    # ... just in case
+    st = int(ceil(SLEEP_TIME / CHANCES))
+    len_jids_to_recover = len(jids_to_recover)
+    for i, j in enumerate(jids_to_recover):
+        print(f'recovering {j} {recover_type}: {i + 1}/{len_jids_to_recover}')
+        job = ProcessingJob(j)
+        job._set_status('in_construction')
+        job.submit()
+        for chance in range(CHANCES):
+            print('You can ctrl-c now, iteration %d' % chance)
+            sleep(st)
+
+
+def _retrieve_queue_jobs():
+    # getting all the jobs in the queues
+    all_jobs = pd.read_csv(StringIO(
+        check_output(['squeue', '-o', '%all']).decode('ascii')), sep='|')
+
+    # just keeping the qiita jobs
+    jobs = all_jobs[all_jobs.GROUP == 'qiita']
+
+    # ignore the merge-jobs and get unique values
+    qiita_jids = jobs.NAME.str.replace('merge-', '').unique()
+    qiita_jids = [x.replace(
+        'finish-', '').replace('.txt', '') for x in qiita_jids]
+
+    return set(qiita_jids)
+
+
+def _get_jids_to_recover(recover_type):
+    with TRN:
+        TRN.add(SQL, [recover_type])
+        jids = set(TRN.execute_fetchflatten())
+        jids_to_recover = list(jids - _retrieve_queue_jobs())
+        print('Total %s: %d' % (recover_type, len(jids_to_recover)))
+        return jids_to_recover
+
+
+def _qiita_queue_log_parse(jids_to_recover):
+    results = []
+    for jid in jids_to_recover:
+        job = ProcessingJob(jid)
+        if job.external_id:
+            # the .batch step reports the actual resource usage; the main
+            # accounting line reports the requested time limit
+            bvals = pd.read_csv(StringIO(check_output([
+                'sacct', '-p',
+                '--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
+                '-j', f'{job.external_id}.batch']).decode(
+                'ascii')), sep='|').iloc[0].to_dict()
+            vals = pd.read_csv(StringIO(check_output([
+                'sacct', '-p',
+                '--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
+                '-j', f'{job.external_id}']).decode(
+                'ascii')), sep='|').iloc[0].to_dict()
+            data = {
+                'exit-code': bvals['ExitCode'],
+                'mem-requested': bvals['ReqMem'],
+                'time-requested': vals['TimelimitRaw'],
+                'mem-used': bvals['MaxRSS'],
+                'time-used': bvals['CPUTimeRAW']}
+        else:
+            data = {
+                'exit-code': None,
+                'mem-requested': None,
+                'time-requested': None,
+                'mem-used': None,
+                'time-used': None}
+        results.append((job, data))
+
+    return results
+
+
+def _flush_queues(recover_type):
+    # README 1: in theory we should be able to submit all recover_type jobs
+    # one after the other, but in reality that's not possible. The issue
+    # is that a job is going to stay as running/waiting until it is
+    # completed. Thus, we need to run complete_job first, wait for
+    # everything to finish, then continue with validate, then
+    # release_validators, and finally everything else.
+    # Note that it is suggested to wait for the full recovery type to
+    # finish before moving to the next one.
+    # README 2: we now have a logging file for all submitted jobs, so let's
+    # start by checking for those that failed due to system crashes or
+    # because the workers were busy, error-codes: 1-2
+
+    # first, resubmit the jobs that the queue log reports as failed
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    review_jobs = _qiita_queue_log_parse(jids_to_recover)
+    jids_review_jobs = [j.id for j, r in review_jobs
+                        if r['exit-code'] == '1']
+    _submit_jobs(jids_review_jobs, recover_type + '/queue_log/1')
+
+    # then those the queue log reports as finished but whose status was
+    # never updated
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    review_jobs = _qiita_queue_log_parse(jids_to_recover)
+    jids_review_jobs = [j.id for j, r in review_jobs
+                        if r['exit-code'] == '0']
+    _submit_jobs(jids_review_jobs, recover_type + '/queue_log/0')
+
+    # next, complete the jobs that are not running
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    complete_job = [j for j in jids_to_recover
+                    if ProcessingJob(j).command.name == 'complete_job']
+    _submit_jobs(complete_job, recover_type + '/complete_job')
+
+    # then start the validators that are not running
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    validate = [j for j in jids_to_recover
+                if ProcessingJob(j).command.name == 'Validate']
+    _submit_jobs(validate, recover_type + '/validate')
+
+    # and finally the release validators
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    release_validators = [
+        j for j in jids_to_recover
+        if ProcessingJob(j).command.name == 'release_validators']
+    _submit_jobs(release_validators, recover_type + '/release_validators')
+
+
+def qiita_recover_jobs():
+    # general full processing pipeline; as an example, a deblur job, as it
+    # yields two artifacts. Each new line represents a new job, each
+    # indented block a waiting job:
+    # -> deblur
+    #    -> complete_job -> release_validator
+    #       -> validate biom 1
+    #          -> release_validator
+    #             -> complete_job -> create artifact
+    #       -> validate biom 2
+    #          -> release_validator
+    #             -> complete_job -> create artifact
+
+    # Step 1: recover jobs that are in queued status
+    recover_type = 'queued'
+    _flush_queues(recover_type)
+
+    # then we recover what's left
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    _submit_jobs(jids_to_recover, recover_type)
+
+    # Step 2: recover jobs that are running; note that there are several
+    # steps to recover this group: 2.1. check if they have validators;
+    # 2.2. if so, recover the validators; 2.3. recover the failed jobs
+    with TRN:
+        recover_type = 'running'
+        _flush_queues(recover_type)
+        jids_to_recover = _get_jids_to_recover(recover_type)
+
+        # 2.1 and 2.2: check which jobs have validators, and recover them
+        jobs_with_validators = []
+        for j in jids_to_recover:
+            job = ProcessingJob(j)
+            validators = list(job.validator_jobs)
+            if not validators:
+                # no validators were created; leave the job for Step 3
+                continue
+            # adding the validators to jobs_with_validators so they are
+            # ignored in the next block of code
+            for vj in validators:
+                jobs_with_validators.append(vj.id)
+            status = {v.status for v in validators
+                      if v.id not in _retrieve_queue_jobs()}
+            # if there is no status, the validators weren't submitted and
+            # we should rerun from scratch (Step 3)
+            if not status:
+                continue
+            # if the validators have multiple statuses, it's a complex
+            # situation that needs a case-by-case solution
+            if len(status) != 1:
+                print("Job '%s' has too many validator statuses (%d); "
+                      "check them by hand" % (j, len(status)))
+                continue
+            status = status.pop()
+
+            if status == 'waiting':
+                print("releasing job validators: %s" % j)
+                try:
+                    job.release_validators()
+                except Exception:
+                    print("ERROR releasing %s validators" % j)
+                sleep(SLEEP_TIME)
+            elif status == 'running':
+                _submit_jobs([v.id for v in validators],
+                             recover_type + ' validator, running')
+            elif status == 'error':
+                # this is the same process as before, but we need to split
+                # setting in_construction and submitting into 2 steps;
+                # we can still submit via _submit_jobs
+                for v in validators:
+                    v._set_status('in_construction')
+                _submit_jobs([v.id for v in validators],
+                             recover_type + ' validator, error')
+            else:
+                print("Check the status of this job %s: %s, and its "
+                      "validators %s." % (j, status, validators))
+
+        jids_to_recover = set(jids_to_recover) - set(jobs_with_validators)
+
+        # Step 3: finally, we recover all the leftover jobs
+        for j in jids_to_recover:
+            job = ProcessingJob(j)
+            status = job.status
+
+            if status == 'waiting':
+                print("releasing job validators: %s" % j)
+                job.release_validators()
+                sleep(SLEEP_TIME)
+            elif status == 'running':
+                _submit_jobs([j], 'main_job, running')
+
+
+if __name__ == '__main__':
+    raise ValueError('This script should never be called directly but should '
+                     'be used as a reference if we need to recover jobs, '
+                     'see: qiita_recover_jobs')
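
For context, _retrieve_queue_jobs assumes that squeue -o '%all' prints a pipe-separated table whose columns include GROUP and NAME; a heavily truncated sketch of what pandas ends up parsing (the job IDs and the column order shown here are made up for illustration):

    ACCOUNT|GROUP|NAME|STATE|...
    qiita|qiita|merge-3c9991ab-6c14-4af5-8042-3b01b75b2e57|RUNNING|...
    qiita|qiita|finish-3c9991ab-6c14-4af5-8042-3b01b75b2e57.txt|PENDING|...

Stripping the merge-/finish- prefixes and the .txt suffix leaves bare processing-job IDs, which _get_jids_to_recover can then subtract from the IDs returned by the database query.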
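
Since the __main__ guard makes the file refuse to run directly, the helpers are meant to be pasted into an interactive session and driven by hand. A minimal sketch of such a session (hypothetical; it assumes a configured Qiita environment with the definitions above already loaded into the shell):

    # hypothetical recovery session; not part of the script itself
    # Step 1: flush and resubmit everything stuck in 'queued'
    _flush_queues('queued')
    _submit_jobs(_get_jids_to_recover('queued'), 'queued')

    # Steps 2 and 3: once the queue has drained, recover the 'running'
    # jobs; or simply run both phases in order:
    qiita_recover_jobs()

Calling qiita_recover_jobs() alone performs all three steps in the order described in its comments.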