--- a
+++ b/scripts/qiita-recover-jobs
@@ -0,0 +1,240 @@
+#!/usr/bin/env python
+
+# -----------------------------------------------------------------------------
+# Copyright (c) 2014--, The Qiita Development Team.
+#
+# Distributed under the terms of the BSD 3-clause License.
+#
+# The full license is in the file LICENSE, distributed with this software.
+# -----------------------------------------------------------------------------
+from subprocess import check_output
+from qiita_db.sql_connection import TRN
+from qiita_db.processing_job import ProcessingJob
+import pandas as pd
+from time import sleep
+from math import ceil
+from io import StringIO
+
+
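+# seconds to wait between job submissions; split into CHANCES chunks so a
+# run can be interrupted with ctrl-c (see _submit_jobs)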
+SLEEP_TIME = 6
+CHANCES = 3
+SQL = """SELECT processing_job_id
+         FROM qiita.processing_job
+         JOIN qiita.processing_job_status
+         USING (processing_job_status_id)
+         WHERE processing_job_status = %s"""
+
+
+def _submit_jobs(jids_to_recover, recover_type):
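+    """Reset every job in jids_to_recover to 'in_construction' and resubmit
+    it, sleeping between submissions so the run can be safely ctrl-c'd."""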
+    # we are going to split the SLEEP_TIME by CHANCES so we can ctrl-c
+    # ... just in case
+    st = int(ceil(SLEEP_TIME/CHANCES))
+    len_jids_to_recover = len(jids_to_recover)
+    for i, j in enumerate(jids_to_recover):
+        print(f'recovering {j} {recover_type}: {i + 1}/{len_jids_to_recover}')
+        job = ProcessingJob(j)
+        job._set_status('in_construction')
+        job.submit()
+        for k in range(CHANCES):
+            print('You can ctrl-c now, iteration %d' % k)
+            sleep(st)
+
+
+def _retrieve_queue_jobs():
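+    """Return the set of Qiita job ids currently in the Slurm queues.
+
+    `squeue -o %all` prints every field pipe-delimited; we keep the rows
+    that belong to the qiita group and strip the merge-/finish- prefixes
+    and the .txt suffix from the job NAMEs to get the Qiita job ids."""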
+    # getting all the jobs in the queues
+    all_jobs = pd.read_csv(StringIO(
+        check_output(['squeue', '-o', '%all']).decode('ascii')), sep='|')
+
+    # just keeping the qiita jobs
+    jobs = all_jobs[all_jobs.GROUP == 'qiita']
+
+    # ignore the merge-jobs and get unique values
+    qiita_jids = jobs.NAME.str.replace('merge-', '').unique()
+    qiita_jids = [x.replace('finish-', '').replace('.txt', '')
+                  for x in qiita_jids]
+
+    return set(qiita_jids)
+
+
+def _get_jids_to_recover(recover_type):
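+    """Return the job ids in recover_type status that are not in the Slurm
+    queues; these are the jobs that actually need to be recovered."""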
+    with TRN:
+        TRN.add(SQL, [recover_type])
+        jids = set(TRN.execute_fetchflatten())
+        jids_to_recover = list(jids - _retrieve_queue_jobs())
+        print('Total %s: %d' % (recover_type, len(jids_to_recover)))
+        return jids_to_recover
+
+
+def _qiita_queue_log_parse(jids_to_recover):
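+    """Return a (job, accounting-data) tuple for each given job id.
+
+    For jobs with an external (Slurm) id, the exit code, memory and time
+    used come from the job's .batch step and the time limit from the parent
+    record, all via sacct; jobs without an external id get None values."""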
+    results = []
+    for jid in jids_to_recover:
+        job = ProcessingJob(jid)
+        if job.external_id:
+            bvals = pd.read_csv(StringIO(check_output([
+                'sacct', '-p',
+                '--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
+                '-j', f'{job.external_id}.batch']).decode(
+                'ascii')), sep='|').iloc[0].to_dict()
+            vals = pd.read_csv(StringIO(check_output([
+                'sacct', '-p',
+                '--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
+                '-j', f'{job.external_id}']).decode(
+                'ascii')), sep='|').iloc[0].to_dict()
+            data = {
+                'exit-code': bvals['ExitCode'],
+                'mem-requested': bvals['ReqMem'],
+                'time-requested': vals['TimelimitRaw'],
+                'mem-used': bvals['MaxRSS'],
+                'time-used': bvals['CPUTimeRAW']}
+        else:
+            data = {
+                'exit-code': None,
+                'mem-requested': None,
+                'time-requested': None,
+                'mem-used': None,
+                'time-used': None}
+        results.append((job, data))
+
+    return results
+
+
+def _flush_queues(recover_type):
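+    """Recover the recover_type jobs in dependency order: first the jobs
+    flagged by the queue log exit codes, then complete_job, then Validate
+    and finally release_validators (see README 1/2 below)."""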
+    # README 1: in theory we should be able to submit all recover_type jobs
+    # one after the other, but in practice that's not possible. The issue
+    # is that a job will stay as running/waiting until it is completed.
+    # Thus, we need to run complete_job first, wait for everything to finish,
+    # then continue with validate, then release_validators, and
+    # finally everything else. Note that it is suggested to wait for the
+    # full recovery type to finish before moving to the next one.
+    # README 2: we now have a logging file for all submitted jobs, so let's
+    # start by checking for those that failed due to system crashes or
+    # because the workers were busy; error-codes: 1-2
+
+    # first resubmit the jobs the queue log reports as failed
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    review_jobs = _qiita_queue_log_parse(jids_to_recover)
+    # sacct reports ExitCode as "exit:signal", so keep just the exit part;
+    # start with the jobs that exited with 1
+    jids_review_jobs = [j.id for j, r in review_jobs if r['exit-code'] and
+                        r['exit-code'].split(':')[0] == '1']
+    _submit_jobs(jids_review_jobs, recover_type + '/queue_log/1')
+
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    review_jobs = _qiita_queue_log_parse(jids_to_recover)
+    # now the jobs whose queue log exit-code was 0
+    jids_review_jobs = [j.id for j, r in review_jobs if r['exit-code'] and
+                        r['exit-code'].split(':')[0] == '0']
+    _submit_jobs(jids_review_jobs, recover_type + '/queue_log/0')
+
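+    # now run the complete_job commands that are not in the queues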
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    complete_job = [j for j in jids_to_recover
+                    if ProcessingJob(j).command.name == 'complete_job']
+    _submit_jobs(complete_job, recover_type + '/complete_job')
+
+    # then start the validators that are not running
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    validate = [j for j in jids_to_recover
+                if ProcessingJob(j).command.name == 'Validate']
+    _submit_jobs(validate, recover_type + '/validate')
+
+    # then the release validator
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    release_validators = [
+        j for j in jids_to_recover
+        if ProcessingJob(j).command.name == 'release_validators']
+    _submit_jobs(release_validators, recover_type + '/release_validators')
+
+
+def qiita_recover_jobs():
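+    """Recover the 'queued' jobs first and then the 'running' ones,
+    including their validators; see the pipeline sketch below."""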
+    # general full processing pipeline; as an example, a deblur job, as it
+    # yields two artifacts. Each new line represents a new job and each
+    # indented block a waiting job:
+    # -> deblur
+    # -> complete_job -> release_validator
+    #     -> validate biom 1
+    #         -> release_validator
+    #         -> complete_job -> create artifact
+    #     -> validate biom 2
+    #         -> release_validator
+    #         -> complete_job -> create artifact
+
+    # Step 1: recover jobs that are in queue status
+    recover_type = 'queued'
+    _flush_queues(recover_type)
+
+    # then we recover what's left
+    jids_to_recover = _get_jids_to_recover(recover_type)
+    _submit_jobs(jids_to_recover, recover_type)
+
+    # Step 2: recover jobs that are running; note that there are several
+    #         steps to recover this group: 2.1. check if they have
+    #         validators, 2.2. if so, recover the validators, and
+    #         2.3. recover the failed jobs
+    with TRN:
+        recover_type = 'running'
+        _flush_queues(recover_type)
+        jids_to_recover = _get_jids_to_recover(recover_type)
+
+        # 2.1 and 2.2: check which jobs have validators and recover them
+        jobs_with_validators = []
+        # grab the Slurm queue contents once, instead of once per job below
+        slurm_jids = _retrieve_queue_jobs()
+        for j in jids_to_recover:
+            job = ProcessingJob(j)
+            validators = list(job.validator_jobs)
+            if not validators:
+                jobs_with_validators.append(j)
+                continue
+            # adding the validators to jobs_with_validators so they are
+            # ignored in the next block of code
+            for vj in validators:
+                jobs_with_validators.append(vj.id)
+            status = {v.status for v in validators
+                      if v.id not in slurm_jids}
+            # if there are no statuses, all the validators are still in the
+            # Slurm queues, so leave them be
+            if not status:
+                continue
+            # if there are multiple statuses among the validators, it's a
+            # complex behavior and needs a case-by-case solution
+            if len(status) != 1:
+                print("Job '%s' has too many validator statuses (%d), check "
+                      "them by hand" % (j, len(status)))
+                continue
+            status = list(status)[0]
+
+            if status == 'waiting':
+                print("releasing job validators: %s" % j)
+                try:
+                    job.release_validators()
+                except Exception:
+                    print("ERROR, releasing %s validators" % j)
+                sleep(SLEEP_TIME)
+            elif status == 'running':
+                _submit_jobs([v.id for v in validators],
+                             recover_type + ' validator, running')
+            elif status == 'error':
+                # same process as before, but we first need to move the
+                # validators to in_construction and then submit; we can
+                # still submit via _submit_jobs
+                for v in validators:
+                    v._set_status('in_construction')
+                _submit_jobs([v.id for v in validators],
+                             recover_type + ' validator, error')
+            else:
+                print("Check the status of this job %s : %s and validators"
+                      "%s." % (j, status, validators))
+
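+        # drop the validator jobs (and the jobs without validators) so that
+        # Step 3 only touches the leftover parent jobs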
+        jids_to_recover = set(jids_to_recover) - set(jobs_with_validators)
+
+    # Step 3: Finally, we recover all the leftover jobs
+    for j in jids_to_recover:
+        job = ProcessingJob(j)
+        status = job.status
+
+        if status == 'waiting':
+            print("releasing job validators: %s" % j)
+            job.release_validators()
+            sleep(SLEEP_TIME)
+        elif status == 'running':
+            _submit_jobs([j], 'main_job, running')
+
+
+if __name__ == '__main__':
+    raise ValueError('This script should never be called directly but should '
+                     'be used as a reference if we need to recover jobs, '
+                     'see: qiita_recover_jobs')