scripts/qiita-recover-jobs

#!/usr/bin/env python

# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from subprocess import check_output
from qiita_db.sql_connection import TRN
from qiita_db.processing_job import ProcessingJob
import pandas as pd
from time import sleep
from math import ceil
from io import StringIO


SLEEP_TIME = 6
CHANCES = 3
SQL = """SELECT processing_job_id
         FROM qiita.processing_job
         JOIN qiita.processing_job_status
         USING (processing_job_status_id)
         WHERE processing_job_status = %s"""
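# the %s parameter is a qiita.processing_job_status value; this script only
# ever queries 'queued' and 'running'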


def _submit_jobs(jids_to_recover, recover_type):
    # we are going to split the SLEEP_TIME by CHANCES so we can ctrl-c
    # ... just in case
    st = int(ceil(SLEEP_TIME / CHANCES))
    len_jids_to_recover = len(jids_to_recover)
    for i, j in enumerate(jids_to_recover):
        print(f'recovering {j} {recover_type}: {i + 1}/{len_jids_to_recover}')
        job = ProcessingJob(j)
        job._set_status('in_construction')
        job.submit()
        # use a separate loop variable so the progress counter i isn't
        # shadowed
        for c in range(CHANCES):
            print('You can ctrl-c now, iteration %d' % c)
            sleep(st)
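
# illustrative use (the job id below is hypothetical): resubmit one queued
# job, with CHANCES short windows to ctrl-c between submissions, e.g.
#     _submit_jobs(['<processing-job-uuid>'], 'queued')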


def _retrieve_queue_jobs():
    # getting all the jobs in the queues
    all_jobs = pd.read_csv(StringIO(
        check_output(['squeue', '-o', '%all']).decode('ascii')), sep='|')

    # just keeping the qiita jobs
    jobs = all_jobs[all_jobs.GROUP == 'qiita']

    # ignore the merge-jobs and get unique values
    qiita_jids = jobs.NAME.str.replace('merge-', '').unique()
    qiita_jids = [x.replace(
        'finish-', '').replace('.txt', '') for x in qiita_jids]

    return set(qiita_jids)
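
# illustrative, truncated sketch of the pipe-separated `squeue -o %all`
# output parsed above (only the GROUP and NAME columns are used here):
#     ACCOUNT|GROUP|NAME|STATE|...
#     qiita|qiita|merge-<jid>|RUNNING|...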


def _get_jids_to_recover(recover_type):
    with TRN:
        TRN.add(SQL, [recover_type])
        jids = set(TRN.execute_fetchflatten())
        jids_to_recover = list(jids - _retrieve_queue_jobs())
        print('Total %s: %d' % (recover_type, len(jids_to_recover)))
        return jids_to_recover
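
# e.g. _get_jids_to_recover('queued') returns the job ids that the database
# reports as queued but that are no longer known to the scheduler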


def _qiita_queue_log_parse(jids_to_recover):
    results = []
    for jid in jids_to_recover:
        job = ProcessingJob(jid)
        if job.external_id:
            # accounting values for the batch step of the job ...
            bvals = pd.read_csv(StringIO(check_output([
                'sacct', '-p',
                '--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
                '-j', f'{job.external_id}.batch']).decode(
                'ascii')), sep='|').iloc[0].to_dict()
            # ... and for the job as a whole
            vals = pd.read_csv(StringIO(check_output([
                'sacct', '-p',
                '--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
                '-j', f'{job.external_id}']).decode(
                'ascii')), sep='|').iloc[0].to_dict()
            data = {
                'exit-code': bvals['ExitCode'],
                'mem-requested': bvals['ReqMem'],
                'time-requested': vals['TimelimitRaw'],
                'mem-used': bvals['MaxRSS'],
                'time-used': bvals['CPUTimeRAW']}
        else:
            data = {
                'exit-code': None,
                'mem-requested': None,
                'time-requested': None,
                'mem-used': None,
                'time-used': None}
        # store the (job, accounting-data) pair; append takes a single
        # argument so this must be a tuple
        results.append((job, data))

    return results
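
# each entry in the returned list is a (ProcessingJob, dict) pair, where the
# dict carries the sacct accounting values collected above (all Nones when
# the job never made it to the scheduler)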


def _flush_queues(recover_type):
    # README 1: in theory we should be able to submit all recover_type jobs
    # one after the other but in reality that's not possible. The issue
    # is that a job is going to stay as running/waiting until it is
    # completed. Thus, we need to run complete_job first, wait for everything
    # to finish, then continue with validate, then release_validators, and
    # finally everything else. Note that it is suggested to wait for the
    # full recovery type to finish before moving to the next one.
    # README 2: we now have a logging file for all submitted jobs, so let's
    # start by checking for those that failed due to system crashes or
    # because the workers were busy, error-codes: 1-2

    # resubmit jobs whose scheduler log reports exit-code 1
    jids_to_recover = _get_jids_to_recover(recover_type)
    review_jobs = _qiita_queue_log_parse(jids_to_recover)
    jids_review_jobs = [j.id for j, r in review_jobs
                        if r['exit-code'] == '1']
    _submit_jobs(jids_review_jobs, recover_type + '/queue_log/1')

    # then jobs that exited cleanly but never updated their Qiita status
    jids_to_recover = _get_jids_to_recover(recover_type)
    review_jobs = _qiita_queue_log_parse(jids_to_recover)
    jids_review_jobs = [j.id for j, r in review_jobs
                        if r['exit-code'] == '0']
    _submit_jobs(jids_review_jobs, recover_type + '/queue_log/0')

    # next, complete the jobs that are not running
    jids_to_recover = _get_jids_to_recover(recover_type)
    complete_job = [j for j in jids_to_recover
                    if ProcessingJob(j).command.name == 'complete_job']
    _submit_jobs(complete_job, recover_type + '/complete_job')

    # then start the validators that are not running
    jids_to_recover = _get_jids_to_recover(recover_type)
    validate = [j for j in jids_to_recover
                if ProcessingJob(j).command.name == 'Validate']
    _submit_jobs(validate, recover_type + '/validate')

    # and finally the release validators
    jids_to_recover = _get_jids_to_recover(recover_type)
    release_validators = [
        j for j in jids_to_recover
        if ProcessingJob(j).command.name == 'release_validators']
    _submit_jobs(release_validators, recover_type + '/release_validators')
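
# note that each pass above re-queries the database on purpose: once a job is
# resubmitted it shows up in squeue, so _get_jids_to_recover drops it from
# the next pass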


def qiita_recover_jobs():
    # general full processing pipeline; as an example, a deblur job, as it
    # yields two artifacts. Each new line represents a new job, each indented
    # block a waiting job
    # -> deblur
    # -> complete_job -> release_validator
    #     -> validate biom 1
    #         -> release_validator
    #         -> complete_job -> create artifact
    #     -> validate biom 2
    #         -> release_validator
    #         -> complete_job -> create artifact

    # Step 1: recover jobs that are in 'queued' status
    recover_type = 'queued'
    _flush_queues(recover_type)

    # then we recover what's left
    jids_to_recover = _get_jids_to_recover(recover_type)
    _submit_jobs(jids_to_recover, recover_type)

    # Step 2: recover jobs that are running; note that there are several
    # steps to recover this group: 2.1. check if they have validators,
    # 2.2. if so, recover the validators, 2.3. recover failed jobs
    with TRN:
        recover_type = 'running'
        _flush_queues(recover_type)
        jids_to_recover = _get_jids_to_recover(recover_type)

        # 2.1 and 2.2: check which jobs have validators, and recover them
        jobs_with_validators = []
        for j in jids_to_recover:
            job = ProcessingJob(j)
            validators = list(job.validator_jobs)
            if not validators:
                jobs_with_validators.append(j)
                continue
            else:
                # adding validators to jobs_with_validators so they are
                # ignored in the next block of code
                for vj in validators:
                    jobs_with_validators.append(vj.id)
            status = set([v.status for v in validators
                          if v.id not in _retrieve_queue_jobs()])
            # if there are no statuses, the validators weren't created and we
            # should rerun from scratch (Step 3)
            if not status:
                continue
            # if the validators have multiple statuses, it's a complex
            # behavior and needs a case-by-case solution
            if len(status) != 1:
                print("Job '%s' has too many validator statuses (%d), check "
                      "them by hand" % (j, len(status)))
                continue
            status = list(status)[0]

            if status == 'waiting':
                print("releasing job validators: %s" % j)
                try:
                    job.release_validators()
                except Exception:
                    print("ERROR releasing %s validators" % j)
                sleep(SLEEP_TIME)
            elif status == 'running':
                # _submit_jobs expects job ids, not ProcessingJob objects
                _submit_jobs([v.id for v in validators],
                             recover_type + ' validator, running')
            elif status == 'error':
                # same process as before, but we need to split setting the
                # status to in_construction and submitting into 2 steps;
                # we can still submit via _submit_jobs
                for v in validators:
                    v._set_status('in_construction')
                _submit_jobs([v.id for v in validators],
                             recover_type + ' validator, error')
            else:
                print("Check the status of this job %s: %s and validators "
                      "%s." % (j, status, validators))

        jids_to_recover = set(jids_to_recover) - set(jobs_with_validators)

    # Step 3: finally, we recover all the leftover jobs
    for j in jids_to_recover:
        job = ProcessingJob(j)
        status = job.status

        if status == 'waiting':
            print("releasing job validators: %s" % j)
            job.release_validators()
            sleep(SLEEP_TIME)
        elif status == 'running':
            _submit_jobs([j], 'main_job, running')


if __name__ == '__main__':
    raise ValueError('This script should never be called directly but should '
                     'be used as a reference if we need to recover jobs, '
                     'see: qiita_recover_jobs')
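
# intended use (an assumption, based on the error above): paste or import
# these helpers in an interactive session with the Qiita environment loaded
# and run
#     qiita_recover_jobs()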