#!/usr/bin/env python

# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from subprocess import check_output
from qiita_db.sql_connection import TRN
from qiita_db.processing_job import ProcessingJob
import pandas as pd
from time import sleep
from math import ceil
from io import StringIO


SLEEP_TIME = 6  # seconds to pause between job submissions
CHANCES = 3  # slices the pause is split into, so ctrl-c can interrupt
# all processing jobs currently in the given status
SQL = """SELECT processing_job_id
         FROM qiita.processing_job
         JOIN qiita.processing_job_status
         USING (processing_job_status_id)
         WHERE processing_job_status = %s"""


def _submit_jobs(jids_to_recover, recover_type):
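    """Resubmit the given processing jobs, pausing between submissions.

    `jids_to_recover` is an iterable of processing job ids; `recover_type`
    is a free-text label used only in the progress messages. Each job is
    reset to 'in_construction' before being submitted again.
    """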
    # we are going to split the SLEEP_TIME by CHANCES so we can ctrl-c
    # ... just in case
    st = int(ceil(SLEEP_TIME/CHANCES))
    len_jids_to_recover = len(jids_to_recover)
    for i, j in enumerate(jids_to_recover):
        print(f'recovering {j} {recover_type}: {i + 1}/{len_jids_to_recover}')
        job = ProcessingJob(j)
        job._set_status('in_construction')
        job.submit()
        for chance in range(CHANCES):
            print('You can ctrl-c now, iteration %d' % chance)
            sleep(st)


def _retrieve_queue_jobs():
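    """Return the set of Qiita processing job ids currently in Slurm.

    Parses the pipe-delimited output of ``squeue -o %all`` and keeps only
    the rows whose GROUP is 'qiita'. Job NAMEs may carry a 'merge-' or
    'finish-' prefix and a '.txt' suffix; those are stripped so only the
    bare processing job ids remain. A hypothetical NAME value would be
    'finish-<processing-job-uuid>.txt'.
    """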
    # getting all the jobs in the queues
    all_jobs = pd.read_csv(StringIO(
        check_output(['squeue', '-o', '%all']).decode('ascii')), sep='|')

    # just keeping the qiita jobs
    jobs = all_jobs[all_jobs.GROUP == 'qiita']

    # strip the merge-/finish- prefixes and the .txt suffix so the names
    # collapse to the underlying processing job ids, then deduplicate
    qiita_jids = jobs.NAME.str.replace('merge-', '').unique()
    qiita_jids = [x.replace(
        'finish-', '').replace('.txt', '') for x in qiita_jids]

    return set(qiita_jids)


def _get_jids_to_recover(recover_type):
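    """Return ids of jobs in `recover_type` status that are not in Slurm.

    `recover_type` is a processing job status name (e.g. 'queued' or
    'running'); jobs still present in the Slurm queue are excluded since
    they don't need to be recovered.
    """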
    with TRN:
        TRN.add(SQL, [recover_type])
        jids = set(TRN.execute_fetchflatten())
        jids_to_recover = list(jids - _retrieve_queue_jobs())
        print('Total %s: %d' % (recover_type, len(jids_to_recover)))
        return jids_to_recover


def _qiita_queue_log_parse(jids_to_recover):
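    """Collect Slurm accounting data for the given jobs via ``sacct``.

    Returns a list of (ProcessingJob, dict) tuples where the dict holds
    the exit code, requested/used memory, and requested/used time. The
    memory and exit-code figures come from the job's '.batch' step while
    the time limit comes from the parent job record; jobs that never got
    a Slurm external id get a dict of None values.
    """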
    results = []
    for jid in jids_to_recover:
        job = ProcessingJob(jid)
        if job.external_id:
            bvals = pd.read_csv(StringIO(check_output([
                'sacct', '-p',
                '--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
                '-j', f'{job.external_id}.batch']).decode(
                'ascii')), sep='|').iloc[0].to_dict()
            vals = pd.read_csv(StringIO(check_output([
                'sacct', '-p',
                '--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
                '-j', f'{job.external_id}']).decode(
                'ascii')), sep='|').iloc[0].to_dict()
            data = {
                'exit-code': bvals['ExitCode'],
                'mem-requested': bvals['ReqMem'],
                'time-requested': vals['TimelimitRaw'],
                'mem-used': bvals['MaxRSS'],
                'time-used': bvals['CPUTimeRAW']}
        else:
            data = {
                'exit-code': None,
                'mem-requested': None,
                'time-requested': None,
                'mem-used': None,
                'time-used': None}
        results.append((job, data))

    return results


def _flush_queues(recover_type):
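    """Recover all `recover_type` jobs, respecting job dependency order."""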
    # README 1: in theory we should be able to submit all recover_type jobs
    # one after the other, but in reality that's not possible. The issue
    # is that a job is going to stay as running/waiting until it is
    # completed. Thus, we need to run complete_job first, wait for
    # everything to finish, then continue with validate, then
    # release_validators, and finally everything else. Note that it is
    # suggested to wait for the full recovery type to finish before moving
    # to the next one.
    # README 2: we now have a logging file for all submitted jobs, so let's
    # start by checking for those that failed due to system crashes or
    # because the workers were busy; error codes: 1-2

    # first, resubmit the jobs whose Slurm accounting shows they died with
    # exit code 1 (system crash / busy worker)
    jids_to_recover = _get_jids_to_recover(recover_type)
    review_jobs = _qiita_queue_log_parse(jids_to_recover)
    jids_review_jobs = [j.id for j, r in review_jobs
                        if r['exit-code'] == '1']
    _submit_jobs(jids_review_jobs, recover_type + '/queue_log/1')

    # then those that exited cleanly (exit code 0) but never updated their
    # Qiita status
    jids_to_recover = _get_jids_to_recover(recover_type)
    review_jobs = _qiita_queue_log_parse(jids_to_recover)
    jids_review_jobs = [j.id for j, r in review_jobs
                        if r['exit-code'] == '0']
    _submit_jobs(jids_review_jobs, recover_type + '/queue_log/0')

    # now complete the jobs that are not running
    jids_to_recover = _get_jids_to_recover(recover_type)
    complete_job = [j for j in jids_to_recover
                    if ProcessingJob(j).command.name == 'complete_job']
    _submit_jobs(complete_job, recover_type + '/complete_job')

    # next, start the validators that are not running
    jids_to_recover = _get_jids_to_recover(recover_type)
    validate = [j for j in jids_to_recover
                if ProcessingJob(j).command.name == 'Validate']
    _submit_jobs(validate, recover_type + '/validate')

    # then the release validator
    jids_to_recover = _get_jids_to_recover(recover_type)
    release_validators = [
        j for j in jids_to_recover
        if ProcessingJob(j).command.name == 'release_validators']
    _submit_jobs(release_validators, recover_type + '/release_validators')


def qiita_recover_jobs():
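    """Recover Qiita jobs stuck in 'queued' or 'running' after a crash.

    Walks the job dependency chain in three steps: flush and resubmit the
    'queued' jobs, recover the 'running' jobs and their validators, and
    finally resubmit any leftovers.
    """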
    # general full processing pipeline; as an example, a deblur job, since
    # it yields two artifacts. Each new line represents a new job, each
    # indented block a waiting job:
    # -> deblur
    # -> complete_job -> release_validator
    #     -> validate biom 1
    #         -> release_validator
    #         -> complete_job -> create artifact
    #     -> validate biom 2
    #         -> release_validator
    #         -> complete_job -> create artifact

    # Step 1: recover jobs that are in 'queued' status
    recover_type = 'queued'
    _flush_queues(recover_type)

    # then we recover what's left
    jids_to_recover = _get_jids_to_recover(recover_type)
    _submit_jobs(jids_to_recover, recover_type)

    # Step 2: recover jobs that are running; note that there are several
    #         steps to recover this group: 2.1. check if they have
    #         validators, 2.2. if so, recover the validators, and
    #         2.3. recover the failed jobs
    with TRN:
        recover_type = 'running'
        _flush_queues(recover_type)
        jids_to_recover = _get_jids_to_recover(recover_type)

        # 2.1 and 2.2: checking which jobs have validators, and recovering
        # them; jids_to_skip collects the ids to leave out of the Step 3
        # sweep: jobs without validators, plus the validator jobs spawned
        # by the ones handled here
        jids_to_skip = []
        for j in jids_to_recover:
            job = ProcessingJob(j)
            validators = list(job.validator_jobs)
            if not validators:
                jids_to_skip.append(j)
                continue
            else:
                # add the validators to jids_to_skip so they are ignored
                # in the next block of code
                for vj in validators:
                    jids_to_skip.append(vj.id)
            status = set([v.status for v in validators
                          if v.id not in _retrieve_queue_jobs()])
            # if there are no statuses, the validators weren't actually
            # created and the job should be rerun from scratch (Step 3)
            if not status:
                continue
            # if the validators have multiple statuses, it's a complex
            # scenario that needs a case-by-case solution
            if len(status) != 1:
                print("Job '%s' has multiple validator statuses (%d), check "
                      "them by hand" % (j, len(status)))
                continue
            status = list(status)[0]

            if status == 'waiting':
                print("releasing job validators: %s" % j)
                try:
                    job.release_validators()
                except Exception:
                    print("ERROR, releasing %s validators" % j)
                sleep(SLEEP_TIME)
            elif status == 'running':
                _submit_jobs([v.id for v in validators],
                             recover_type + ' validator, running')
            elif status == 'error':
                # same process as before, but we need to split the
                # in_construction reset and the submission into 2 steps;
                # we can still submit via _submit_jobs
                for v in validators:
                    v._set_status('in_construction')
                _submit_jobs([v.id for v in validators],
                             recover_type + ' validator, error')
            else:
                print("Check the status of this job %s : %s and validators "
                      "%s." % (j, status, validators))

        jids_to_recover = set(jids_to_recover) - set(jids_to_skip)

    # Step 3: Finally, we recover all the leftover jobs
    for i, j in enumerate(jids_to_recover):
        job = ProcessingJob(j)
        status = job.status

        if status == 'waiting':
            print("releasing job validators: %s" % j)
            job.release_validators()
            sleep(SLEEP_TIME)
        elif status == 'running':
            _submit_jobs([j], 'main_job, running')


if __name__ == '__main__':
    raise ValueError('This script should never be called directly but should '
                     'be used as a reference if we need to recover jobs, '
                     'see: qiita_recover_jobs')