
notebooks/resource-allocation/generate-allocation-summary.py
from subprocess import check_output
import pandas as pd
from datetime import datetime, timedelta
from io import StringIO
from json import loads
from os.path import join

from qiita_db.util import MaxRSS_helper
from qiita_db.exceptions import QiitaDBUnknownIDError
from qiita_db.processing_job import ProcessingJob
from qiita_db.software import Software


all_commands = [c for s in Software.iter(False) for c in s.commands]
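# Note: Software.iter(False) presumably iterates over every registered
# software (not only the active ones), so all commands are considered here.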

# retrieving only the numeric external_id means that we are only focusing
# on barnacle2/slurm jobs
main_jobs = [j for c in all_commands for j in c.processing_jobs
             if j.status == 'success' and j.external_id.isnumeric()]

sacct = ['sacct', '-p', '--format=JobID,ElapsedRaw,MaxRSS,Submit,Start,'
         'CPUTimeRAW,ReqMem,AllocCPUs,AveVMSize', '-j']
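# Note: with '-p' sacct prints pipe ('|') delimited output and appends a
# trailing '|' to every row, so pandas creates one extra empty column
# ('Unnamed: 9'); it is dropped below after the per-job columns are added.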

data = []
for i, j in enumerate(main_jobs):
    if i % 1000 == 0:
        print(f'{i}/{len(main_jobs)}')
    eid = j.external_id
    extra_info = ''
    rvals = StringIO(check_output(sacct + [eid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    _d['QiitaID'] = j.id
    cmd = j.command
    s = j.command.software
    try:
        samples, columns, input_size = j.shape
    except QiitaDBUnknownIDError:
        # this will be raised if the study or the analysis has been deleted;
        # in other words, the processing_job was run but the details about
        # it were erased when the user deleted them - however, we keep the
        # job for the record
        continue
    except TypeError as e:
        # similar to the except above, except that for these 2 commands the
        # study_id is None
        if cmd.name in {'create_sample_template', 'delete_sample_template',
                        'list_remote_files'}:
            continue
        else:
            raise e

    sname = s.name

    # some commands keep their shape / extra info in their parameters or in
    # a referenced job, so handle those cases explicitly
    if cmd.name == 'release_validators':
        ej = ProcessingJob(j.parameters.values['job'])
        extra_info = ej.command.name
        samples, columns, input_size = ej.shape
    elif cmd.name == 'complete_job':
        artifacts = loads(j.parameters.values['payload'])['artifacts']
        if artifacts is not None:
            extra_info = ','.join({
                x['artifact_type'] for x in artifacts.values()
                if 'artifact_type' in x})
    elif cmd.name == 'Validate':
        input_size = sum([len(x) for x in loads(
            j.parameters.values['files']).values()])
        sname = f"{sname} - {j.parameters.values['artifact_type']}"
    elif cmd.name == 'Alpha rarefaction curves [alpha_rarefaction]':
        extra_info = j.parameters.values[
            ('The number of rarefaction depths to include between min_depth '
             'and max_depth. (steps)')]

    _d['external_id'] = eid
    _d['sId'] = s.id
    _d['sName'] = sname
    _d['sVersion'] = s.version
    _d['cId'] = cmd.id
    _d['cName'] = cmd.name
    _d['samples'] = samples
    _d['columns'] = columns
    _d['input_size'] = input_size
    _d['extra_info'] = extra_info
    # drop the empty column created by sacct's trailing '|' delimiter
    _d.drop(columns=['Unnamed: 9'], inplace=True)
    data.append(_d)

data = pd.concat(data)

# In slurm, each JobID is represented by 3 rows in the dataframe:
#   - external_id:  the overall container for the job and its associated
#                   requests. When the Timelimit is hit, this container
#                   takes care of completing/stopping the external_id.batch
#                   job.
#   - external_id.batch: the batch step; it reports how much memory was
#                        used, how many CPUs were allocated, etc.
#   - external_id.extern: accounts for anything that happens outside the
#                         processing but is still included in the container
#                         resources. For example, if you ssh to the node and
#                         run something extra, or a prolog script runs, that
#                         usage is charged to external_id but kept separate
#                         from external_id.batch.
# Here we merge all this info into a single row plus some other columns.
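# As an illustrative (hypothetical) example, a job with external_id 123456
# would appear in the sacct output with JobID values 123456, 123456.batch
# and 123456.extern; after the groupby below only one merged row per job
# survives.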
date_fmt = '%Y-%m-%dT%H:%M:%S'

df = []
for eid, __df in data.groupby('external_id'):
    # keep the .batch row (second row) as the base record for this job
    tmp = __df.iloc[1].copy()
    # calculate WaitTime, i.e. how long the job took to start; this is
    # useful for some general profiling
    tmp['WaitTime'] = datetime.strptime(
        __df.iloc[0].Start, date_fmt) - datetime.strptime(
        __df.iloc[0].Submit, date_fmt)
    df.append(tmp)
df = pd.DataFrame(df)

# This is important: we are transforming MaxRSS to its raw value, so we
# need to confirm that no other suffixes exist
print('Make sure that only 0/K/M exist', set(
    df.MaxRSS.apply(lambda x: str(x)[-1])))
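# MaxRSS_helper (from qiita_db.util) is expected to strip the K/M suffix and
# scale the value to a raw number; this is an assumption based on its name
# and the suffix check above, hence the manual confirmation before converting.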

# Generating new columns
df['MaxRSSRaw'] = df.MaxRSS.apply(lambda x: MaxRSS_helper(str(x)))
df['ElapsedRawTime'] = df.ElapsedRaw.apply(
    lambda x: timedelta(seconds=float(x)))

# Thu, Apr 27, 2023 was the first time Jeff and I changed the old (barnacle)
# allocations to a better allocation, so we use job 1265533 as the
# before/after marker and rely only on the latest jobs for the newest
# version
df['updated'] = df.external_id.apply(
    lambda x: 'after' if int(x) >= 1265533 else 'before')

fn = join('/panfs', 'qiita', f'jobs_{df.Start.max()[:10]}.tsv.gz')
df.to_csv(fn, sep='\t', index=False)
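# To load the summary back later, something like the following should work
# (pandas infers the gzip compression from the .tsv.gz extension):
#   df = pd.read_csv(fn, sep='\t')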