Diff of /qiita_db/util.py [000000] .. [879b32]

r"""
Util functions (:mod:`qiita_db.util`)
======================================

.. currentmodule:: qiita_db.util

This module provides different util functions.

Methods
-------

.. autosummary::
    :toctree: generated/

    quote_data_value
    scrub_data
    exists_table
    get_db_files_base_dir
    compute_checksum
    get_files_from_uploads_folders
    filepath_id_to_rel_path
    filepath_id_to_object_id
    get_mountpoint
    insert_filepaths
    check_table_cols
    check_required_columns
    convert_from_id
    convert_to_id
    get_environmental_packages
    get_visibilities
    purge_filepaths
    move_filepaths_to_upload_folder
    move_upload_files_to_trash
    add_message
    get_pubmed_ids_from_dois
    generate_analysis_list
    human_merging_scheme
"""
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from random import SystemRandom
from string import ascii_letters, digits, punctuation
from binascii import crc32
from bcrypt import hashpw, gensalt
from functools import partial
from os.path import join, basename, isdir, exists, getsize
from os import walk, remove, listdir, stat, makedirs
from glob import glob
from shutil import move, rmtree, copy as shutil_copy
from openpyxl import load_workbook
from tempfile import mkstemp
from csv import writer as csv_writer
from datetime import datetime, timedelta
from time import time as now
from itertools import chain
from contextlib import contextmanager
import h5py
from humanize import naturalsize
import hashlib
from smtplib import SMTP, SMTP_SSL, SMTPException

from errno import EEXIST
from qiita_core.exceptions import IncompetentQiitaDeveloperError
from qiita_core.qiita_settings import qiita_config
from subprocess import check_output
import qiita_db as qdb

from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import matplotlib.pyplot as plt
from matplotlib import colormaps
import numpy as np
import pandas as pd
from io import StringIO
from json import loads
from scipy.optimize import minimize


def scrub_data(s):
    r"""Scrubs data fields of characters not allowed by PostgreSQL

    disallowed characters:
        '   ;

    Parameters
    ----------
    s : str
        The string to clean up

    Returns
    -------
    str
        The scrubbed string
    """
    ret = s.replace("'", "")
    ret = ret.replace(";", "")
    return ret
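

# Doctest-style sketch of scrub_data (illustrative example, not part of the
# original module): both disallowed characters are stripped.
#
#     >>> scrub_data("it's a test; DROP TABLE studies")
#     'its a test DROP TABLE studies'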


def convert_type(obj):
    """Converts a passed item to int, float, or str in that order

    Parameters
    ----------
    obj : object
        object to evaluate

    Returns
    -------
    int, float, or str
        Re-typed information from obj

    Raises
    ------
    IncompetentQiitaDeveloperError
        If the object can't be converted to int, float, or string

    Notes
    -----
    The function first tries to convert to an int. If that fails, it tries to
    convert to a float. If that also fails, it falls back to the string
    representation of the object.
    """
    item = None
    if isinstance(obj, datetime):
        item = str(obj)
    else:
        for fn in (int, float, str):
            try:
                item = fn(obj)
            except ValueError:
                continue
            else:
                break
    if item is None:
        raise IncompetentQiitaDeveloperError("Can't convert item of type %s!" %
                                             str(type(obj)))
    return item
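

# Doctest-style sketch of convert_type (illustrative, hypothetical values):
#
#     >>> convert_type('42')
#     42
#     >>> convert_type('4.2')
#     4.2
#     >>> convert_type('sample_1')
#     'sample_1'
#     >>> convert_type(datetime(2014, 1, 1))
#     '2014-01-01 00:00:00'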


def get_artifact_types(key_by_id=False):
    """Gets the list of possible artifact types

    Parameters
    ----------
    key_by_id : bool, optional
        Determines the format of the returned dict. Defaults to False.

    Returns
    -------
    dict
        If key_by_id is True, dict is of the form
        {artifact_type_id: artifact_type}
        If key_by_id is False, dict is of the form
        {artifact_type: artifact_type_id}
    """
    with qdb.sql_connection.TRN:
        cols = ('artifact_type_id, artifact_type'
                if key_by_id else 'artifact_type, artifact_type_id')
        sql = "SELECT {} FROM qiita.artifact_type".format(cols)
        qdb.sql_connection.TRN.add(sql)
        return dict(qdb.sql_connection.TRN.execute_fetchindex())


def get_filepath_types(key='filepath_type'):
    """Gets the list of possible filepath types from the filetype table

    Parameters
    ----------
    key : {'filepath_type', 'filepath_type_id'}, optional
        Defaults to "filepath_type". Determines the format of the returned
        dict.

    Returns
    -------
    dict
        - If `key` is "filepath_type", dict is of the form
          {filepath_type: filepath_type_id}
        - If `key` is "filepath_type_id", dict is of the form
          {filepath_type_id: filepath_type}
    """
    with qdb.sql_connection.TRN:
        if key == 'filepath_type':
            cols = 'filepath_type, filepath_type_id'
        elif key == 'filepath_type_id':
            cols = 'filepath_type_id, filepath_type'
        else:
            raise qdb.exceptions.QiitaDBColumnError(
                "Unknown key. Pass either 'filepath_type' or "
                "'filepath_type_id'.")
        sql = 'SELECT {} FROM qiita.filepath_type'.format(cols)
        qdb.sql_connection.TRN.add(sql)
        return dict(qdb.sql_connection.TRN.execute_fetchindex())


def get_data_types(key='data_type'):
    """Gets the list of possible data types from the data_type table

    Parameters
    ----------
    key : {'data_type', 'data_type_id'}, optional
        Defaults to "data_type". Determines the format of the returned dict.

    Returns
    -------
    dict
        - If `key` is "data_type", dict is of the form
          {data_type: data_type_id}
        - If `key` is "data_type_id", dict is of the form
          {data_type_id: data_type}
    """
    with qdb.sql_connection.TRN:
        if key == 'data_type':
            cols = 'data_type, data_type_id'
        elif key == 'data_type_id':
            cols = 'data_type_id, data_type'
        else:
            raise qdb.exceptions.QiitaDBColumnError(
                "Unknown key. Pass either 'data_type_id' or 'data_type'.")
        sql = 'SELECT {} FROM qiita.data_type'.format(cols)
        qdb.sql_connection.TRN.add(sql)
        return dict(qdb.sql_connection.TRN.execute_fetchindex())


def create_rand_string(length, punct=True):
    """Returns a string of random ascii characters

    Parameters
    ----------
    length: int
        Length of string to return
    punct: bool, optional
        Include punctuation as well as letters and numbers. Default True.
    """
    chars = ascii_letters + digits
    if punct:
        chars += punctuation
    sr = SystemRandom()
    return ''.join(sr.choice(chars) for _ in range(length))
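

# Sketch (illustrative): the output is random, so only its properties can be
# checked.
#
#     >>> len(create_rand_string(20))
#     20
#     >>> create_rand_string(8, punct=False).isalnum()
#     True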


def hash_password(password, hashedpw=None):
    """Hashes password

    Parameters
    ----------
    password: str
        Plaintext password
    hashedpw: str, optional
        Previously hashed password for bcrypt to pull salt from. If not
        given, a salt is generated before hashing

    Returns
    -------
    str
        Hashed password

    Notes
    -----
    Relies on the bcrypt library to hash passwords, which stores the salt as
    part of the hashed password. Because of this, the salt doesn't need to be
    stored separately.
    """
    # all the encode/decode is a python 3 workaround for bcrypt
    if hashedpw is None:
        hashedpw = gensalt()
    else:
        hashedpw = hashedpw.encode('utf-8')
    password = password.encode('utf-8')
    output = hashpw(password, hashedpw)
    if isinstance(output, bytes):
        output = output.decode("utf-8")
    return output
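

# Sketch of the hash/verify cycle (illustrative): bcrypt embeds the salt in
# the hash, so verification re-hashes the candidate password with the stored
# value and compares.
#
#     >>> stored = hash_password('hunter2')
#     >>> hash_password('hunter2', stored) == stored
#     True
#     >>> hash_password('wrong-password', stored) == stored
#     False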


def check_required_columns(keys, table):
    """Makes sure all required columns in database table are in keys

    Parameters
    ----------
    keys: iterable
        Holds the keys in the dictionary
    table: str
        name of the table to check required columns

    Raises
    ------
    QiitaDBColumnError
        If required columns of the table are missing from keys
    RuntimeError
        Unable to get columns from database
    """
    with qdb.sql_connection.TRN:
        sql = """SELECT is_nullable, column_name, column_default
                 FROM information_schema.columns WHERE table_name = %s"""
        qdb.sql_connection.TRN.add(sql, [table])
        cols = qdb.sql_connection.TRN.execute_fetchindex()
        # Test needed because a user with certain permissions can query without
        # error but be unable to get the column names
        if len(cols) == 0:
            raise RuntimeError("Unable to fetch column names for table %s"
                               % table)
        required = set(x[1] for x in cols if x[0] == 'NO' and x[2] is None)
        if len(required.difference(keys)) > 0:
            raise qdb.exceptions.QiitaDBColumnError(
                "Required keys missing: %s" % required.difference(keys))


def check_table_cols(keys, table):
    """Makes sure all keys correspond to column headers in a table

    Parameters
    ----------
    keys: iterable
        Holds the keys in the dictionary
    table: str
        name of the table to check column names

    Raises
    ------
    QiitaDBColumnError
        If a key is found that is not in table columns
    RuntimeError
        Unable to get columns from database
    """
    with qdb.sql_connection.TRN:
        sql = """SELECT column_name FROM information_schema.columns
                 WHERE table_name = %s"""
        qdb.sql_connection.TRN.add(sql, [table])
        cols = qdb.sql_connection.TRN.execute_fetchflatten()
        # Test needed because a user with certain permissions can query without
        # error but be unable to get the column names
        if len(cols) == 0:
            raise RuntimeError("Unable to fetch column names for table %s"
                               % table)
        if len(set(keys).difference(cols)) > 0:
            raise qdb.exceptions.QiitaDBColumnError(
                "Non-database keys found: %s" % set(keys).difference(cols))


def get_table_cols(table):
    """Returns the column headers of table

    Parameters
    ----------
    table : str
        The table name

    Returns
    -------
    list of str
        The column headers of `table`
    """
    with qdb.sql_connection.TRN:
        sql = """SELECT column_name FROM information_schema.columns
                 WHERE table_name=%s AND table_schema='qiita'"""
        qdb.sql_connection.TRN.add(sql, [table])
        return qdb.sql_connection.TRN.execute_fetchflatten()


def exists_table(table):
    r"""Checks if `table` exists on the database

    Parameters
    ----------
    table : str
        The table name to check if exists

    Returns
    -------
    bool
        Whether `table` exists on the database or not
    """
    with qdb.sql_connection.TRN:
        sql = """SELECT exists(
                    SELECT table_name FROM information_schema.tables
                    WHERE table_name=%s)"""
        qdb.sql_connection.TRN.add(sql, [table])
        return qdb.sql_connection.TRN.execute_fetchlast()


def get_db_files_base_dir():
    r"""Returns the path to the base directory of all db files

    Returns
    -------
    str
        The path to the base directory of all db files
    """
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add("SELECT base_data_dir FROM settings")
        basedir = qdb.sql_connection.TRN.execute_fetchlast()
        # making sure that it never ends in a "/" as most tests expect this
        if basedir.endswith("/"):
            basedir = basedir[:-1]
        return basedir


def get_work_base_dir():
    r"""Returns the path to the base working directory

    Returns
    -------
    str
        The path to the base working directory
    """
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add("SELECT base_work_dir FROM settings")
        return qdb.sql_connection.TRN.execute_fetchlast()


def max_preparation_samples():
    r"""Returns the max number of samples allowed in a single preparation

    Returns
    -------
    int
        The max number of samples allowed in a single preparation
    """
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add(
            "SELECT max_preparation_samples FROM settings")
        return qdb.sql_connection.TRN.execute_fetchlast()


def max_artifacts_in_workflow():
    r"""Returns the max number of artifacts allowed in a single workflow

    Returns
    -------
    int
        The max number of artifacts allowed in a single workflow
    """
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add(
            "SELECT max_artifacts_in_workflow FROM settings")
        return qdb.sql_connection.TRN.execute_fetchlast()


def compute_checksum(path):
    r"""Returns the checksum of the file or directory pointed to by path

    If path is a directory, the checksum is chained over every file found
    under it.

    Parameters
    ----------
    path : str
        The path to compute the checksum for

    Returns
    -------
    int
        The file checksum
    """
    filepaths = []
    if isdir(path):
        for name, dirs, files in walk(path):
            join_f = partial(join, name)
            filepaths.extend(list(map(join_f, files)))
    else:
        filepaths.append(path)

    buffersize = 65536
    crcvalue = 0
    for fp in filepaths:
        with open(fp, 'rb') as f:
            buffr = f.read(buffersize)
            while len(buffr) > 0:
                crcvalue = crc32(buffr, crcvalue)
                buffr = f.read(buffersize)
    # We need the & 0xFFFFFFFF in order to get the same numeric value across
    # all python versions and platforms
    return crcvalue & 0xFFFFFFFF
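

# Sketch (illustrative): for a single file the result is its CRC32 value.
#
#     >>> fd, fp = mkstemp()
#     >>> with open(fp, 'wb') as f:
#     ...     _ = f.write(b'qiita')
#     >>> compute_checksum(fp) == (crc32(b'qiita') & 0xFFFFFFFF)
#     True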


def get_files_from_uploads_folders(study_id):
    """Retrieve files in upload folders

    Parameters
    ----------
    study_id : int or str
        The study id whose upload folders to retrieve

    Returns
    -------
    list of (int, str, str)
        List of (folder_id, filename, human-readable filesize) tuples for the
        files uploaded for that study
    """
    study_id = str(study_id)
    fp = []
    for pid, p in get_mountpoint("uploads", retrieve_all=True):
        t = join(p, study_id)
        if exists(t):
            for f in listdir(t):
                d = join(t, f)
                if not f.startswith('.') and not isdir(d):
                    fp.append((pid, f, naturalsize(getsize(d), gnu=True)))

    return fp


def move_upload_files_to_trash(study_id, files_to_move):
    """Move files to a trash folder within the study_id upload folder

    Parameters
    ----------
    study_id : int
        The study id
    files_to_move : list
        List of tuples (folder_id, filename)

    Raises
    ------
    QiitaDBError
        If folder_id or the study folder doesn't exist, or if the filename to
        erase matches trash_folder (an internal variable)
    """
    trash_folder = 'trash'
    folders = {k: v for k, v in get_mountpoint("uploads", retrieve_all=True)}

    for fid, filename in files_to_move:
        if filename == trash_folder:
            raise qdb.exceptions.QiitaDBError(
                "You can not erase the trash folder: %s" % trash_folder)

        if fid not in folders:
            raise qdb.exceptions.QiitaDBError(
                "The filepath id: %d doesn't exist in the database" % fid)

        foldername = join(folders[fid], str(study_id))
        if not exists(foldername):
            raise qdb.exceptions.QiitaDBError(
                "The upload folder for study id: %d doesn't exist" % study_id)

        trashpath = join(foldername, trash_folder)
        create_nested_path(trashpath)

        fullpath = join(foldername, filename)
        new_fullpath = join(foldername, trash_folder, filename)

        if exists(fullpath):
            move(fullpath, new_fullpath)


def get_mountpoint(mount_type, retrieve_all=False, retrieve_subdir=False):
    r"""Returns the most recent values from data directory for the given type

    Parameters
    ----------
    mount_type : str
        The data mount type
    retrieve_all : bool, optional
        Retrieve all the available mount points or just the active one.
        Default: False.
    retrieve_subdir : bool, optional
        Retrieve the subdirectory column. Default: False.

    Returns
    -------
    list of tuple
        [(id_mountpoint, filepath_of_mountpoint)], with a third subdirectory
        element per tuple when retrieve_subdir is True
    """
    with qdb.sql_connection.TRN:
        if retrieve_all:
            sql = """SELECT data_directory_id, mountpoint, subdirectory
                     FROM qiita.data_directory
                     WHERE data_type=%s ORDER BY active DESC"""
        else:
            sql = """SELECT data_directory_id, mountpoint, subdirectory
                     FROM qiita.data_directory
                     WHERE data_type=%s AND active=true"""
        qdb.sql_connection.TRN.add(sql, [mount_type])
        db_result = qdb.sql_connection.TRN.execute_fetchindex()
        basedir = get_db_files_base_dir()
        if retrieve_subdir:
            result = [(d, join(basedir, m), s) for d, m, s in db_result]
        else:
            result = [(d, join(basedir, m)) for d, m, _ in db_result]
        return result
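

# Sketch of the returned structure (hypothetical ids and paths; the actual
# values depend on the qiita.data_directory table):
#
#     get_mountpoint('uploads')
#     # -> [(5, '/qiita/db/uploads')]
#     get_mountpoint('uploads', retrieve_subdir=True)
#     # -> [(5, '/qiita/db/uploads', False)]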


def get_mountpoint_path_by_id(mount_id):
    r"""Returns the mountpoint path for the mountpoint with id = mount_id

    Parameters
    ----------
    mount_id : int
        The mountpoint id

    Returns
    -------
    str
        The mountpoint path
    """
    with qdb.sql_connection.TRN:
        sql = """SELECT mountpoint FROM qiita.data_directory
                 WHERE data_directory_id=%s"""
        qdb.sql_connection.TRN.add(sql, [mount_id])
        mountpoint = qdb.sql_connection.TRN.execute_fetchlast()
        return join(get_db_files_base_dir(), mountpoint)


def insert_filepaths(filepaths, obj_id, table, move_files=True, copy=False):
    r"""Inserts `filepaths` in the database.

    Since the files live outside the database, and the directory in which
    they live is controlled by the database, this function moves the files
    from their original location to the controlled directory.

    Parameters
    ----------
    filepaths : iterable of tuples (str, int)
        The list of paths to the raw files and their filepath type identifiers
    obj_id : int
        Id of the object calling the functions. Disregarded if move_files
        is False
    table : str
        Table that holds the file data
    move_files : bool, optional
        Whether or not to move the given filepaths to the db filepaths
        default: True
    copy : bool, optional
        If `move_files` is true, whether to copy the files instead of moving
        them

    Returns
    -------
    list of int
        List of the filepath_id in the database for each added filepath
    """
    with qdb.sql_connection.TRN:
        new_filepaths = filepaths

        dd_id, mp, subdir = get_mountpoint(table, retrieve_subdir=True)[0]
        base_fp = join(get_db_files_base_dir(), mp)

        if move_files or copy:
            db_path = partial(join, base_fp)
            if subdir:
                # Generate the new filepaths, format:
                # mountpoint/obj_id/original_name
                dirname = db_path(str(obj_id))
                create_nested_path(dirname)
                new_filepaths = [
                    (join(dirname, basename(path)), id_)
                    for path, id_ in filepaths]
            else:
                # Generate the new filepaths, format:
                # mountpoint/DataId_OriginalName
                new_filepaths = [
                    (db_path("%s_%s" % (obj_id, basename(path))), id_)
                    for path, id_ in filepaths]
            # Move the original files to the controlled DB directory
            transfer_function = shutil_copy if copy else move
            for old_fp, new_fp in zip(filepaths, new_filepaths):
                transfer_function(old_fp[0], new_fp[0])
                # In case the transaction executes a rollback, we need to
                # make sure the files have not been moved
                qdb.sql_connection.TRN.add_post_rollback_func(
                    move, new_fp[0], old_fp[0])

        def str_to_id(x):
            return (x if isinstance(x, int)
                    else convert_to_id(x, "filepath_type"))
        # 1 is the checksum algorithm, of which we only have one implemented
        values = [[basename(path), str_to_id(id_), compute_checksum(path),
                   getsize(path), 1, dd_id] for path, id_ in new_filepaths]
        # Insert all the filepaths at once and get the filepath_id back
        sql = """INSERT INTO qiita.filepath
                    (filepath, filepath_type_id, checksum, fp_size,
                     checksum_algorithm_id, data_directory_id)
                 VALUES (%s, %s, %s, %s, %s, %s)
                 RETURNING filepath_id"""
        idx = qdb.sql_connection.TRN.index
        qdb.sql_connection.TRN.add(sql, values, many=True)
        # Since we added the query with many=True, we've added len(values)
        # queries to the transaction, so the ids are in the last idx queries
        return list(chain.from_iterable(
            chain.from_iterable(qdb.sql_connection.TRN.execute()[idx:])))
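

# Sketch of a typical call (hypothetical paths and ids; requires a live
# database): register one raw file for object 2 in the artifact mount and
# get its filepath_id back.
#
#     insert_filepaths([('/tmp/seqs.fastq', 'raw_forward_seqs')], 2,
#                      'artifact')
#     # -> [42]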


def _path_builder(db_dir, filepath, mountpoint, subdirectory, obj_id):
    """Builds the path of a DB stored file

    Parameters
    ----------
    db_dir : str
        The DB base dir
    filepath : str
        The path stored in the DB
    mountpoint : str
        The mountpoint of the given file
    subdirectory : bool
        Whether the file is stored in a subdirectory in the mountpoint or not
    obj_id : int
        The id of the object to which the file is attached

    Returns
    -------
    str
        The full path of the given file
    """
    if subdirectory:
        return join(db_dir, mountpoint, str(obj_id), filepath)
    else:
        return join(db_dir, mountpoint, filepath)
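

# Doctest-style sketch (illustrative values):
#
#     >>> _path_builder('/qiita/db', 'seqs.fastq', 'FASTQ', True, 5)
#     '/qiita/db/FASTQ/5/seqs.fastq'
#     >>> _path_builder('/qiita/db', '5_seqs.fastq', 'FASTQ', False, 5)
#     '/qiita/db/FASTQ/5_seqs.fastq'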


def retrieve_filepaths(obj_fp_table, obj_id_column, obj_id, sort=None,
                       fp_type=None):
    """Retrieves the filepaths for the given object id

    Parameters
    ----------
    obj_fp_table : str
        The name of the table that links the object and the filepath
    obj_id_column : str
        The name of the column that represents the object id
    obj_id : int
        The object id
    sort : {'ascending', 'descending'}, optional
        The direction in which the results are sorted, using the filepath id
        as sorting key. Default: None, no sorting is applied
    fp_type: str, optional
        Retrieve only the filepaths of the matching filepath type

    Returns
    -------
    list of dict {fp_id, fp, fp_type, checksum, fp_size}
        The list of dict with the properties of the filepaths
    """

    sql_sort = ""
    if sort == 'ascending':
        sql_sort = " ORDER BY filepath_id"
    elif sort == 'descending':
        sql_sort = " ORDER BY filepath_id DESC"
    elif sort is not None:
        raise qdb.exceptions.QiitaDBError(
            "Unknown sorting direction: %s. Please choose from 'ascending' or "
            "'descending'" % sort)

    sql_args = [obj_id]

    sql_type = ""
    if fp_type:
        sql_type = " AND filepath_type=%s"
        sql_args.append(fp_type)

    with qdb.sql_connection.TRN:
        sql = """SELECT filepath_id, filepath, filepath_type, mountpoint,
                        subdirectory, checksum, fp_size
                 FROM qiita.filepath
                    JOIN qiita.filepath_type USING (filepath_type_id)
                    JOIN qiita.data_directory USING (data_directory_id)
                    JOIN qiita.{0} USING (filepath_id)
                 WHERE {1} = %s{2}{3}""".format(obj_fp_table, obj_id_column,
                                                sql_type, sql_sort)
        qdb.sql_connection.TRN.add(sql, sql_args)
        results = qdb.sql_connection.TRN.execute_fetchindex()
        db_dir = get_db_files_base_dir()

        return [{'fp_id': fpid, 'fp': _path_builder(db_dir, fp, m, s, obj_id),
                 'fp_type': fp_type_, 'checksum': c, 'fp_size': fpsize}
                for fpid, fp, fp_type_, m, s, c, fpsize in results]
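

# Sketch of the returned structure (hypothetical values; requires a live
# database):
#
#     retrieve_filepaths('artifact_filepath', 'artifact_id', 1,
#                        sort='ascending')
#     # -> [{'fp_id': 1, 'fp': '/qiita/db/FASTQ/1/seqs.fastq',
#     #      'fp_type': 'raw_forward_seqs', 'checksum': '2125826711',
#     #      'fp_size': 116}]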


def _rm_files(TRN, fp):
    # Remove the data
    if exists(fp):
        if isdir(fp):
            func = rmtree
        else:
            func = remove
        TRN.add_post_commit_func(func, fp)


def purge_filepaths(delete_files=True):
    r"""Goes over the filepath table and removes all the filepaths that are
    not used in any place

    Parameters
    ----------
    delete_files : bool
        If True it will actually delete the files; if False, print them
    """
    with qdb.sql_connection.TRN:
        files_to_remove = []
        # qiita can basically download 5 things: references, info files,
        # artifacts, analyses & working_dir.
        # 1. references are no longer used so we can skip them

        # 2. info files: here we could remove all old info files (the backup we
        #    keep when a user uploads a new file) and all info files from
        #    studies that no longer exist. We want to keep the old templates
        #    so we can recover them (this has happened before) but let's remove
        #    those from deleted studies. Note that we need to check for sample,
        #    prep and qiime info files
        st_id = qdb.util.convert_to_id('sample_template', "filepath_type")
        pt_id = qdb.util.convert_to_id('prep_template', "filepath_type")
        qt_id = qdb.util.convert_to_id('qiime_map', "filepath_type")
        sql = """SELECT filepath_id, filepath FROM qiita.filepath
                 WHERE filepath_type_id IN %s AND filepath ~ '^[0-9]' AND
                    data_directory_id = %s AND filepath_id NOT IN (
                        SELECT filepath_id FROM qiita.prep_template_filepath
                        UNION
                        SELECT filepath_id FROM qiita.sample_template_filepath)
              """
        for mp_id, mp in get_mountpoint('templates'):
            qdb.sql_connection.TRN.add(
                sql, [tuple([st_id, pt_id, qt_id]), mp_id])
            studies_exist = []
            studies_erased = []
            for fid, fp in qdb.sql_connection.TRN.execute_fetchindex():
                # making sure the studies do _not_ exist; remember info files
                # are prepended by the study id
                study_id = int(fp.split('_')[0])
                if study_id in studies_exist:
                    continue
                elif study_id in studies_erased:
                    fpath = qdb.util.get_filepath_information(
                        fid)['fullpath']
                    files_to_remove.append([fid, fpath])
                else:
                    try:
                        qdb.study.Study(study_id)
                    except qdb.exceptions.QiitaDBUnknownIDError:
                        fpath = qdb.util.get_filepath_information(
                            fid)['fullpath']
                        files_to_remove.append([fid, fpath])
                        studies_erased.append(study_id)
                    else:
                        studies_exist.append(study_id)

        # 3. artifacts: [A] the difficulty of deleting artifacts is that (1)
        #    they live in different mounts, (2) as individual folders [the
        #    artifact id], (3) and the artifact id within the database has
        #    been lost. Thus, the easiest is to loop over the different data
        #    directories (mounts), get the folder names (artifact ids), and
        #    check if they exist; if they don't, let's delete them. [B] As an
        #    additional and final step, we need to purge these filepaths from
        #    the DB.
        #    [A]
        main_sql = """SELECT data_directory_id FROM qiita.artifact_type at
                        LEFT JOIN qiita.data_directory dd ON (
                            dd.data_type = at.artifact_type)
                        WHERE subdirectory = true"""
        qdb.sql_connection.TRN.add(main_sql)
        for mp_id in qdb.sql_connection.TRN.execute_fetchflatten():
            mount = get_mountpoint_path_by_id(mp_id)
            for fpath in listdir(mount):
                full_fpath = join(mount, fpath)
                if isdir(full_fpath):
                    try:
                        qdb.artifact.Artifact(int(fpath))
                    except qdb.exceptions.QiitaDBUnknownIDError:
                        files_to_remove.append([None, full_fpath])
                    else:
                        continue
        #    [B]
        sql = """SELECT filepath_id FROM qiita.filepath
                 WHERE filepath_id not in (
                    SELECT filepath_id FROM qiita.artifact_filepath) AND
                data_directory_id in (
                    SELECT data_directory_id FROM qiita.artifact_type at
                        LEFT JOIN qiita.data_directory dd ON (
                            dd.data_type = at.artifact_type)
                    WHERE subdirectory = true)
              """
        qdb.sql_connection.TRN.add(sql)
        for fid in qdb.sql_connection.TRN.execute_fetchflatten():
            fpath = qdb.util.get_filepath_information(fid)['fullpath']
            aid = fpath.split('/')[-2]
            # making sure the artifact doesn't exist any more
            if aid == 'None':
                files_to_remove.append([fid, None])

        # 4. analysis: we need to select all the filepaths that are not in
        #    the analysis_filepath table; this will return both all filepaths
        #    not from analyses and those that are not being used, thus we
        #    also need to exclude those files that are not part of the
        #    artifacts, by ignoring the file paths not stored in a
        #    data_directory from an artifact:
        sql = """SELECT filepath_id FROM qiita.filepath
                 WHERE filepath_id not in (
                    SELECT filepath_id FROM qiita.analysis_filepath) AND
                data_directory_id in (
                    SELECT data_directory_id FROM qiita.data_directory
                    WHERE data_type = 'analysis')
              """
        qdb.sql_connection.TRN.add(sql)
        for fid in qdb.sql_connection.TRN.execute_fetchflatten():
            fdata = qdb.util.get_filepath_information(fid)
            analysis_id = int(fdata['filepath'].split('_')[0])
            # making sure the Analysis doesn't exist
            if not qdb.analysis.Analysis.exists(analysis_id):
                fpath = fdata['fullpath']
                files_to_remove.append([fid, fpath])

        # 5. working directory: this is cleaned internally in the Qiita
        #    system via a cron job

        # Deleting the files!
        sql = "DELETE FROM qiita.filepath WHERE filepath_id = %s"
        for fid, fpath in files_to_remove:
            if delete_files:
                if fid is not None:
                    qdb.sql_connection.TRN.add(sql, [fid])
                if fpath is not None:
                    _rm_files(qdb.sql_connection.TRN, fpath)
            else:
                print('%s: %s' % (fid, fpath))

        if delete_files:
            # there is a chance that we will never enter the above
            # "if fid is not None" statement so we will add an extra SQL
            # command just to make sure that something gets executed
            qdb.sql_connection.TRN.add("SELECT 42")

            qdb.sql_connection.TRN.execute()


def quick_mounts_purge():
    r"""This is a quick mount purge as it only slightly relies on the database

    Notes
    -----
        Currently we delete anything older than 30 days that is not linked
        to the database. This number is intentionally hardcoded in the code.
        At the time of this writing this number seems high but we are keeping
        it this way to be safe. In the future, if needed, it can be changed.
    """
    with qdb.sql_connection.TRN:
        main_sql = """SELECT data_directory_id FROM qiita.artifact_type at
                  LEFT JOIN qiita.data_directory dd ON (
                      dd.data_type = at.artifact_type)
                  WHERE subdirectory = true"""
        qdb.sql_connection.TRN.add(main_sql)
        mp_ids = qdb.sql_connection.TRN.execute_fetchflatten()
        mounts = [qdb.util.get_mountpoint_path_by_id(x) for x in mp_ids]
        folders = [join(x, f) for x in mounts for f in listdir(x)
                   if f.isnumeric()]

    # getting all unlinked folders
    to_delete = []
    for f in folders:
        vals = f.split('/')
        aid = int(vals[-1])
        artifact_type = vals[-2]
        if artifact_type == 'FeatureData[Taxonomy]':
            continue

        try:
            a = qdb.artifact.Artifact(aid)
        except qdb.exceptions.QiitaDBUnknownIDError:
            to_delete.append(f)
            continue
        if not a.artifact_type.startswith(artifact_type):
            raise ValueError('Review artifact type: '
                             f'{a.id} {artifact_type} {a.artifact_type}')

    # now, keep only the unlinked folders modified in the last 30 days
    # (in seconds) and delete the rest
    ignore = now() - (30*86400)
    to_keep = [x for x in to_delete if stat(x).st_mtime >= ignore]
    to_delete = set(to_delete) - set(to_keep)

    # get stats to report
    stats = dict()
    for td in to_delete:
        f = td.split('/')[-2]
        if f not in stats:
            stats[f] = 0
        stats[f] += sum([getsize(join(p, fp)) for p, ds, fs in walk(td)
                         for fp in fs])

    report = ['----------------------']
    for f, s in stats.items():
        report.append(f'{f}\t{naturalsize(s)}')
    report.append(
        f'Total files {len(to_delete)} {naturalsize(sum(stats.values()))}')
    report.append('----------------------')

    for td in list(to_delete):
        if exists(td):
            rmtree(td)

    return '\n'.join(report)


def _rm_exists(fp, obj, _id, delete_files):
    # if the object with the given id no longer exists, remove (or report)
    # the associated file
    try:
        _id = int(_id)
        obj(_id)
    except Exception:
        _id = str(_id)
        if delete_files:
            with qdb.sql_connection.TRN:
                _rm_files(qdb.sql_connection.TRN, fp)
                qdb.sql_connection.TRN.execute()
        else:
            print("Remove %s" % fp)


def empty_trash_upload_folder(delete_files=True):
    r"""Delete all files in the trash folder inside each of the upload
    folders

    Parameters
    ----------
    delete_files : bool
        If True it will actually delete the files; if False, print them
    """
    gfp = partial(join, get_db_files_base_dir())
    with qdb.sql_connection.TRN:
        sql = """SELECT mountpoint
                 FROM qiita.data_directory
                 WHERE data_type = 'uploads'"""
        qdb.sql_connection.TRN.add(sql)

        for mp in qdb.sql_connection.TRN.execute_fetchflatten():
            for path, dirs, files in walk(gfp(mp)):
                if path.endswith('/trash'):
                    if delete_files:
                        for f in files:
                            fp = join(path, f)
                            _rm_files(qdb.sql_connection.TRN, fp)
                    else:
                        print(files)

        if delete_files:
            qdb.sql_connection.TRN.execute()
1037


def move_filepaths_to_upload_folder(study_id, filepaths):
    r"""Goes over the filepaths list and moves all the filepaths that are not
    used in any place to the upload folder of the study

    Parameters
    ----------
    study_id : int
        The study id to which the files should be returned
    filepaths : list
        List of filepaths to move to the upload folder
    """
    with qdb.sql_connection.TRN:
        uploads_fp = join(get_mountpoint("uploads")[0][1], str(study_id))

        create_nested_path(uploads_fp)

        path_builder = partial(join, uploads_fp)

        # do not move these file types back to the upload folder.
        do_not_move = ['preprocessed_fasta', 'preprocessed_fastq',
                       'preprocessed_demux', 'directory', 'log',
                       'html_summary', 'tgz', 'html_summary_dir', 'qzv', 'qza']

        # We can now go over and remove all the filepaths
        sql = """DELETE FROM qiita.filepath WHERE filepath_id = %s"""
        for x in filepaths:
            qdb.sql_connection.TRN.add(sql, [x['fp_id']])

            if x['fp_type'] in do_not_move:
                _rm_files(qdb.sql_connection.TRN, x['fp'])
                continue

            # if files were not removed, then they should be moved.
            destination = path_builder(basename(x['fp']))
            qdb.sql_connection.TRN.add_post_rollback_func(move,
                                                          destination,
                                                          x['fp'])
            move(x['fp'], destination)

        qdb.sql_connection.TRN.execute()
1079


def get_filepath_information(filepath_id):
    """Gets the filepath information of filepath_id

    Parameters
    ----------
    filepath_id : int
        The filepath id

    Returns
    -------
    dict
        The filepath information
    """
    with qdb.sql_connection.TRN:
        sql = """SELECT filepath_id, filepath, filepath_type, checksum,
                        data_type, mountpoint, subdirectory, active,
                        artifact_id
                 FROM qiita.filepath
                    JOIN qiita.filepath_type USING (filepath_type_id)
                    JOIN qiita.data_directory USING (data_directory_id)
                    LEFT JOIN qiita.artifact_filepath USING (filepath_id)
                 WHERE filepath_id = %s"""
        qdb.sql_connection.TRN.add(sql, [filepath_id])
        res = dict(qdb.sql_connection.TRN.execute_fetchindex()[0])

        obj_id = res.pop('artifact_id')
        res['fullpath'] = _path_builder(get_db_files_base_dir(),
                                        res['filepath'], res['mountpoint'],
                                        res['subdirectory'], obj_id)
        return res
1111


def filepath_id_to_rel_path(filepath_id):
    """Gets the path of filepath_id relative to the base directory

    Returns
    -------
    str
        The relative path for the given filepath id
    """
    with qdb.sql_connection.TRN:
        sql = """SELECT mountpoint, filepath, subdirectory, artifact_id
                 FROM qiita.filepath
                    JOIN qiita.data_directory USING (data_directory_id)
                    LEFT JOIN qiita.artifact_filepath USING (filepath_id)
                 WHERE filepath_id = %s"""
        qdb.sql_connection.TRN.add(sql, [filepath_id])
        # It should be only one row
        mp, fp, sd, a_id = qdb.sql_connection.TRN.execute_fetchindex()[0]
        if sd:
            result = join(mp, str(a_id), fp)
        else:
            result = join(mp, fp)
        return result
1135


def filepath_id_to_object_id(filepath_id):
    """Gets the object id to which the filepath id belongs

    Returns
    -------
    int
        The object id the filepath id belongs to or None if not found

    Notes
    -----
    This helper function is intended to be used with the download handler so
    we can prepend downloads with the artifact id; thus, we will only look for
    filepath ids in qiita.analysis_filepath and qiita.artifact_filepath as
    searching in qiita.reference, qiita.prep_template_filepath and
    qiita.sample_template_filepath would make the naming redundant (those
    already have the study_id in their filename)
    """
    with qdb.sql_connection.TRN:
        sql = """
            SELECT analysis_id FROM qiita.analysis_filepath
                WHERE filepath_id = %s UNION
            SELECT artifact_id FROM qiita.artifact_filepath
                WHERE filepath_id = %s"""
        qdb.sql_connection.TRN.add(sql, [filepath_id, filepath_id])
        fids = sorted(qdb.sql_connection.TRN.execute_fetchflatten())
        if fids:
            return fids[0]
        return None
1165


def filepath_ids_to_rel_paths(filepath_ids):
    """Gets the full paths, relative to the base directory

    Parameters
    ----------
    filepath_ids : list of int

    Returns
    -------
    dict where keys are ints and values are str
        {filepath_id: relative_path}
    """
    if not filepath_ids:
        return {}

    with qdb.sql_connection.TRN:
        sql = """SELECT filepath_id, mountpoint, filepath, subdirectory,
                        artifact_id
                 FROM qiita.filepath
                    JOIN qiita.data_directory USING (data_directory_id)
                    LEFT JOIN qiita.artifact_filepath USING (filepath_id)
                 WHERE filepath_id IN %s"""
        qdb.sql_connection.TRN.add(sql, [tuple(filepath_ids)])
        res = {}
        for row in qdb.sql_connection.TRN.execute_fetchindex():
            if row[3]:
                res[row[0]] = join(row[1], str(row[4]), row[2])
            else:
                res[row[0]] = join(row[1], row[2])
        return res
1197


def convert_to_id(value, table, text_col=None):
    """Converts a string value to its corresponding table identifier

    Parameters
    ----------
    value : str
        The string value to convert
    table : str
        The table that has the conversion
    text_col : str, optional
        Column holding the string value. Defaults to same as table name.

    Returns
    -------
    int
        The id corresponding to the string

    Raises
    ------
    QiitaDBLookupError
        The passed string has no associated id
    """
    text_col = table if text_col is None else text_col
    with qdb.sql_connection.TRN:
        sql = "SELECT {0}_id FROM qiita.{0} WHERE {1} = %s".format(
            table, text_col)
        qdb.sql_connection.TRN.add(sql, [value])
        _id = qdb.sql_connection.TRN.execute_fetchindex()
        if not _id:
            raise qdb.exceptions.QiitaDBLookupError(
                "%s not valid for table %s" % (value, table))
        # If there was a result it was a single row and a single value,
        # hence the access to [0][0]
        return _id[0][0]
1233


def convert_from_id(value, table):
    """Converts an id value to its corresponding string value

    Parameters
    ----------
    value : int
        The id value to convert
    table : str
        The table that has the conversion

    Returns
    -------
    str
        The string corresponding to the id

    Raises
    ------
    QiitaDBLookupError
        The passed id has no associated string
    """
    with qdb.sql_connection.TRN:
        sql = "SELECT {0} FROM qiita.{0} WHERE {0}_id = %s".format(table)
        qdb.sql_connection.TRN.add(sql, [value])
        string = qdb.sql_connection.TRN.execute_fetchindex()
        if not string:
            raise qdb.exceptions.QiitaDBLookupError(
                "%s not valid for table %s" % (value, table))
        # If there was a result it was a single row and a single value,
        # hence the access to [0][0]
        return string[0][0]
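

# Sketch of the round trip between the two converters (hypothetical values;
# the real ids depend on the database contents):
#
#     convert_to_id('FASTQ', 'artifact_type')   # -> e.g. 3
#     convert_from_id(3, 'artifact_type')       # -> 'FASTQ'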
1265


def get_count(table):
    """Counts the number of rows in a table

    Parameters
    ----------
    table : str
        The name of the table of which to count the rows

    Returns
    -------
    int
    """
    with qdb.sql_connection.TRN:
        sql = "SELECT count(1) FROM %s" % table
        qdb.sql_connection.TRN.add(sql)
        return qdb.sql_connection.TRN.execute_fetchlast()
1283


def check_count(table, exp_count):
    """Checks that the number of rows in a table equals the expected count

    Parameters
    ----------
    table : str
        The name of the table of which to count the rows
    exp_count : int
        The expected number of rows in the table

    Returns
    -------
    bool
    """
    obs_count = get_count(table)
    return obs_count == exp_count
1301


def get_environmental_packages():
    """Get the list of available environmental packages

    Returns
    -------
    list of (str, str)
        The available environmental packages. The first string is the
        environmental package name and the second string is the table where
        the metadata for the environmental package is stored
    """
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add("SELECT * FROM qiita.environmental_package")
        return qdb.sql_connection.TRN.execute_fetchindex()
1316


def get_visibilities():
    """Get the list of available visibilities for artifacts

    Returns
    -------
    list of str
        The available visibilities
    """
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add("SELECT visibility FROM qiita.visibility")
        return qdb.sql_connection.TRN.execute_fetchflatten()
1329


def get_timeseries_types():
    """Get the list of available timeseries types

    Returns
    -------
    list of (int, str, str)
        The available timeseries types. Each timeseries type is defined by the
        tuple (timeseries_id, timeseries_type, intervention_type)
    """
    with qdb.sql_connection.TRN:
        sql = "SELECT * FROM qiita.timeseries_type ORDER BY timeseries_type_id"
        qdb.sql_connection.TRN.add(sql)
        return qdb.sql_connection.TRN.execute_fetchindex()
1344


def get_pubmed_ids_from_dois(doi_ids):
    """Get the dict of pubmed ids from a list of doi ids

    Parameters
    ----------
    doi_ids : list of str
        The list of doi ids

    Returns
    -------
    dict of {doi: pubmed_id}
        Return dict of doi and pubmed ids

    Notes
    -----
    If a doi doesn't exist it will not return that {key: value} pair
    """
    with qdb.sql_connection.TRN:
        sql = "SELECT doi, pubmed_id FROM qiita.publication WHERE doi IN %s"
        qdb.sql_connection.TRN.add(sql, [tuple(doi_ids)])
        return {row[0]: row[1]
                for row in qdb.sql_connection.TRN.execute_fetchindex()}
1368


def infer_status(statuses):
    """Infers an object status from the statuses passed in

    Parameters
    ----------
    statuses : list of lists of strings or empty list
        The list of statuses used to infer the resulting status (the result
        of execute_fetchall)

    Returns
    -------
    str
        The inferred status

    Notes
    -----
    The inference is done in the following priority (high to low):
        (1) public
        (2) private
        (3) awaiting_approval
        (4) sandbox
    """
    if statuses:
        statuses = set(s[0] for s in statuses)
        if 'public' in statuses:
            return 'public'
        if 'private' in statuses:
            return 'private'
        if 'awaiting_approval' in statuses:
            return 'awaiting_approval'
    # If there are no statuses, or none of the previous ones was found, then
    # the inferred status is 'sandbox'
    return 'sandbox'
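

# Doctest-style sketch (illustrative): rows shaped like the result of
# execute_fetchall.
#
#     >>> infer_status([['private'], ['public']])
#     'public'
#     >>> infer_status([['sandbox'], ['awaiting_approval']])
#     'awaiting_approval'
#     >>> infer_status([])
#     'sandbox'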


def add_message(message, users):
    """Adds a message to the messages table, attaching it to given users

    Parameters
    ----------
    message : str
        Message to add
    users : list of User objects
        Users to connect the message to
    """
    with qdb.sql_connection.TRN:
        sql = """INSERT INTO qiita.message (message) VALUES (%s)
                 RETURNING message_id"""
        qdb.sql_connection.TRN.add(sql, [message])
        msg_id = qdb.sql_connection.TRN.execute_fetchlast()
        sql = """INSERT INTO qiita.message_user (email, message_id)
                 VALUES (%s, %s)"""
        sql_args = [[user.id, msg_id] for user in users]
        qdb.sql_connection.TRN.add(sql, sql_args, many=True)
        qdb.sql_connection.TRN.execute()


def add_system_message(message, expires):
    """Adds a system message to the messages table, attaching it to all users

    Parameters
    ----------
    message : str
        Message to add
    expires : datetime object
        Expiration for the message
    """
    with qdb.sql_connection.TRN:
        sql = """INSERT INTO qiita.message (message, expiration)
                 VALUES (%s, %s)
                 RETURNING message_id"""
        qdb.sql_connection.TRN.add(sql, [message, expires])
        msg_id = qdb.sql_connection.TRN.execute_fetchlast()
        sql = """INSERT INTO qiita.message_user (email, message_id)
                 SELECT email, %s FROM qiita.qiita_user"""
        qdb.sql_connection.TRN.add(sql, [msg_id])
        qdb.sql_connection.TRN.execute()


def clear_system_messages():
    """Removes expired messages and their associations to users"""
    with qdb.sql_connection.TRN:
        sql = "SELECT message_id FROM qiita.message WHERE expiration < %s"
        qdb.sql_connection.TRN.add(sql, [datetime.now()])
        msg_ids = qdb.sql_connection.TRN.execute_fetchflatten()
        if msg_ids:
            msg_ids = tuple(msg_ids)
            sql = "DELETE FROM qiita.message_user WHERE message_id IN %s"
            qdb.sql_connection.TRN.add(sql, [msg_ids])
            sql = "DELETE FROM qiita.message WHERE message_id IN %s"
            qdb.sql_connection.TRN.add(sql, [msg_ids])
            qdb.sql_connection.TRN.execute()


def supported_filepath_types(artifact_type):
    """Returns the list of supported filepath types for the given artifact type

    Parameters
    ----------
    artifact_type : str
        The artifact type to check the supported filepath types

    Returns
    -------
    list of [str, bool]
        The list of supported filepath types and whether it is required by the
        artifact type or not
    """
    with qdb.sql_connection.TRN:
        sql = """SELECT DISTINCT filepath_type, required
                 FROM qiita.artifact_type_filepath_type
                    JOIN qiita.artifact_type USING (artifact_type_id)
                    JOIN qiita.filepath_type USING (filepath_type_id)
                 WHERE artifact_type = %s"""
        qdb.sql_connection.TRN.add(sql, [artifact_type])
        return qdb.sql_connection.TRN.execute_fetchindex()
def generate_study_list(user, visibility):
1486
    """Get general study information
1487
1488
    Parameters
1489
    ----------
1490
    user : qiita_db.user.User
1491
        The user of which we are requesting studies from
1492
    visibility : string
1493
        The visibility to get studies {'public', 'user'}
1494
1495
    Returns
1496
    -------
1497
    list of dict
1498
        The list of studies and their information
1499
1500
    Notes
1501
    -----
1502
    The main select might look scary but it's pretty simple:
1503
    - We select the requiered fields from qiita.study and qiita.study_person
1504
        SELECT metadata_complete, study_abstract, study_id, study_alias,
1505
            study_title, ebi_study_accession, autoloaded,
1506
            qiita.study_person.name AS pi_name,
1507
            qiita.study_person.email AS pi_email,
1508
    - the total number of samples collected by counting sample_ids
1509
            (SELECT COUNT(sample_id) FROM qiita.study_sample
1510
                WHERE study_id=qiita.study.study_id)
1511
                AS number_samples_collected
1512
    - retrieve all the prep data types for all the artifacts depending on their
1513
      visibility
1514
            (SELECT array_agg(row_to_json((prep_template_id, data_type,
1515
                 artifact_id, artifact_type, deprecated,
1516
                 qiita.bioms_from_preparation_artifacts(prep_template_id)),
1517
                 true))
1518
                FROM qiita.study_prep_template
1519
                LEFT JOIN qiita.prep_template USING (prep_template_id)
1520
                LEFT JOIN qiita.data_type USING (data_type_id)
1521
                LEFT JOIN qiita.artifact USING (artifact_id)
1522
                LEFT JOIN qiita.artifact_type USING (artifact_type_id)
1523
                LEFT JOIN qiita.visibility USING (visibility_id)
1524
                WHERE {0} study_id = qiita.study.study_id)
1525
                    AS preparation_information,
1526
    - all the publications that belong to the study
1527
            (SELECT array_agg((publication, is_doi))
1528
                FROM qiita.study_publication
1529
                WHERE study_id=qiita.study.study_id) AS publications,
1530
    - all names sorted by email of users that have access to the study
1531
            (SELECT array_agg(name ORDER BY email) FROM qiita.study_users
1532
                LEFT JOIN qiita.qiita_user USING (email)
1533
                WHERE study_id=qiita.study.study_id) AS shared_with_name,
1534
    - all emails sorted by email of users that have access to the study
1535
            (SELECT array_agg(email ORDER BY email) FROM qiita.study_users
1536
                LEFT JOIN qiita.qiita_user USING (email)
1537
                WHERE study_id=qiita.study.study_id) AS shared_with_email
1538
    - all study tags
1539
            (SELECT array_agg(study_tag) FROM qiita.per_study_tags
1540
                WHERE study_id=qiita.study.study_id) AS study_tags
1541
    - study owner
1542
            (SELECT name FROM qiita.qiita_user
1543
                WHERE email=qiita.study.email) AS owner
1544
    """
1545
1546
    visibility_sql = ''
1547
    sids = set(s.id for s in user.user_studies.union(user.shared_studies))
1548
    if visibility == 'user':
1549
        if user.level == 'admin':
1550
            sids = (sids |
1551
                    qdb.study.Study.get_ids_by_status('sandbox') |
1552
                    qdb.study.Study.get_ids_by_status('private') |
1553
                    qdb.study.Study.get_ids_by_status('awaiting_approval'))
1554
    elif visibility == 'public':
1555
        sids = qdb.study.Study.get_ids_by_status('public') - sids
1556
        visibility_sql = "visibility = 'public' AND"
1557
    else:
1558
        raise ValueError('Not a valid visibility: %s' % visibility)
1559
1560
    sql = """
1561
        SELECT metadata_complete, study_abstract, study_id, study_alias,
1562
            study_title, ebi_study_accession, autoloaded,
1563
            qiita.study_person.name AS pi_name,
1564
            qiita.study_person.email AS pi_email,
1565
            (SELECT COUNT(sample_id) FROM qiita.study_sample
1566
                WHERE study_id=qiita.study.study_id)
1567
                AS number_samples_collected,
1568
            (SELECT EXISTS(
1569
                SELECT 1 FROM qiita.study_sample
1570
                    WHERE study_id = qiita.study.study_id LIMIT 1))
1571
                    AS has_sample_info,
1572
            (SELECT array_agg(row_to_json((prep_template_id, data_type,
1573
                 artifact_id, artifact_type, deprecated,
1574
                 qiita.bioms_from_preparation_artifacts(prep_template_id)),
1575
                 true))
1576
                FROM qiita.study_prep_template
1577
                LEFT JOIN qiita.prep_template USING (prep_template_id)
1578
                LEFT JOIN qiita.data_type USING (data_type_id)
1579
                LEFT JOIN qiita.artifact USING (artifact_id)
1580
                LEFT JOIN qiita.artifact_type USING (artifact_type_id)
1581
                LEFT JOIN qiita.visibility USING (visibility_id)
1582
                WHERE {0} study_id = qiita.study.study_id)
1583
                    AS preparation_information,
1584
            (SELECT array_agg(row_to_json((publication, is_doi), true))
1585
                FROM qiita.study_publication
1586
                WHERE study_id=qiita.study.study_id) AS publications,
1587
            (SELECT array_agg(name ORDER BY email) FROM qiita.study_users
1588
                LEFT JOIN qiita.qiita_user USING (email)
1589
                WHERE study_id=qiita.study.study_id) AS shared_with_name,
1590
            (SELECT array_agg(email ORDER BY email) FROM qiita.study_users
1591
                LEFT JOIN qiita.qiita_user USING (email)
1592
                WHERE study_id=qiita.study.study_id) AS shared_with_email,
1593
            (SELECT array_agg(study_tag) FROM qiita.per_study_tags
1594
                WHERE study_id=qiita.study.study_id) AS study_tags,
1595
            (SELECT name FROM qiita.qiita_user
1596
                WHERE email=qiita.study.email) AS owner,
1597
            qiita.study.email AS owner_email
1598
            FROM qiita.study
1599
            LEFT JOIN qiita.study_person ON (
1600
                study_person_id=principal_investigator_id)
1601
            WHERE study_id IN %s
1602
            ORDER BY study_id""".format(visibility_sql)
1603
1604
    infolist = []
1605
    if sids:
1606
        with qdb.sql_connection.TRN:
1607
            qdb.sql_connection.TRN.add(sql, [tuple(sids)])
1608
            results = qdb.sql_connection.TRN.execute_fetchindex()
1609
1610
        for info in results:
1611
            info = dict(info)
1612
1613
            # cleaning owners name
1614
            if info['owner'] in (None, ''):
1615
                info['owner'] = info['owner_email']
1616
            del info['owner_email']
1617
1618
            preparation_data_types = []
1619
            artifact_biom_ids = []
1620
            if info['preparation_information'] is not None:
1621
                for pinfo in info['preparation_information']:
1622
                    # 'f1': prep_template_id, 'f2': data_type,
1623
                    # 'f3': artifact_id, 'f4': artifact_type,
1624
                    # 'f5': deprecated, 'f6': biom artifacts
1625
                    if pinfo['f5']:
1626
                        continue
1627
                    preparation_data_types.append(pinfo['f2'])
1628
                    if pinfo['f4'] == 'BIOM':
1629
                        artifact_biom_ids.append(pinfo['f3'])
1630
                    if pinfo['f6'] is not None:
1631
                        artifact_biom_ids.extend(
1632
                            map(int, pinfo['f6'].split(',')))
1633
            del info['preparation_information']
1634
            info['artifact_biom_ids'] = list(set(artifact_biom_ids))
1635
            info['preparation_data_types'] = list(set(
1636
                preparation_data_types))
1637
1638
            # publication info
1639
            info['publication_doi'] = []
1640
            info['publication_pid'] = []
1641
            if info['publications'] is not None:
1642
                for p in info['publications']:
1643
                    # f1-2 are the default names given by pgsql
1644
                    pub = p['f1']
1645
                    is_doi = p['f2']
1646
                    if is_doi:
1647
                        info['publication_doi'].append(pub)
1648
                    else:
1649
                        info['publication_pid'].append(pub)
1650
            del info['publications']
1651
1652
            # pi info
1653
            info["pi"] = (info['pi_email'], info['pi_name'])
1654
            del info["pi_email"]
1655
            del info["pi_name"]
1656
1657
            # shared with
1658
            info['shared'] = []
1659
            if info['shared_with_name'] and info['shared_with_email']:
1660
                for name, email in zip(info['shared_with_name'],
1661
                                       info['shared_with_email']):
1662
                    if not name:
1663
                        name = email
1664
                    info['shared'].append((email, name))
1665
            del info["shared_with_name"]
1666
            del info["shared_with_email"]
1667
1668
            # # add extra info about sample information file
1669
            # if info['has_sample_info']:
1670
            #     # the fix for #3091 should go here; please reference that
1671
            #     # issue for more information of why it hasn't been closed
1672
            #     with qdb.sql_connection.TRN:
1673
            #         # check if host_scientific_name is part of the metadata
1674
            #         BMT = qdb.metadata_template.base_metadata_template
1675
            #         QCN = BMT.QIITA_COLUMN_NAME
1676
            #         sql = """SELECT POSITION('host_scientific_name' IN
1677
            #                                  sample_values->>'columns')
1678
            #                  FROM qiita.sample_%d
1679
            #                  WHERE sample_id = '%s'""" % (
1680
            #                     info['study_id'], QCN)
1681
            #         qdb.sql_connection.TRN.add(sql)
1682
            #         has_hsn = qdb.sql_connection.TRN.execute_fetchflatten()
1683
            #         # if it has that column, we can retrieve the information
1684
            #         if has_hsn[0] != 0:
1685
            #             sql = """SELECT array_agg(
1686
            #                         DISTINCT
1687
            #                         sample_values->>'host_scientific_name')
1688
            #                      FROM qiita.sample_%d
1689
            #                      WHERE sample_id != '%s'""" % (
1690
            #                         info['study_id'], QCN)
1691
            #             qdb.sql_connection.TRN.add(sql)
1692
            #             hsn = qdb.sql_connection.TRN.execute_fetchflatten()
1693
            #             info['host_scientific_name'] = hsn
1694
            del info['has_sample_info']
1695
1696
            infolist.append(info)
1697
    return infolist
1698
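# Usage sketch (illustrative; the email is hypothetical): list the studies
# visible to a user and read a few of the fields assembled above.
#
#     user = qdb.user.User('demo@example.com')
#     for study in generate_study_list(user, 'user'):
#         print(study['study_id'], study['study_title'],
#               study['number_samples_collected'])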
1699
1700
def generate_study_list_without_artifacts(study_ids, portal=None):
1701
    """Get general study information without artifacts
1702
1703
    Parameters
1704
    ----------
1705
    study_ids : list of ints
1706
        The study ids to look for. Non-existing ids will be ignored
1707
    portal : str
1708
        Portal to use, if None take it from configuration. Mainly for tests.
1709
1710
    Returns
1711
    -------
1712
    list of dict
1713
        The list of studies and their information
1714
1715
    Notes
1716
    -----
1717
    The main select might look scary but it's pretty simple:
1718
    - We select the required fields from qiita.study and qiita.study_person
1719
        SELECT metadata_complete, study_abstract, study_id, study_alias,
1720
            study_title, ebi_study_accession, autoloaded,
1721
            qiita.study_person.name AS pi_name,
1722
            qiita.study_person.email AS pi_email,
1723
    - the total number of samples collected by counting sample_ids
1724
            (SELECT COUNT(sample_id) FROM qiita.study_sample
1725
                WHERE study_id=qiita.study.study_id)
1726
                AS number_samples_collected
1727
    - all the publications that belong to the study
1728
            (SELECT array_agg((publication, is_doi))
1729
                FROM qiita.study_publication
1730
                WHERE study_id=qiita.study.study_id) AS publications
1731
    """
1732
    if portal is None:
1733
        portal = qiita_config.portal
1734
    with qdb.sql_connection.TRN:
1735
        sql = """
1736
            SELECT metadata_complete, study_abstract, study_id, study_alias,
1737
                study_title, ebi_study_accession, autoloaded,
1738
                qiita.study_person.name AS pi_name,
1739
                qiita.study_person.email AS pi_email,
1740
                (SELECT COUNT(sample_id) FROM qiita.study_sample
1741
                    WHERE study_id=qiita.study.study_id)
1742
                    AS number_samples_collected,
1743
                (SELECT array_agg(row_to_json((publication, is_doi), true))
1744
                    FROM qiita.study_publication
1745
                    WHERE study_id=qiita.study.study_id) AS publications
1746
                FROM qiita.study
1747
                LEFT JOIN qiita.study_portal USING (study_id)
1748
                LEFT JOIN qiita.portal_type USING (portal_type_id)
1749
                LEFT JOIN qiita.study_person ON (
1750
                    study_person_id=principal_investigator_id)
1751
                WHERE study_id IN %s AND portal = %s
1752
                ORDER BY study_id"""
1753
        qdb.sql_connection.TRN.add(sql, [tuple(study_ids), portal])
1754
        infolist = []
1755
        for info in qdb.sql_connection.TRN.execute_fetchindex():
1756
            info = dict(info)
1757
1758
            # publication info
1759
            info['publication_doi'] = []
1760
            info['publication_pid'] = []
1761
            if info['publications'] is not None:
1762
                for p in info['publications']:
1763
                    # f1-2 are the default names given
1764
                    pub = p['f1']
1765
                    is_doi = p['f2']
1766
                    if is_doi:
1767
                        info['publication_doi'].append(pub)
1768
                    else:
1769
                        info['publication_pid'].append(pub)
1770
            del info['publications']
1771
1772
            # pi info
1773
            info["pi"] = (info['pi_email'], info['pi_name'])
1774
            del info["pi_email"]
1775
            del info["pi_name"]
1776
1777
            infolist.append(info)
1778
    return infolist
1779
1780
1781
def get_artifacts_information(artifact_ids, only_biom=True):
1782
    """Returns processing information about the artifact ids
1783
1784
    Parameters
1785
    ----------
1786
    artifact_ids : list of ints
1787
        The artifact ids to look for. Non-existing ids will be ignored
1788
    only_biom : bool
1789
        If true only the biom artifacts are retrieved
1790
1791
    Returns
1792
    -------
1793
    list of dict
1794
        The processing information of each artifact
1795
    """
1796
    if not artifact_ids:
1797
        return {}
1798
1799
    sql = """
1800
        WITH main_query AS (
1801
            SELECT a.artifact_id, a.name, a.command_id as cid, sc.name,
1802
                   a.generated_timestamp, array_agg(a.command_parameters),
1803
                   dt.data_type, parent_id,
1804
                   parent_info.command_id, parent_info.name,
1805
                   array_agg(parent_info.command_parameters),
1806
                   array_agg(filepaths.filepath),
1807
                   qiita.find_artifact_roots(a.artifact_id) AS root_id
1808
            FROM qiita.artifact a
1809
            LEFT JOIN qiita.software_command sc USING (command_id)"""
1810
    if only_biom:
1811
        sql += """
1812
            JOIN qiita.artifact_type at ON (
1813
                a.artifact_type_id = at.artifact_type_id
1814
                    AND artifact_type = 'BIOM')"""
1815
    sql += """
1816
            LEFT JOIN qiita.parent_artifact pa ON (
1817
                a.artifact_id = pa.artifact_id)
1818
            LEFT JOIN qiita.data_type dt USING (data_type_id)
1819
            LEFT OUTER JOIN LATERAL (
1820
                SELECT command_id, sc.name, command_parameters
1821
                FROM qiita.artifact ap
1822
                LEFT JOIN qiita.software_command sc USING (command_id)
1823
                WHERE ap.artifact_id = pa.parent_id) parent_info ON true
1824
            LEFT OUTER JOIN LATERAL (
1825
                SELECT filepath
1826
                FROM qiita.artifact_filepath af
1827
                JOIN qiita.filepath USING (filepath_id)
1828
                WHERE af.artifact_id = a.artifact_id) filepaths ON true
1829
            WHERE a.artifact_id IN %s
1830
                AND a.visibility_id NOT IN %s
1831
            GROUP BY a.artifact_id, a.name, a.command_id, sc.name,
1832
                     a.generated_timestamp, dt.data_type, parent_id,
1833
                     parent_info.command_id, parent_info.name
1834
            ORDER BY a.command_id, artifact_id),
1835
          has_target_subfragment AS (
1836
            SELECT main_query.*, prep_template_id
1837
            FROM main_query
1838
            LEFT JOIN qiita.prep_template pt ON (
1839
                main_query.root_id = pt.artifact_id)
1840
        )
1841
        SELECT * FROM has_target_subfragment
1842
        ORDER BY cid, data_type, artifact_id
1843
        """
1844
1845
    sql_params = """SELECT command_id, array_agg(parameter_name)
1846
                    FROM qiita.command_parameter
1847
                    WHERE parameter_type = 'artifact'
1848
                    GROUP BY command_id"""
1849
1850
    QCN = qdb.metadata_template.base_metadata_template.QIITA_COLUMN_NAME
1851
    sql_ts = """SELECT DISTINCT sample_values->>'target_subfragment'
1852
                FROM qiita.prep_%s
1853
                WHERE sample_id != '{0}'""".format(QCN)
1854
1855
    with qdb.sql_connection.TRN:
1856
        results = []
1857
1858
        # getting all commands and their artifact parameters so we can
1859
        # delete from the results below
1860
        commands = {}
1861
        qdb.sql_connection.TRN.add(sql_params)
1862
        for cid, params in qdb.sql_connection.TRN.execute_fetchindex():
1863
            cmd = qdb.software.Command(cid)
1864
            commands[cid] = {
1865
                'params': params,
1866
                'merging_scheme': cmd.merging_scheme,
1867
                'active': cmd.active,
1868
                'deprecated': cmd.software.deprecated}
1869
1870
        # Now let's get the actual artifacts. Note that ts is a cache
1871
        # (prep id : target subfragment) so we don't have to query
1872
        # multiple times the target subfragment for a prep info file.
1873
        # However, some artifacts (like analysis) do not have a prep info
1874
        # file; thus we can have a None prep id (key)
1875
        ts = {None: []}
1876
        ps = {}
1877
        algorithm_az = {'': ''}
1878
        PT = qdb.metadata_template.prep_template.PrepTemplate
1879
        qdb.sql_connection.TRN.add(sql, [
1880
            tuple(artifact_ids), qdb.util.artifact_visibilities_to_skip()])
1881
        for row in qdb.sql_connection.TRN.execute_fetchindex():
1882
            aid, name, cid, cname, gt, aparams, dt, pid, pcid, pname, \
1883
                pparams, filepaths, _, prep_template_id = row
1884
1885
            # cleaning up aparams & pparams
1886
            # - [0] due to the array_agg
1887
            aparams = aparams[0]
1888
            pparams = pparams[0]
1889
            if aparams is None:
1890
                aparams = {}
1891
            else:
1892
                # we are going to remove any artifacts from the parameters
1893
                for ti in commands[cid]['params']:
1894
                    del aparams[ti]
1895
1896
            # - ignoring empty filepaths
1897
            if filepaths == [None]:
1898
                filepaths = []
1899
            else:
1900
                filepaths = [fp for fp in filepaths if fp.endswith('biom')]
1901
1902
            # generating algorithm, by default is ''
1903
            algorithm = ''
1904
            if cid is not None:
1905
                deprecated = commands[cid]['deprecated']
1906
                active = commands[cid]['active']
1907
                if pcid is None:
1908
                    parent_merging_scheme = None
1909
                else:
1910
                    parent_merging_scheme = commands[pcid][
1911
                        'merging_scheme']
1912
1913
                algorithm = human_merging_scheme(
1914
                    cname, commands[cid]['merging_scheme'],
1915
                    pname, parent_merging_scheme,
1916
                    aparams, filepaths, pparams)
1917
1918
                if algorithm not in algorithm_az:
1919
                    algorithm_az[algorithm] = hashlib.md5(
1920
                        algorithm.encode('utf-8')).hexdigest()
1921
            else:
1922
                # there is no cid, thus it is a direct upload; setting things
1923
                # like this so the artifacts are displayed
1924
                deprecated = False
1925
                active = True
1926
1927
            if prep_template_id not in ts:
1928
                qdb.sql_connection.TRN.add(sql_ts, [prep_template_id])
1929
                ts[prep_template_id] = \
1930
                    qdb.sql_connection.TRN.execute_fetchflatten()
1931
            target = ts[prep_template_id]
1932
1933
            prep_samples = 0
1934
            platform = 'not provided'
1935
            target_gene = 'not provided'
1936
            if prep_template_id is not None:
1937
                if prep_template_id not in ps:
1938
                    pt = PT(prep_template_id)
1939
                    categories = pt.categories
1940
                    if 'platform' in categories:
1941
                        platform = ', '.join(
1942
                            set(pt.get_category('platform').values()))
1943
                    if 'target_gene' in categories:
1944
                        target_gene = ', '.join(
1945
                            set(pt.get_category('target_gene').values()))
1946
1947
                    ps[prep_template_id] = [
1948
                        len(list(pt.keys())), platform, target_gene]
1949
1950
                prep_samples, platform, target_gene = ps[prep_template_id]
1951
1952
            results.append({
1953
                'artifact_id': aid,
1954
                'target_subfragment': target,
1955
                'prep_samples': prep_samples,
1956
                'platform': platform,
1957
                'target_gene': target_gene,
1958
                'name': name,
1959
                'data_type': dt,
1960
                'timestamp': str(gt),
1961
                'parameters': aparams,
1962
                'algorithm': algorithm,
1963
                'algorithm_az': algorithm_az[algorithm],
1964
                'deprecated': deprecated,
1965
                'active': active,
1966
                'files': filepaths})
1967
1968
        return results
1969
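# Usage sketch (artifact ids are hypothetical): map each BIOM artifact to
# the human readable processing algorithm computed above.
#
#     info = get_artifacts_information([4, 5, 6], only_biom=True)
#     algorithms = {i['artifact_id']: i['algorithm'] for i in info}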
1970
1971
def _is_string_or_bytes(s):
1972
    """Returns True if input argument is string (unicode or not) or bytes.
1973
    """
1974
    return isinstance(s, str) or isinstance(s, bytes)
1975
1976
1977
def _get_filehandle(filepath_or, *args, **kwargs):
1978
    """Open file if `filepath_or` looks like a string/unicode/bytes/Excel, else
1979
    pass through.
1980
1981
    Notes
1982
    -----
1983
    If Excel, the code will write a temporary txt file with the contents. Also,
1984
    it will check if the file is a Qiimp file or a regular Excel file.
1985
    """
1986
    if _is_string_or_bytes(filepath_or):
1987
        if h5py.is_hdf5(filepath_or):
1988
            fh, own_fh = h5py.File(filepath_or, *args, **kwargs), True
1989
        elif filepath_or.endswith('.xlsx'):
1990
            # due to extension, let's assume Excel file
1991
            wb = load_workbook(filename=filepath_or, data_only=True)
1992
            sheetnames = wb.sheetnames
1993
            # let's check if Qiimp, they must be in same order
1994
            first_cell_index = 0
1995
            is_qiimp_wb = False
1996
            if sheetnames == ["Metadata", "Validation", "Data Dictionary",
1997
                              "metadata_schema", "metadata_form",
1998
                              "Instructions"]:
1999
                first_cell_index = 1
2000
                is_qiimp_wb = True
2001
            first_sheet = wb[sheetnames[0]]
2002
            cell_range = range(first_cell_index, first_sheet.max_column)
2003
            _, fp = mkstemp(suffix='.txt')
2004
            with open(fp, 'w') as fh:
2005
                cfh = csv_writer(fh, delimiter='\t')
2006
                for r in first_sheet.rows:
2007
                    if is_qiimp_wb:
2008
                        # check contents of first column; if they are a zero
2009
                        # (not a valid QIIMP sample_id) or a "No more than
2010
                        # max samples" message, there are no more valid rows,
2011
                        # so don't examine any more rows.
2012
                        fcv = str(r[cell_range[0]].value)
2013
                        if fcv == "0" or fcv.startswith("No more than"):
2014
                            break
2015
                    cfh.writerow([r[x].value for x in cell_range])
2016
            fh, own_fh = open(fp, *args, **kwargs), True
2017
        else:
2018
            fh, own_fh = open(filepath_or, *args, **kwargs), True
2019
    else:
2020
        fh, own_fh = filepath_or, False
2021
    return fh, own_fh
2022
2023
2024
@contextmanager
2025
def open_file(filepath_or, *args, **kwargs):
2026
    """Context manager, like ``open``, but lets file handles and file like
2027
    objects pass untouched.
2028
2029
    It is useful when implementing a function that can accept both
2030
    strings and file-like objects (like numpy.loadtxt, etc).
2031
2032
    This method differs slightly from scikit-bio's implementation in that it
2033
    handles HDF5 files appropriately.
2034
2035
    Parameters
2036
    ----------
2037
    filepath_or : str/bytes/unicode string or file-like
2038
         If string, file to be opened using ``h5py.File`` if the file is an
2039
         HDF5 file, otherwise builtin ``open`` will be used. If it is not a
2040
         string, the object is just returned untouched.
2041
2042
    Other parameters
2043
    ----------------
2044
    args, kwargs : tuple, dict
2045
        When `filepath_or` is a string, any extra arguments are passed
2046
        on to the ``open`` builtin.
2047
    """
2048
    fh, own_fh = _get_filehandle(filepath_or, *args, **kwargs)
2049
    try:
2050
        yield fh
2051
    finally:
2052
        if own_fh:
2053
            fh.close()
2054
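# Usage sketch (paths are hypothetical): a plain text path goes through the
# builtin open, an HDF5 path through h5py.File, an .xlsx path through the
# Excel conversion above, and an already opened handle is yielded untouched
# and left open on exit.
#
#     with open_file('/tmp/sample_info.txt') as fh:
#         fh.read()
#     with open_file(open('/tmp/sample_info.txt')) as fh:
#         pass  # not closed by open_file; the caller owns it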
2055
2056
def artifact_visibilities_to_skip():
2057
    return tuple([qdb.util.convert_to_id('archived', "visibility")])
2058
2059
2060
def generate_analysis_list(analysis_ids, public_only=False):
2061
    """Get general analysis information
2062
2063
    Parameters
2064
    ----------
2065
    analysis_ids : list of ints
2066
        The analysis ids to look for. Non-existing ids will be ignored
2067
    public_only : bool, optional
2068
        If true, return only public analyses. Default: false.
2069
2070
    Returns
2071
    -------
2072
    list of dict
2073
        The list of studies and their information
2074
    """
2075
    if not analysis_ids:
2076
        return []
2077
2078
    sql = """
2079
        SELECT analysis_id, a.name, a.description, a.timestamp, a.email,
2080
            array_agg(DISTINCT artifact_id),
2081
            array_agg(DISTINCT visibility),
2082
            array_agg(DISTINCT CASE WHEN filepath_type = 'plain_text'
2083
                      THEN filepath_id END)
2084
        FROM qiita.analysis a
2085
        LEFT JOIN qiita.analysis_artifact USING (analysis_id)
2086
        LEFT JOIN qiita.artifact USING (artifact_id)
2087
        LEFT JOIN qiita.visibility USING (visibility_id)
2088
        LEFT JOIN qiita.analysis_filepath USING (analysis_id)
2089
        LEFT JOIN qiita.filepath USING (filepath_id)
2090
        LEFT JOIN qiita.filepath_type USING (filepath_type_id)
2091
        WHERE dflt = false AND analysis_id IN %s
2092
        GROUP BY analysis_id
2093
        ORDER BY analysis_id"""
2094
2095
    with qdb.sql_connection.TRN:
2096
        results = []
2097
2098
        qdb.sql_connection.TRN.add(sql, [tuple(analysis_ids)])
2099
        for row in qdb.sql_connection.TRN.execute_fetchindex():
2100
            aid, name, description, ts, owner, artifacts, \
2101
                av, mapping_files = row
2102
2103
            av = 'public' if set(av) == {'public'} else 'private'
2104
            if av != 'public' and public_only:
2105
                continue
2106
2107
            if mapping_files == [None]:
2108
                mapping_files = []
2109
            else:
2110
                mapping_files = [
2111
                    (mid, get_filepath_information(mid)['fullpath'])
2112
                    for mid in mapping_files if mid is not None]
2113
            if artifacts == [None]:
2114
                artifacts = []
2115
            else:
2116
                # making sure they are int so they don't break the GUI
2117
                artifacts = [int(a) for a in artifacts if a is not None]
2118
2119
            results.append({
2120
                'analysis_id': aid, 'name': name, 'description': description,
2121
                'timestamp': ts.strftime("%m/%d/%y %H:%M:%S"),
2122
                'visibility': av, 'artifacts': artifacts, 'owner': owner,
2123
                'mapping_files': mapping_files})
2124
2125
    return results
2126
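# Usage sketch (analysis ids are hypothetical): non-default analyses only,
# optionally restricted to the fully public ones.
#
#     generate_analysis_list([1, 2, 3], public_only=True)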
2127
2128
def generate_analyses_list_per_study(study_id):
2129
    """Get study analyses and their preparations
2130
2131
    Parameters
2132
    ----------
2133
    study_id : int
2134
        The study id
2135
2136
    Returns
2137
    -------
2138
    list of dict
2139
        The available analyses and their general information
2140
    """
2141
    # for speed and SQL simplicity, we are going to split the search in two
2142
    # queries: 1. analysis_sql: to find analyses associated with this study
2143
    # and the artifacts used to generate the analyses; and 2. extra_sql: each
2144
    # analysis details, including the artifacts (children) that belong to
2145
    # the analysis.
2146
    analysis_sql = """
2147
        SELECT DISTINCT analysis_id, array_agg(DISTINCT artifact_id) AS aids
2148
        FROM qiita.analysis_sample analysis_sample
2149
        WHERE sample_id IN (SELECT sample_id
2150
                            FROM qiita.study_sample
2151
                            WHERE study_id = %s)
2152
        GROUP BY analysis_id
2153
        ORDER BY analysis_id
2154
    """
2155
    extra_sql = """
2156
        SELECT analysis_id, analysis.name, analysis.email, analysis.dflt,
2157
            array_agg(DISTINCT aa.artifact_id) FILTER (
2158
                      WHERE aa.artifact_id IS NOT NULL) as artifact_ids,
2159
            ARRAY(SELECT DISTINCT prep_template_id
2160
                  FROM qiita.preparation_artifact
2161
                  WHERE artifact_id IN %s) as prep_ids,
2162
            array_agg(DISTINCT visibility.visibility) FILTER (
2163
                    WHERE aa.artifact_id IS NOT NULL) as visibility
2164
        FROM qiita.analysis analysis
2165
        LEFT JOIN qiita.analysis_artifact aa USING (analysis_id)
2166
        LEFT JOIN qiita.artifact artifact USING (artifact_id)
2167
        LEFT JOIN qiita.visibility visibility USING (visibility_id)
2168
        WHERE analysis_id = %s
2169
        GROUP BY analysis_id, analysis.name, analysis.email, analysis.dflt
2170
    """
2171
    results = []
2172
    with qdb.sql_connection.TRN:
2173
        qdb.sql_connection.TRN.add(analysis_sql, [study_id])
2174
        aids = qdb.sql_connection.TRN.execute_fetchindex()
2175
        for aid, artifact_ids in aids:
2176
            qdb.sql_connection.TRN.add(
2177
                extra_sql, [tuple(artifact_ids), aid])
2178
            for row in qdb.sql_connection.TRN.execute_fetchindex():
2179
                results.append(dict(row))
2180
2181
    return results
2182
2183
2184
def create_nested_path(path):
2185
    """Wraps makedirs() to make it safe across multiple concurrent calls.
2186
    Returns successfully if the path was created, or if it already exists.
2187
    (Note, this alters the normal makedirs() behavior, which raises an error
2188
    if the full path already exists.)
2189
2190
    Parameters
2191
    ----------
2192
    path : str
2193
        The path to be created. The path can contain multiple levels that do
2194
        not currently exist on the filesystem.
2195
2196
    Raises
2197
    ------
2198
    OSError
2199
        If the operation failed for whatever reason (likely because the caller
2200
        does not have permission to create new directories in the part of the
2201
        filesystem requested
2202
    """
2203
    # TODO: catching errno=EEXIST (17 usually) will suffice for now, to avoid
2204
    # stomping when multiple artifacts are being manipulated within a study.
2205
    # In the future, employ a process-spanning mutex to serialize.
2206
    # With Python3, the try/except wrapper can be replaced with a call to
2207
    # makedirs with exist_ok=True
2208
    try:
2209
        # try creating the directory specified. If the directory already
2210
        # exists, or if qiita does not have permissions to create/modify the
2211
        # path, an exception will be thrown.
2212
        makedirs(path)
2213
    except OSError as e:
2214
        # if the directory already exists, treat as success (idempotent)
2215
        if e.errno != EEXIST:
2216
            raise
2217
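# Usage sketch: calling the helper twice on the same (hypothetical) path is
# safe, which is exactly the EEXIST behavior trapped above.
#
#     create_nested_path('/tmp/qiita/base/study_1/preps')
#     create_nested_path('/tmp/qiita/base/study_1/preps')  # no-op, no error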
2218
2219
def human_merging_scheme(cname, merging_scheme,
2220
                         pname, parent_merging_scheme,
2221
                         artifact_parameters, artifact_filepaths,
2222
                         parent_parameters):
2223
    """From the artifact and its parent features format the merging scheme
2224
2225
    Parameters
2226
    ----------
2227
    cname : str
2228
        The artifact command name
2229
    merging_scheme : dict, from qdb.artifact.Artifact.merging_scheme
2230
        The artifact merging scheme
2231
    pname : str
2232
        The artifact parent command name
2233
    parent_merging_scheme : dict, from qdb.artifact.Artifact.merging_scheme
2234
        The artifact parent merging scheme
2235
    artifact_parameters : dict
2236
        The artifact processing parameters
2237
    artifact_filepaths : list of str
2238
        The artifact filepaths
2239
    parent_parameters : dict
2240
        The artifact parent's processing parameters
2241
2242
    Returns
2243
    -------
2244
    str
2245
        The merging scheme
2246
    """
2247
    eparams = []
2248
    if merging_scheme['parameters']:
2249
        eparams.append(','.join(['%s: %s' % (k, artifact_parameters[k])
2250
                                 for k in merging_scheme['parameters']]))
2251
    if (merging_scheme['outputs'] and
2252
            artifact_filepaths is not None and
2253
            artifact_filepaths):
2254
        eparams.append('BIOM: %s' % ', '.join(artifact_filepaths))
2255
    if eparams:
2256
        cname = "%s (%s)" % (cname, ', '.join(eparams))
2257
2258
    if merging_scheme['ignore_parent_command']:
2259
        algorithm = cname
2260
    else:
2261
        palgorithm = 'N/A'
2262
        if pname is not None:
2263
            palgorithm = pname
2264
            if parent_merging_scheme['parameters']:
2265
                params = ','.join(
2266
                    ['%s: %s' % (k, parent_parameters[k])
2267
                     for k in parent_merging_scheme['parameters']])
2268
                palgorithm = "%s (%s)" % (palgorithm, params)
2269
2270
        algorithm = '%s | %s' % (cname, palgorithm)
2271
2272
    return algorithm
2273
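# Worked sketch (all values hypothetical): with one tracked parameter and
# no BIOM outputs, the parent command is appended after a pipe.
#
#     human_merging_scheme(
#         'Pick OTUs', {'parameters': ['similarity'], 'outputs': [],
#                       'ignore_parent_command': False},
#         'Split libraries', {'parameters': []},
#         {'similarity': 0.97}, [], {})
#     # -> 'Pick OTUs (similarity: 0.97) | Split libraries'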
2274
2275
def activate_or_update_plugins(update=False):
2276
    """Activates/updates the plugins
2277
2278
    Parameters
2279
    ----------
2280
    update : bool, optional
2281
        If True will update the plugins. Otherwise will activate them.
2282
        Default: False.
2283
    """
2284
    conf_files = sorted(glob(join(qiita_config.plugin_dir, "*.conf")))
2285
    label = "{} plugin (%s/{}): %s... ".format(
2286
        "Updating" if update else "\tLoading", len(conf_files))
2287
    for i, fp in enumerate(conf_files):
2288
        print(label % (i + 1, basename(fp)), end='')
2289
        s = qdb.software.Software.from_file(fp, update=update)
2290
        if not update:
2291
            s.activate()
2292
        print("Ok")
2293
2294
2295
def send_email(to, subject, body):
2296
    # create email
2297
    msg = MIMEMultipart()
2298
    msg['From'] = qiita_config.smtp_email
2299
    msg['To'] = to
2300
    # we need to do 'replace' because the subject can have
2301
    # new lines in the middle of the string
2302
    msg['Subject'] = subject.replace('\n', '')
2303
    msg.attach(MIMEText(body, 'plain'))
2304
2305
    # connect to smtp server, using ssl if needed
2306
    if qiita_config.smtp_ssl:
2307
        smtp = SMTP_SSL()
2308
    else:
2309
        smtp = SMTP()
2310
    smtp.set_debuglevel(False)
2311
    smtp.connect(qiita_config.smtp_host, qiita_config.smtp_port)
2312
    # try tls, if not available on server just ignore error
2313
    try:
2314
        smtp.starttls()
2315
    except SMTPException:
2316
        pass
2317
    smtp.ehlo_or_helo_if_needed()
2318
2319
    if qiita_config.smtp_user:
2320
        smtp.login(qiita_config.smtp_user, qiita_config.smtp_password)
2321
2322
    # send email
2323
    try:
2324
        smtp.sendmail(qiita_config.smtp_email, to, msg.as_string())
2325
    except Exception:
2326
        raise RuntimeError("Can't send email!")
2327
    finally:
2328
        smtp.close()
2329
2330
2331
def resource_allocation_plot(df, col_name):
2332
    """Builds resource allocation plot for given filename and jobs
2333
2334
    Parameters
2335
    ----------
2336
    df : pandas.DataFrame, required
2337
        Jobs data used to build the plot (one row per job)
2338
    col_name: str, required
2339
        Specifies x axis for the graph
2340
2341
    Returns
2342
    -------
2343
    tuple of (matplotlib.figure.Figure, array of matplotlib.axes.Axes)
2344
        The figure with the memory (MaxRSSRaw) and time (ElapsedRaw) subplots
2345
    """
2346
2347
    df.dropna(subset=['samples', 'columns'], inplace=True)
2348
    df[col_name] = df.samples * df['columns']
2349
    df[col_name] = df[col_name].astype(int)
2350
2351
    fig, axs = plt.subplots(ncols=2, figsize=(10, 4), sharey=False)
2352
2353
    ax = axs[0]
2354
    mem_models, time_models = retrieve_equations()
2355
2356
    # models for memory
2357
    _resource_allocation_plot_helper(
2358
        df, ax, "MaxRSSRaw",  mem_models, col_name)
2359
    ax = axs[1]
2360
    # models for time
2361
    _resource_allocation_plot_helper(
2362
        df, ax, "ElapsedRaw",  time_models, col_name)
2363
2364
    return fig, axs
2365
2366
2367
def retrieve_equations():
2368
    '''
2369
    Helper function for resource_allocation_plot.
2370
    Retrieves equations from db. Creates dictionary for memory and time models.
2371
2372
    Returns
2373
    -------
2374
    tuple
2375
        dict
2376
            memory models - potential memory models for resource allocations
2377
        dict
2378
            time models - potential time models for resource allocations
2379
    '''
2380
    memory_models = {}
2381
    time_models = {}
2382
    res = []
2383
    with qdb.sql_connection.TRN:
2384
        sql = ''' SELECT * FROM qiita.allocation_equations; '''
2385
        qdb.sql_connection.TRN.add(sql)
2386
        res = qdb.sql_connection.TRN.execute_fetchindex()
2387
    for models in res:
2388
        if 'mem' in models[1]:
2389
            memory_models[models[1]] = {
2390
                "equation_name": models[2],
2391
                "equation": lambda x, k, a, b: eval(models[2])
2392
            }
2393
        else:
2394
            time_models[models[1]] = {
2395
                "equation_name": models[2],
2396
                "equation": lambda x, k, a, b: eval(models[2])
2397
            }
2398
    return (memory_models, time_models)
2399
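# Illustrative sketch (row contents are hypothetical; the real equations
# live in qiita.allocation_equations): a row such as
# (1, 'mem_model1', 'k * np.log(x) + x * a + b') produces
#
#     memory_models['mem_model1']['equation'](x_data, k, a, b)
#
# which eval()s the stored string with x, k, a, b bound to the lambda's
# arguments and np available from this module's imports.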
2400
2401
def retrieve_resource_data(cname, sname, version, columns):
2402
    '''
2403
    Retrieves resource data from db and constructs a DataFrame with relevant
2404
    fields.
2405
2406
    Parameters
2407
    ----------
2408
    cname - command name for which we retrieve the resources
2409
    sname - software name for which we retrieve the resources
2410
    version - version of software for which we retrieve the resources
2411
    columns - column names for the DataFrame returned by this function
2412
2413
    Returns
2414
    -------
2415
    pd.DataFrame
2416
        DataFrame with resources.
2417
    '''
2418
    with qdb.sql_connection.TRN:
2419
        sql = """
2420
            SELECT
2421
                s.name AS sName,
2422
                s.version AS sVersion,
2423
                sc.command_id AS cID,
2424
                sc.name AS cName,
2425
                pr.processing_job_id AS processing_job_id,
2426
                pr.command_parameters AS parameters,
2427
                sra.samples AS samples,
2428
                sra.columns AS columns,
2429
                sra.input_size AS input_size,
2430
                sra.extra_info AS extra_info,
2431
                sra.memory_used AS memory_used,
2432
                sra.walltime_used AS walltime_used,
2433
                sra.job_start AS job_start,
2434
                sra.node_name AS node_name,
2435
                sra.node_model AS node_model
2436
            FROM
2437
                qiita.processing_job pr
2438
            JOIN
2439
                qiita.software_command sc ON pr.command_id = sc.command_id
2440
            JOIN
2441
                qiita.software s ON sc.software_id = s.software_id
2442
            JOIN
2443
                qiita.slurm_resource_allocations sra
2444
                ON pr.processing_job_id = sra.processing_job_id
2445
            WHERE
2446
                sc.name = %s
2447
                AND s.name = %s
2448
                AND s.version = %s
2449
            """
2450
        qdb.sql_connection.TRN.add(sql, sql_args=[cname, sname, version])
2451
        res = qdb.sql_connection.TRN.execute_fetchindex()
2452
        df = pd.DataFrame(res, columns=columns)
2453
        return df
2454
2455
2456
def _resource_allocation_plot_helper(
2457
        df, ax, curr, models, col_name):
2458
    """Helper function for resource allocation plot. Builds plot for MaxRSSRaw
2459
    and ElapsedRaw
2460
2461
    Parameters
2462
    ----------
2463
    df: pandas dataframe, required
2464
        Filtered dataframe for the plot
2465
    ax : matplotlib axes, required
2466
        Axes for current subplot
2467
    col_name: str, required
2472
        Specifies x axis for the graph
2473
    curr: str, required
2474
        Either MaxRSSRaw or ElapsedRaw (y axis)
2475
    models: dictionary, required. Follows this structure
2476
        equation_name: string
2477
            Human readable representation of the equation
2478
        equation: Python lambda function
2479
            Lambda function representing equation to optimize
2480
2481
    Returns
2482
    -------
2483
    best_model_name: string
2484
        the name of the best model from the table
2485
    best_model: function
2486
        best fitting function for the current dictionary models
2487
    options: object
2488
        object containing constants for the best model (e.g. k, a, b in kx+b*a)
2489
    """
2490
2491
    x_data, y_data = df[col_name], df[curr]
2492
    # ax.scatter(x_data, y_data, s=2, label="data")
2493
    d = dict()
2494
    for index, row in df.iterrows():
2495
        x_value = row[col_name]
2496
        y_value = row[curr]
2497
        if x_value not in d:
2498
            d[x_value] = []
2499
        d[x_value].append(y_value)
2500
2501
    for key in d.keys():
2502
        # keep only the top point per x value, increased by 5%, because the
2503
        # fitted curve needs to sit above all observed points
2504
        d[key] = [max(d[key]) * 1.05]
2505
2506
    x_data = []
2507
    y_data = []
2508
2509
    # Populate the lists with data from the dictionary
2510
    for x, ys in d.items():
2511
        for y in ys:
2512
            x_data.append(x)
2513
            y_data.append(y)
2514
2515
    x_data = np.array(x_data)
2516
    y_data = np.array(y_data)
2517
    ax.set_xscale('log')
2518
    ax.set_yscale('log')
2519
    ax.set_ylabel(curr)
2520
    ax.set_xlabel(col_name)
2521
2522
    # 50 - number of maximum iterations, 3 - number of failures we tolerate
2523
    best_model_name, best_model, options = _resource_allocation_calculate(
2524
        df, x_data, y_data, models, curr, col_name, 50, 3)
2525
    k, a, b = options.x
2526
    x_plot = np.array(sorted(df[col_name].unique()))
2527
    y_plot = best_model(x_plot, k, a, b)
2528
    ax.plot(x_plot, y_plot, linewidth=1, color='orange')
2529
2530
    cmin_value = min(y_plot)
2531
    cmax_value = max(y_plot)
2532
2533
    maxi = naturalsize(df[curr].max(), gnu=True) if curr == "MaxRSSRaw" else \
2534
        timedelta(seconds=float(df[curr].max()))
2535
    cmax = naturalsize(cmax_value, gnu=True) if curr == "MaxRSSRaw" else \
2536
        str(timedelta(seconds=round(cmax_value, 2))).rstrip('0').rstrip('.')
2537
2538
    mini = naturalsize(df[curr].min(), gnu=True) if curr == "MaxRSSRaw" else \
2539
        timedelta(seconds=float(df[curr].min()))
2540
    cmin = naturalsize(cmin_value, gnu=True) if curr == "MaxRSSRaw" else \
2541
        str(timedelta(seconds=round(cmin_value, 2))).rstrip('0').rstrip('.')
2542
2543
    x_plot = np.array(df[col_name])
2544
    success_df, failures_df = _resource_allocation_success_failures(
2545
        df, k, a, b, best_model, col_name, curr)
2546
    failures = failures_df.shape[0]
2547
    ax.scatter(failures_df[col_name], failures_df[curr], color='red', s=3,
2548
               label="failures")
2549
    success_df['node_name'] = success_df['node_name'].fillna('unknown')
2550
    slurm_hosts = set(success_df['node_name'].tolist())
2551
    cmap = colormaps.get_cmap('Accent')
2552
    if len(slurm_hosts) > len(cmap.colors):
2553
        raise ValueError(f"""'Accent' colormap only has {len(cmap.colors)}
2554
                     colors, but {len(slurm_hosts)} hosts are provided.""")
2555
    colors = cmap.colors[:len(slurm_hosts)]
2556
2557
    for i, host in enumerate(slurm_hosts):
2558
        host_df = success_df[success_df['node_name'] == host]
2559
        ax.scatter(host_df[col_name], host_df[curr], color=colors[i], s=3,
2560
                   label=host)
2561
    ax.set_title(
2562
                 f'k||a||b: {k}||{a}||{b}\n'
2563
                 f'model: {models[best_model_name]["equation_name"]}\n'
2564
                 f'real: {mini} || {maxi}\n'
2565
                 f'calculated: {cmin} || {cmax}\n'
2566
                 f'failures: {failures}')
2567
    ax.legend(loc='upper left')
2568
    return best_model_name, best_model, options
2569
2570
2571
def _resource_allocation_calculate(
2572
        df, x, y, models, type_, col_name, depth, tolerance):
2573
    """Helper function for resource allocation plot. Calculates best_model and
2574
    best_result given the models list and x,y data.
2575
2576
    Parameters
2577
    ----------
2578
    x: pandas.Series (pandas column), required
2579
        Represents x data for the function calculation
2580
    y: pandas.Series (pandas column), required
2581
        Represents y data for the function calculation
2582
    type_: str, required
2583
        current type (e.g. MaxRSSRaw)
2584
    col_name: str, required
2585
        Specifies x axis for the graph
2586
    models: dictionary, required. Follows this structure
2587
        equation_name: string
2588
            Human readable representation of the equation
2589
        equation: Python lambda function
2590
            Lambda function representing equation to optimize
2591
    depth: int, required
2592
        Maximum number of iterations in binary search
2593
    tolerance: int, required
2594
        Number of failures we tolerate before rejecting a model
2595
2596
    Returns
2597
    -------
2598
    best_model_name: string
2599
        the name of the best model from the table
2600
    best_model: function
2601
        best fitting function for the current dictionary models
2602
    best_result: object
2603
        object containing constants for the best model (e.g. k, a, b in kx+b*a)
2604
    """
2605
2606
    init = [1, 1, 1]
2607
    best_model_name = None
2608
    best_model = None
2609
    best_result = None
2610
    best_failures = np.inf
2611
    best_max = np.inf
2612
    for model_name, model in models.items():
2613
        model_equation = model['equation']
2614
        # start values for binary search, where sl is left, sr is right
2615
        # penalty weight must be positive & non-zero, hence, sl >= 1.
2616
        # the upper bound for error can be an arbitrary large number
2617
        sl = 1
2618
        sr = 100000
2619
        left = sl
2620
        right = sr
2621
        prev_failures = np.inf
2622
        min_max = np.inf
2623
        cnt = 0
2624
        res = [1, 1, 1]  # k, a, b
2625
2626
        # binary search where we find the minimum penalty weight given the
2627
        # scoring constraints defined in if/else statements.
2628
        while left < right and cnt < depth:
2629
            middle = (left + right) // 2
2630
            options = minimize(_resource_allocation_custom_loss, init,
2631
                               args=(x, y, model_equation, middle))
2632
            k, a, b = options.x
2633
            # important: here we take the 2nd (last) value of tuple since
2634
            # the helper function returns success, then failures.
2635
            failures_df = _resource_allocation_success_failures(
2636
                df, k, a, b, model_equation, col_name, type_)[-1]
2637
            y_plot = model_equation(x, k, a, b)
2638
            if not any(y_plot):
2639
                continue
2640
            cmax = max(y_plot)
2641
            cmin = min(y_plot)
2642
            failures = failures_df.shape[0]
2643
2644
            if failures < prev_failures:
2645
                prev_failures = failures
2646
                right = middle
2647
                min_max = cmax
2648
                res = options
2649
2650
            elif failures > prev_failures:
2651
                left = middle
2652
            else:
2653
                if cmin < 0:
2654
                    left = middle
2655
                elif cmax < min_max:
2656
                    min_max = cmax
2657
                    res = options
2658
                    right = middle
2659
                else:
2660
                    right = middle
2661
2662
            # proceed with binary search in a window 10k to the right
2663
            if left >= right and cnt < depth:
2664
                sl += 10000
2665
                sr += 10000
2666
                left = sl
2667
                right = sr
2668
2669
            cnt += 1
2670
2671
        # check whether we tolerate a couple failures
2672
        # this is helpful if the model that has e.g. 1 failure is a better fit
2673
        # overall based on maximum calculated value.
2674
        is_acceptable_based_on_failures = (
2675
            prev_failures <= tolerance or abs(
2676
                prev_failures - best_failures) < tolerance or
2677
            best_failures == np.inf)
2678
2679
        # case with fewer failures
2680
        if is_acceptable_based_on_failures:
2681
            if min_max <= best_max:
2682
                best_failures = prev_failures
2683
                best_max = min_max
2684
                best_model_name = model_name
2685
                best_model = model_equation
2686
                best_result = res
2687
    return best_model_name, best_model, best_result
2688
2689
2690
def _resource_allocation_custom_loss(params, x, y, model, p):
2691
    """Helper function for resource allocation plot. Calculates custom loss
2692
    for given model.
2693
2694
    Parameters
2695
    ----------
2696
    params: list, required
2697
        Initial list of integers for the given model
2698
    x: pandas.Series (pandas column), required
2699
        Represents x data for the function calculation
2700
    y: pandas.Series (pandas column), required
2701
        Represents y data for the function calculation
2702
    model: Python function
2703
        Lambda function representing current equation
2704
    p: int, required
2705
        Penalty weight for custom loss function
2706
2707
    Returns
2708
    -------
2709
    float
2710
        The mean of the weighted squared residuals
2711
    """
2712
    k, a, b = params
2713
2714
    residuals = y - model(x, k, a, b)
2715
    # Apply a heavier penalty to points below the curve
2716
    penalty = p
2717
    weighted_residuals = np.where(residuals > 0, penalty * residuals**2,
2718
                                  residuals**2)
2719
    return np.mean(weighted_residuals)
2720
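# Worked sketch of the asymmetric penalty (numbers are hypothetical): with
# model(x, k, a, b) = k * x and p = 5, the point above the curve (a job
# that would have been under-allocated) is weighted 5x:
#
#     model = lambda x, k, a, b: k * x
#     _resource_allocation_custom_loss(
#         [1, 1, 1], np.array([1, 2]), np.array([3, 1]), model, 5)
#     # residuals = [2, -1]; weighted = [5 * 4, 1]; mean = 10.5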
2721
2722
def _resource_allocation_success_failures(df, k, a, b, model, col_name, type_):
2723
    """Helper function for resource allocation plot. Creates a dataframe with
2724
    successes and failures given current model.
2725
2726
    Parameters
2727
    ----------
2728
    df: pandas.Dataframe, required
2729
        Represents dataframe containing current jobs data
2730
    k: int, required
2731
        k constant in a model
2732
    a: int, required
2733
        a constant in a model
2734
    b: int, required
2735
        b constant in a model
2736
    model: function, required
2737
        Current function
2738
    col_name: str, required
2739
        Specifies x axis for the graph
2740
    type_: str, required
2741
        Specifies for which type we're getting failures (e.g. MaxRSSRaw)
2742
2743
    Returns
2744
    -------
2745
    tuple with:
2746
        pandas.Dataframe
2747
            Dataframe containing successes for current type.
2748
        pandas.Dataframe
2749
            Dataframe containing failures for current type.
2750
    """
2751
2752
    x_plot = np.array(df[col_name])
2753
    df[f'c{type_}'] = model(x_plot, k, a, b)
2754
    success_df = df[df[type_] <= df[f'c{type_}']]
2755
    failures_df = df[df[type_] > df[f'c{type_}']]
2756
    return (success_df, failures_df)
2757
2758
2759
def MaxRSS_helper(x):
2760
    if x[-1] == 'K':
2761
        y = float(x[:-1]) * 1000
2762
    elif x[-1] == 'M':
2763
        y = float(x[:-1]) * 1000000
2764
    elif x[-1] == 'G':
2765
        y = float(x[:-1]) * 1000000000
2766
    else:
2767
        y = float(x)
2768
    return y
2769
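# Usage sketch: slurm suffixes are expanded with decimal (1000-based)
# multipliers as written above, and suffix-less values are returned as
# plain floats.
#
#     MaxRSS_helper('2500K')  # -> 2500000.0
#     MaxRSS_helper('1.5G')   # -> 1500000000.0
#     MaxRSS_helper('4096')   # -> 4096.0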
2770
2771
def update_resource_allocation_table(weeks=1, test=None):
2772
    # Thu, Apr 27, 2023: old allocations (from barnacle) were changed to a
2773
    # better allocation, so we default the start time to 2023-04-28 so that
2774
    # the latest allocations are used for the newest version
2775
    """
2776
        Updates the qiita.slurm_resource_allocations SQL table with jobs from
2777
        slurm. Retrieves the most recent job available in the table and
2778
        appends the new data.
2779
2780
        Parameters
2781
        ----------
2782
        weeks: integer, optional
2783
            Number of weeks for which we want to make a request from slurm.
2784
        test: pandas.DataFrame, optional
2785
            Represents dataframe containing slurm data from 2023-04-28. Used
2786
            for testing only.
2787
    """
2788
2789
    # retrieve the most recent timestamp
2790
    sql_timestamp = """
2791
            SELECT
2792
                pj.external_job_id,
2793
                sra.job_start
2794
            FROM
2795
                qiita.processing_job pj
2796
            JOIN
2797
                qiita.slurm_resource_allocations sra
2798
            ON
2799
                pj.processing_job_id = sra.processing_job_id
2800
            ORDER BY
2801
                sra.job_start DESC
2802
            LIMIT 1;
2803
        """
2804
2805
    dates = ['', '']
2806
2807
    slurm_external_id = 0
2808
    start_date = datetime.strptime('2023-04-28', '%Y-%m-%d')
2809
    with qdb.sql_connection.TRN:
2810
        sql = sql_timestamp
2811
        qdb.sql_connection.TRN.add(sql)
2812
        res = qdb.sql_connection.TRN.execute_fetchindex()
2813
        if res:
2814
            sei, sd = res[0]
2815
            if sei is not None:
2816
                slurm_external_id = sei
2817
            if sd is not None:
2818
                start_date = sd
2819
        dates = [start_date, start_date + timedelta(weeks=weeks)]

    sql_command = """
            SELECT
                pj.processing_job_id AS processing_job_id,
                pj.external_job_id AS external_job_id
            FROM
                qiita.software_command sc
            JOIN
                qiita.processing_job pj ON pj.command_id = sc.command_id
            JOIN
                qiita.processing_job_status pjs
                ON pj.processing_job_status_id = pjs.processing_job_status_id
            LEFT JOIN
                qiita.slurm_resource_allocations sra
                ON pj.processing_job_id = sra.processing_job_id
            WHERE
                pjs.processing_job_status = 'success'
            AND
                pj.external_job_id ~ '^[0-9]+$'
            AND
                CAST(pj.external_job_id AS INTEGER) > %s
            AND
                sra.processing_job_id IS NULL;
        """
    df = pd.DataFrame()
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add(sql_command, sql_args=[slurm_external_id])
        res = qdb.sql_connection.TRN.execute_fetchindex()
        df = pd.DataFrame(res, columns=['processing_job_id', 'external_id'])
        df['external_id'] = df['external_id'].astype(int)

    data = []
    sacct = [
        'sacct', '-p',
        '--format=JobID,ElapsedRaw,MaxRSS,Submit,Start,End,CPUTimeRAW,'
        'ReqMem,AllocCPUs,AveVMSize,MaxVMSizeNode', '--starttime',
        dates[0].strftime('%Y-%m-%d'), '--endtime',
        dates[1].strftime('%Y-%m-%d'), '--user', 'qiita', '--state', 'CD']
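
    # For reference, with an illustrative window of 2023-04-28..2023-05-05
    # the list above is the argv form of:
    #
    #   sacct -p --user qiita --state CD \
    #       --format=JobID,ElapsedRaw,MaxRSS,Submit,Start,End,CPUTimeRAW,ReqMem,AllocCPUs,AveVMSize,MaxVMSizeNode \
    #       --starttime 2023-04-28 --endtime 2023-05-05
    #
    # '-p' makes sacct emit '|'-delimited output (parsed by pd.read_csv
    # below) and '--state CD' limits the report to completed jobs.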

    if test is not None:
        slurm_data = test
    else:
        rvals = check_output(sacct).decode('ascii')
        slurm_data = pd.read_csv(StringIO(rvals), sep='|')

    # In slurm, each JobID is represented by 3 rows in the dataframe:
    # - external_id:        the overall container for the job and its
    #                       associated requests; when the Timelimit is hit,
    #                       this container takes care of completing/stopping
    #                       the external_id.batch job.
    # - external_id.batch:  the actual batch step; it reports how much
    #                       memory was used, how many cpus were allocated,
    #                       etc.
    # - external_id.extern: accounts for anything that happens outside the
    #                       processing itself but is still charged to the
    #                       container resources; for instance, if you ssh to
    #                       the node and run something additional, or a
    #                       prolog script runs, that processing is recorded
    #                       under external_id but separate from
    #                       external_id.batch.
    # Here we are going to merge all this info into a single row + some
    # other columns

    def merge_rows(rows):
        date_fmt = '%Y-%m-%dT%H:%M:%S'
        wait_time = (
            datetime.strptime(rows.iloc[0]['Start'], date_fmt) -
            datetime.strptime(rows.iloc[0]['Submit'], date_fmt))
        if rows.shape[0] >= 2:
            tmp = rows.iloc[1].copy()
        else:
            tmp = rows.iloc[0].copy()
        tmp['WaitTime'] = wait_time
        return tmp

    slurm_data['external_id'] = slurm_data['JobID'].apply(
                                            lambda x: int(x.split('.')[0]))
    slurm_data['external_id'] = slurm_data['external_id'].ffill()

    slurm_data = slurm_data.groupby(
            'external_id').apply(merge_rows).reset_index(drop=True)
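
    # Sketch of the merge (made-up values): sacct returns something like
    #
    #   JobID         MaxRSS  Submit               Start
    #   12345                 2024-01-01T10:00:00  2024-01-01T10:05:00
    #   12345.batch   2500K   ...                  ...
    #   12345.extern          ...                  ...
    #
    # and merge_rows collapses these three rows into the single .batch row
    # (the one carrying the memory usage), plus a WaitTime column computed
    # from the first row's Submit -> Start delta (5 minutes here).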

    # filter to only those jobs that are within the slurm_data df.
    eids = set(slurm_data['external_id'])
    df = df[df['external_id'].isin(eids)]

    for index, row in df.iterrows():
        job = qdb.processing_job.ProcessingJob(row['processing_job_id'])
        extra_info = ''
        eid = job.external_id

        cmd = job.command
        s = job.command.software
        try:
            samples, columns, input_size = job.shape
        except qdb.exceptions.QiitaDBUnknownIDError:
            # this will be raised if the study or the analysis has been
            # deleted; in other words, the processing_job was run but the
            # details about it were erased when the user deleted them -
            # however, we keep the job for the record
            continue
        except TypeError as e:
            # similar to the except above, except that for these 2 commands
            # the study_id is None
            if cmd.name in {'create_sample_template', 'delete_sample_template',
                            'list_remote_files'}:
                continue
            else:
                raise e
        sname = s.name

        if cmd.name == 'release_validators':
            ej = qdb.processing_job.ProcessingJob(job.parameters.values['job'])
            extra_info = ej.command.name
            samples, columns, input_size = ej.shape
        elif cmd.name == 'complete_job':
            artifacts = loads(job.parameters.values['payload'])['artifacts']
            if artifacts is not None:
                extra_info = ','.join({
                    x['artifact_type'] for x in artifacts.values()
                    if 'artifact_type' in x})
        elif cmd.name == 'Validate':
            input_size = sum([len(x) for x in loads(
                job.parameters.values['files']).values()])
            sname = f"{sname} - {job.parameters.values['artifact_type']}"
        elif cmd.name == 'Alpha rarefaction curves [alpha_rarefaction]':
            extra_info = job.parameters.values[
                ('The number of rarefaction depths to include between '
                 'min_depth and max_depth. (steps)')]
        curr = slurm_data[slurm_data['external_id'] == int(eid)].iloc[0]
        barnacle_info = curr['MaxVMSizeNode']
        if len(barnacle_info) == 0:
            barnacle_info = [None, None]
        else:
            barnacle_info = barnacle_info.split('-')
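        # e.g. (hypothetical value) a MaxVMSizeNode of 'brncl-33' splits
        # into node_name 'brncl' and node_model '33'; an empty value means
        # neither is known for this job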

        row_dict = {
            'processing_job_id': job.id,
            'samples': samples,
            'columns': columns,
            'input_size': input_size,
            'extra_info': extra_info,
            'ElapsedRaw': curr['ElapsedRaw'],
            'MaxRSS': curr['MaxRSS'],
            'Start': curr['Start'],
            'node_name': barnacle_info[0],
            'node_model': barnacle_info[1]
        }
        data.append(row_dict)
    df = pd.DataFrame(data)

    # This is important as we are transforming MaxRSS to its raw value,
    # so we need to confirm that no other suffixes exist
    print('Make sure that only 0/K/M/G exist', set(
        df.MaxRSS.apply(lambda x: str(x)[-1])))

    # Generating new columns
    df['MaxRSSRaw'] = df.MaxRSS.apply(lambda x: MaxRSS_helper(str(x)))
    df['ElapsedRawTime'] = df.ElapsedRaw.apply(
        lambda x: timedelta(seconds=float(x)))
    df.replace({np.nan: None}, inplace=True)

    for index, row in df.iterrows():
        with qdb.sql_connection.TRN:
            sql = """
                INSERT INTO qiita.slurm_resource_allocations (
                    processing_job_id,
                    samples,
                    columns,
                    input_size,
                    extra_info,
                    memory_used,
                    walltime_used,
                    job_start,
                    node_name,
                    node_model
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            to_insert = [
                row['processing_job_id'], row['samples'], row['columns'],
                row['input_size'], row['extra_info'], row['MaxRSSRaw'],
                row['ElapsedRaw'], row['Start'], row['node_name'],
                row['node_model']]
            qdb.sql_connection.TRN.add(sql, sql_args=to_insert)
            qdb.sql_connection.TRN.execute()


def merge_overlapping_strings(str1, str2):
    """Helper function to merge 2 overlapping strings

    Parameters
    ----------
    str1 : str
        Initial string
    str2 : str
        End string

    Returns
    -------
    str
        The merged string
    """
    overlap = ""
    for i in range(1, min(len(str1), len(str2)) + 1):
        if str1.endswith(str2[:i]):
            overlap = str2[:i]
    return str1 + str2[len(overlap):]
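
# A quick illustration (example values): the longest suffix of str1 that is
# also a prefix of str2 is collapsed once in the result.
#
#   merge_overlapping_strings('qiita_ut', 'ut_util')  # -> 'qiita_ut_util'
#   merge_overlapping_strings('abcdef', 'xyz')        # -> 'abcdefxyz'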