a b/qiita_db/test/test_util.py
1
# -----------------------------------------------------------------------------
2
# Copyright (c) 2014--, The Qiita Development Team.
3
#
4
# Distributed under the terms of the BSD 3-clause License.
5
#
6
# The full license is in the file LICENSE, distributed with this software.
7
# -----------------------------------------------------------------------------
8
9
from unittest import TestCase, main
10
from tempfile import mkstemp, mkdtemp, NamedTemporaryFile, TemporaryFile
11
from os import close, remove, mkdir
12
from os.path import join, exists, basename
13
from shutil import rmtree
14
from datetime import datetime
15
from functools import partial
16
from string import punctuation
17
import h5py
18
from six import StringIO, BytesIO
19
import pandas as pd
20
21
from qiita_core.util import qiita_test_checker
22
import qiita_db as qdb
23
24
from matplotlib.figure import Figure
25
from matplotlib.axes import Axes
26
import matplotlib.pyplot as plt
27
28
29
@qiita_test_checker()
class DBUtilTestsBase(TestCase):
    """Shared fixtures for the qiita_db.util test cases.

    Holds the name of a reference table, the list of columns the tests pass
    as "required" for it, and a list of paths deleted after each test.
    """

    def setUp(self):
        """Initialise the table name, column list and cleanup list."""
        self.files_to_remove = []
        self.table = 'study'
        self.required = [
            'study_title',
            'mixs_compliant',
            'metadata_complete',
            'study_description',
            'first_contact',
            'reprocess',
            'timeseries_type_id',
            'study_alias',
            'study_abstract',
            'principal_investigator_id',
            'email',
        ]

    def tearDown(self):
        """Delete every registered path that still exists on disk."""
        for path in filter(exists, self.files_to_remove):
            remove(path)
44
45
46
class DBUtilTests(DBUtilTestsBase):
47
    def test_max_preparation_samples(self):
        """Check that max_preparation_samples returns the expected 800"""
        self.assertEqual(qdb.util.max_preparation_samples(), 800)
51
52
    def test_max_artifacts_in_workflow(self):
        """Check that max_artifacts_in_workflow returns the expected 35"""
        self.assertEqual(qdb.util.max_artifacts_in_workflow(), 35)
56
57
    def test_filepath_id_to_object_id(self):
        """Check the object id returned for a selection of filepath ids"""
        # (filepath id, expected object id); None means no id is returned
        expectations = [
            (1, 1),       # filepaths 1 and 2 belong to artifact 1
            (2, 1),
            (3, 2),       # filepaths 3 and 4 belong to artifact 2
            (4, 2),
            (9, 4),       # filepath 9 belongs to artifact 4
            (16, 1),      # filepath 16 belongs to analysis 1
            (18, None),   # filepath 18 belongs to study 1
            (22, 7),      # filepath 22 belongs to analysis/artifact 7
        ]
        for filepath_id, expected in expectations:
            observed = qdb.util.filepath_id_to_object_id(filepath_id)
            if expected is None:
                self.assertIsNone(observed)
            else:
                self.assertEqual(observed, expected)
72
73
    def test_check_required_columns(self):
        """The fixture's column list must be accepted without raising"""
        # Success is silent: a problem would surface as an exception
        columns, table = self.required, self.table
        qdb.util.check_required_columns(columns, table)
76
77
    def test_check_required_columns_fail(self):
        """Dropping 'study_title' must raise QiitaDBColumnError"""
        incomplete = self.required
        incomplete.remove('study_title')
        self.assertRaises(
            qdb.exceptions.QiitaDBColumnError,
            qdb.util.check_required_columns, incomplete, self.table)
81
82
    def test_check_table_cols(self):
        """The fixture's column list must be accepted without raising"""
        # Success is silent: a problem would surface as an exception
        columns, table = self.required, self.table
        qdb.util.check_table_cols(columns, table)
85
86
    def test_check_table_cols_fail(self):
        """Adding an unknown column must raise QiitaDBColumnError"""
        with_unknown = self.required
        with_unknown.append('BADTHINGNOINHERE')
        self.assertRaises(
            qdb.exceptions.QiitaDBColumnError,
            qdb.util.check_table_cols, with_unknown, self.table)
90
91
    def test_get_table_cols(self):
        """Check the set of columns reported for the qiita_user table"""
        expected_columns = {
            "email",
            "user_level_id",
            "password",
            "name",
            "affiliation",
            "address",
            "phone",
            "user_verify_code",
            "pass_reset_code",
            "pass_reset_timestamp",
            "receive_processing_job_emails",
            "social_orcid",
            "social_researchgate",
            "social_googlescholar",
            "creation_timestamp",
        }
        # order is irrelevant, only membership is compared
        observed_columns = set(qdb.util.get_table_cols("qiita_user"))
        self.assertEqual(observed_columns, expected_columns)
99
100
    def test_exists_table(self):
        """Check exists_table for tables that exist and tables that do not"""
        present = ("filepath", "qiita_user", "analysis", "prep_1", "sample_1")
        missing = ("sample_2", "prep_3", "foo_table", "bar_table")

        for table_name in present:
            self.assertTrue(qdb.util.exists_table(table_name))
        for table_name in missing:
            self.assertFalse(qdb.util.exists_table(table_name))
113
114
    def test_convert_to_id(self):
        """Check the ids returned by convert_to_id for known values"""
        # (positional arguments for convert_to_id, expected id)
        cases = [
            (("directory", "filepath_type"), 8),
            (("private", "visibility", "visibility"), 3),
            (("EMP", "portal_type", "portal"), 2),
        ]
        for args, expected_id in cases:
            self.assertEqual(qdb.util.convert_to_id(*args), expected_id)
122
123
    def test_convert_to_id_bad_value(self):
        """Check that an unknown value raises QiitaDBLookupError"""
        self.assertRaises(
            qdb.exceptions.QiitaDBLookupError,
            qdb.util.convert_to_id, "FAKE", "filepath_type")
127
128
    def test_get_artifact_types(self):
        """Check the artifact type mapping in both key orientations"""
        # listed in id order: the first name has id 1, the last has id 10
        type_names = [
            'SFF', 'FASTA_Sanger', 'FASTQ', 'FASTA', 'per_sample_FASTQ',
            'Demultiplexed', 'BIOM', 'beta_div_plots', 'rarefaction_curves',
            'taxa_summary']

        name_to_id = {
            name: type_id for type_id, name in enumerate(type_names, start=1)}
        self.assertEqual(qdb.util.get_artifact_types(), name_to_id)

        id_to_name = {type_id: name for name, type_id in name_to_id.items()}
        self.assertEqual(
            qdb.util.get_artifact_types(key_by_id=True), id_to_name)
139
140
    def test_get_filepath_types(self):
        """Tests that get_filepath_types works with valid arguments

        The expected mapping is read straight from qiita.filepath_type, so
        no hard-coded copy of the table is kept here (the previous literal
        was overwritten before ever being compared).
        """
        obs = qdb.util.get_filepath_types()
        with qdb.sql_connection.TRN:
            qdb.sql_connection.TRN.add("SELECT filepath_type,filepath_type_id "
                                       "FROM qiita.filepath_type")
            # rows are (filepath_type, filepath_type_id) pairs
            exp = dict(qdb.sql_connection.TRN.execute_fetchindex())
        self.assertEqual(obs, exp)

        # same content, keyed by id instead of by type name
        obs = qdb.util.get_filepath_types(key='filepath_type_id')
        exp = {v: k for k, v in exp.items()}
        self.assertEqual(obs, exp)
160
161
    def test_get_filepath_types_fail(self):
        """Tests that get_filepath_types fails with an invalid key"""
        self.assertRaises(
            qdb.exceptions.QiitaDBColumnError,
            qdb.util.get_filepath_types, key='invalid')
165
166
    def test_get_data_types(self):
        """Check the data type mapping in both key orientations"""
        # listed in id order: ids run from 1 to 12
        type_names = (
            '16S', '18S', 'ITS', 'Proteomic', 'Metabolomic', 'Metagenomic',
            'Multiomic', 'Metatranscriptomics', 'Viromics', 'Genomics',
            'Transcriptomics', 'Job Output Folder')
        type_ids = range(1, len(type_names) + 1)

        by_name = dict(zip(type_names, type_ids))
        self.assertEqual(qdb.util.get_data_types(), by_name)

        by_id = dict(zip(type_ids, type_names))
        self.assertEqual(qdb.util.get_data_types(key='data_type_id'), by_id)
178
179
    def test_create_rand_string(self):
        """Check length and punctuation content of the random strings"""
        punct_chars = set(punctuation)

        # default call: the string is expected to contain punctuation
        with_punct = qdb.util.create_rand_string(200)
        self.assertEqual(len(with_punct), 200)
        self.assertTrue(punct_chars & set(with_punct))

        # punct=False: no punctuation character may appear
        without_punct = qdb.util.create_rand_string(400, punct=False)
        self.assertEqual(len(without_punct), 400)
        self.assertFalse(punct_chars & set(without_punct))
189
190
    def test_get_count(self):
        """Check the row count reported for qiita.study_person"""
        row_count = qdb.util.get_count('qiita.study_person')
        self.assertEqual(row_count, 3)
193
194
    def test_check_count(self):
        """Check check_count with a matching and a non-matching count"""
        table = 'qiita.study_person'
        matching = qdb.util.check_count(table, 3)
        self.assertTrue(matching)
        non_matching = qdb.util.check_count(table, 2)
        self.assertFalse(non_matching)
198
199
    def test_insert_filepaths(self):
        """Insert one file with default options; check disk and DB state"""
        handle, src_fp = mkstemp()
        close(handle)
        with open(src_fp, "w") as out:
            out.write("\n")
        self.files_to_remove.append(src_fp)

        # the new row is expected to get the next value of the id sequence
        with qdb.sql_connection.TRN:
            qdb.sql_connection.TRN.add(
                "SELECT last_value FROM qiita.filepath_filepath_id_seq")
            last_id = qdb.sql_connection.TRN.execute_fetchflatten()[0]
        new_id = 1 + last_id
        inserted_ids = qdb.util.insert_filepaths(
            [(src_fp, 1)], 2, "raw_data")
        self.assertEqual(inserted_ids, [new_id])

        # the file must now be under <base dir>/raw_data with a "2_" prefix
        # and must no longer exist at its original location
        stored_name = "2_%s" % basename(src_fp)
        stored_fp = join(
            qdb.util.get_db_files_base_dir(), "raw_data", stored_name)
        self.assertTrue(exists(stored_fp))
        self.assertFalse(exists(src_fp))
        self.files_to_remove.append(stored_fp)

        # the matching row must have been added to qiita.filepath
        with qdb.sql_connection.TRN:
            qdb.sql_connection.TRN.add("SELECT * FROM qiita.filepath "
                                       "WHERE filepath_id=%d" % new_id)
            rows = qdb.sql_connection.TRN.execute_fetchindex()
        self.assertEqual(
            rows, [[new_id, stored_name, 1, '852952723', 1, 5, 1]])

        qdb.util.purge_filepaths()
230
231
    def test_insert_filepaths_copy(self):
        """Insert a file with copy=True; the source file must be kept

        Run twice, first with move_files=False and then with
        move_files=True. Both times the stored file and the original file
        must exist afterwards.
        """
        handle, src_fp = mkstemp()
        close(handle)
        with open(src_fp, "w") as out:
            out.write("\n")
        self.files_to_remove.append(src_fp)

        stored_name = "2_%s" % basename(src_fp)

        # filepath ids are bigserials, so each insertion takes the next
        # value of the sequence
        with qdb.sql_connection.TRN:
            qdb.sql_connection.TRN.add(
                "SELECT last_value FROM qiita.filepath_filepath_id_seq")
            last_id = qdb.sql_connection.TRN.execute_fetchflatten()[0]
        new_id = 1 + last_id
        inserted_ids = qdb.util.insert_filepaths(
            [(src_fp, 1)], 2, "raw_data", move_files=False, copy=True)
        self.assertEqual(inserted_ids, [new_id])

        # both the stored copy and the source must exist
        stored_fp = join(
            qdb.util.get_db_files_base_dir(), "raw_data", stored_name)
        self.assertTrue(exists(stored_fp))
        self.assertTrue(exists(src_fp))
        self.files_to_remove.append(stored_fp)

        # the matching row must have been added to qiita.filepath
        with qdb.sql_connection.TRN:
            qdb.sql_connection.TRN.add("SELECT * FROM qiita.filepath "
                                       "WHERE filepath_id=%d" % new_id)
            rows = qdb.sql_connection.TRN.execute_fetchindex()
        self.assertEqual(
            rows, [[new_id, stored_name, 1, '852952723', 1, 5, 1]])

        # second round, this time with move_files=True
        new_id += 1
        inserted_ids = qdb.util.insert_filepaths(
            [(src_fp, 1)], 2, "raw_data", move_files=True, copy=True)
        self.assertEqual(inserted_ids, [new_id])

        # again both the stored copy and the source must exist
        stored_fp = join(
            qdb.util.get_db_files_base_dir(), "raw_data", stored_name)
        self.assertTrue(exists(stored_fp))
        self.assertTrue(exists(src_fp))
        self.files_to_remove.append(stored_fp)

        qdb.util.purge_filepaths()
278
279
    def test_insert_filepaths_string(self):
        """Insert a file whose filepath type is given by name, not by id"""
        handle, src_fp = mkstemp()
        close(handle)
        with open(src_fp, "w") as out:
            out.write("\n")
        self.files_to_remove.append(src_fp)

        # the new row is expected to get the next value of the id sequence
        with qdb.sql_connection.TRN:
            qdb.sql_connection.TRN.add(
                "SELECT last_value FROM qiita.filepath_filepath_id_seq")
            last_id = qdb.sql_connection.TRN.execute_fetchflatten()[0]
        new_id = 1 + last_id
        inserted_ids = qdb.util.insert_filepaths(
            [(src_fp, "raw_forward_seqs")], 2, "raw_data")
        self.assertEqual(inserted_ids, [new_id])

        # the file must now be under <base dir>/raw_data with a "2_" prefix
        stored_name = "2_%s" % basename(src_fp)
        stored_fp = join(
            qdb.util.get_db_files_base_dir(), "raw_data", stored_name)
        self.assertTrue(exists(stored_fp))
        self.files_to_remove.append(stored_fp)

        # the matching row must have been added to qiita.filepath
        with qdb.sql_connection.TRN:
            qdb.sql_connection.TRN.add("SELECT * FROM qiita.filepath "
                                       "WHERE filepath_id=%d" % new_id)
            rows = qdb.sql_connection.TRN.execute_fetchindex()
        self.assertEqual(
            rows, [[new_id, stored_name, 1, '852952723', 1, 5, 1]])

        qdb.util.purge_filepaths()
310
311
    def test_retrieve_filepaths(self):
        """Check the filepaths retrieved for artifact 1 (default order)"""
        obs = qdb.util.retrieve_filepaths(
            'artifact_filepath', 'artifact_id', 1)

        raw_data_dir = join(qdb.util.get_db_files_base_dir(), "raw_data")
        forward_seqs = {
            'fp_id': 1,
            'fp': join(raw_data_dir, "1_s_G1_L001_sequences.fastq.gz"),
            'fp_type': "raw_forward_seqs",
            'checksum': '2125826711',
            'fp_size': 58,
        }
        barcodes = {
            'fp_id': 2,
            'fp': join(raw_data_dir,
                       "1_s_G1_L001_sequences_barcodes.fastq.gz"),
            'fp_type': "raw_barcodes",
            'checksum': '2125826711',
            'fp_size': 58,
        }
        self.assertEqual(obs, [forward_seqs, barcodes])
327
328
    def test_retrieve_filepaths_sort(self):
        """Check the filepaths of artifact 1 with sort='descending'"""
        obs = qdb.util.retrieve_filepaths(
            'artifact_filepath', 'artifact_id', 1, sort='descending')

        raw_data_dir = join(qdb.util.get_db_files_base_dir(), "raw_data")
        barcodes = {
            'fp_id': 2,
            'fp': join(raw_data_dir,
                       "1_s_G1_L001_sequences_barcodes.fastq.gz"),
            'fp_type': "raw_barcodes",
            'checksum': '2125826711',
            'fp_size': 58,
        }
        forward_seqs = {
            'fp_id': 1,
            'fp': join(raw_data_dir, "1_s_G1_L001_sequences.fastq.gz"),
            'fp_type': "raw_forward_seqs",
            'checksum': '2125826711',
            'fp_size': 58,
        }
        # descending order: filepath 2 comes before filepath 1
        self.assertEqual(obs, [barcodes, forward_seqs])
344
345
    def test_retrieve_filepaths_type(self):
        """Check retrieve_filepaths when restricted to one filepath type

        For artifact 1 the 'raw_barcodes' filter must return only filepath
        2, with or without an explicit sort, and the 'biom' filter must
        return an empty list.
        """
        path_builder = partial(
            join, qdb.util.get_db_files_base_dir(), "raw_data")
        # the only raw_barcodes entry expected for artifact 1; built once
        # and reused by both filtered queries
        exp = [{'fp_id': 2,
                'fp': path_builder("1_s_G1_L001_sequences_barcodes.fastq.gz"),
                'fp_type': "raw_barcodes",
                'checksum': '2125826711',
                'fp_size': 58}]

        # type filter combined with an explicit sort order
        obs = qdb.util.retrieve_filepaths(
            'artifact_filepath', 'artifact_id', 1, sort='descending',
            fp_type='raw_barcodes')
        self.assertEqual(obs, exp)

        # type filter with the default sort
        obs = qdb.util.retrieve_filepaths(
            'artifact_filepath', 'artifact_id', 1, fp_type='raw_barcodes')
        self.assertEqual(obs, exp)

        # a filepath type with no matching file yields an empty list
        obs = qdb.util.retrieve_filepaths(
            'artifact_filepath', 'artifact_id', 1, fp_type='biom')
        self.assertEqual(obs, [])
374
375
    def test_retrieve_filepaths_error(self):
        """Check that an unknown sort value raises QiitaDBError"""
        self.assertRaises(
            qdb.exceptions.QiitaDBError,
            qdb.util.retrieve_filepaths,
            'artifact_filepath', 'artifact_id', 1, sort='Unknown')
379
380
    def test_empty_trash_upload_folder(self):
        """Check that empty_trash_upload_folder deletes a file in the trash

        A file is created in the ``trash`` folder under the ``1``
        subfolder of the uploads mountpoint and must be gone after the
        call.
        """
        # creating file to delete so we know it actually works
        study_id = '1'
        uploads_fp = join(qdb.util.get_mountpoint("uploads")[0][1], study_id)
        trash = join(uploads_fp, 'trash')
        if not exists(trash):
            mkdir(trash)
        fp = join(trash, 'my_file_to_delete.txt')
        with open(fp, 'w'):
            pass
        # registered for tearDown so the file does not stay behind if the
        # call or the assertions below fail
        self.files_to_remove.append(fp)

        self.assertTrue(exists(fp))
        qdb.util.empty_trash_upload_folder()
        self.assertFalse(exists(fp))
393
394
    def test_move_filepaths_to_upload_folder(self):
        """Test move_filepaths_to_upload_folder through artifact deletion

        The function is exercised indirectly: FASTQ artifacts made of a
        fastq file plus an ``html_summary`` directory are created on new
        prep templates and then deleted. After each deletion only the
        fastq file may show up as a new entry in the uploads listing of
        study 1. This is done for two artifacts, and then once more with
        the same file names as the first one, to check that overlapping
        names do not conflict.
        """
        study_id = 1

        def _new_fileset():
            """Create a temp fastq file and a support_files dir holding an
            index.html; return (fastq path, support_files path)"""
            fd, seqs_fp = mkstemp(suffix='_seqs.fastq')
            close(fd)
            # harmless once the file has been moved away: tearDown only
            # removes paths that still exist
            self.files_to_remove.append(seqs_fp)

            tmp_dir = mkdtemp()
            # only the support_files subfolder is handed to the artifact,
            # so the temporary parent has to be removed explicitly
            self.addCleanup(rmtree, tmp_dir, ignore_errors=True)
            html_fp = join(tmp_dir, 'support_files')
            mkdir(html_fp)
            with open(join(html_fp, 'index.html'), 'w') as fp:
                fp.write(">AAA\nAAA")
            return seqs_fp, html_fp

        # creating the 2 sets of files for the 2 artifacts
        seqs_fp1, html_fp1 = _new_fileset()
        seqs_fp2, html_fp2 = _new_fileset()

        # creating new prep info file
        metadata_dict = {
            'SKB8.640193': {'center_name': 'ANL',
                            'primer': 'GTGCCAGCMGCCGCGGTAA',
                            'barcode': 'GTCCGCAAGTTA',
                            'run_prefix': "s_G1_L001_sequences",
                            'platform': 'Illumina',
                            'instrument_model': 'Illumina MiSeq',
                            'library_construction_protocol': 'AAAA',
                            'experiment_design_description': 'BBBB'}}
        metadata = pd.DataFrame.from_dict(
            metadata_dict, orient='index', dtype=str)
        pt1 = qdb.metadata_template.prep_template.PrepTemplate.create(
            metadata, qdb.study.Study(study_id), "16S")
        pt2 = qdb.metadata_template.prep_template.PrepTemplate.create(
            metadata, qdb.study.Study(study_id), "16S")

        # inserting artifact 1
        artifact1 = qdb.artifact.Artifact.create(
            [(seqs_fp1, 1), (html_fp1, 'html_summary')], "FASTQ",
            prep_template=pt1)
        filepaths = artifact1.filepaths
        # inserting artifact 2
        artifact2 = qdb.artifact.Artifact.create(
            [(seqs_fp2, 1), (html_fp2, 'html_summary')], "FASTQ",
            prep_template=pt2)
        filepaths.extend(artifact2.filepaths)

        # get before delete files in upload folders
        GUPLOADS = qdb.util.get_files_from_uploads_folders
        upload_files = set(GUPLOADS("1"))

        # delete artifact 1
        qdb.artifact.Artifact.delete(artifact1.id)

        # confirm that _only_ the fastq from the file is recovered; this means
        # that all the extra files/folders were ignored
        diff_upload = set(GUPLOADS("1")) - upload_files
        self.assertEqual(len(diff_upload), 1)
        self.assertEqual(diff_upload.pop()[1], basename(seqs_fp1))

        # finish deleting artifacts :: there should be a new fastq
        qdb.artifact.Artifact.delete(artifact2.id)
        diff_upload = set(GUPLOADS("1")) - upload_files
        self.assertEqual(len(diff_upload), 2)
        self.assertCountEqual(
            [x[1] for x in diff_upload],
            [basename(seqs_fp1), basename(seqs_fp2)])

        # now let's create another artifact with the same filenames as
        # artifact1 so we can test successful overlapping of names; the
        # original fastq and support_files folder are recreated first
        with open(seqs_fp1, 'w') as fp:
            fp.write(">AAA\nAAA")
        mkdir(html_fp1)
        with open(join(html_fp1, 'index.html'), 'w') as fp:
            fp.write(">AAA\nAAA")
        artifact3 = qdb.artifact.Artifact.create(
            [(seqs_fp1, 1), (html_fp1, 'html_summary')], "FASTQ",
            prep_template=pt1)
        filepaths.extend(artifact3.filepaths)
        qdb.artifact.Artifact.delete(artifact3.id)

        # files should be the same as the previous test
        diff_upload = set(GUPLOADS("1")) - upload_files
        self.assertEqual(len(diff_upload), 2)
        self.assertCountEqual(
            [x[1] for x in diff_upload],
            [basename(seqs_fp1), basename(seqs_fp2)])

        # register whatever ended up in the uploads folder of study 1 so
        # tearDown deletes it
        bd = qdb.util.get_mountpoint("uploads")[0][1]
        for x in filepaths:
            self.files_to_remove.append(join(bd, "1", basename(x['fp'])))
491
492
    def test_get_mountpoint(self):
        """Check get_mountpoint before and after adding data directories

        Data directory 1 is deactivated and two rows are inserted: an
        active 'analysis' one and an inactive 'raw_data' one. Only the
        'analysis' lookup may change; the inactive row must show up only
        when retrieve_all=True is passed.
        """
        def _path(mountpoint):
            return join(qdb.util.get_db_files_base_dir(), mountpoint)

        get_mountpoint = qdb.util.get_mountpoint

        # initial state
        exp = [(5, _path('raw_data'))]
        self.assertEqual(get_mountpoint("raw_data"), exp)
        exp = [(1, _path('analysis'))]
        self.assertEqual(get_mountpoint("analysis"), exp)
        exp = [(2, _path('job'))]
        self.assertEqual(get_mountpoint("job"), exp)

        # inserting new ones so we can test that it retrieves these and
        # doesn't alter other ones
        qdb.sql_connection.perform_as_transaction(
            "UPDATE qiita.data_directory SET active=false WHERE "
            "data_directory_id=1")
        count = qdb.util.get_count('qiita.data_directory')
        sql = """INSERT INTO qiita.data_directory (data_type, mountpoint,
                                                   subdirectory, active)
                 VALUES ('analysis', 'analysis_tmp', true, true),
                        ('raw_data', 'raw_data_tmp', true, false)"""
        qdb.sql_connection.perform_as_transaction(sql)
        # ids expected for the two inserted rows, in insertion order
        new_analysis_id, new_raw_data_id = count + 1, count + 2

        # 'analysis' must now point to the inserted directory
        exp = [(new_analysis_id, _path('analysis_tmp'))]
        self.assertEqual(get_mountpoint("analysis"), exp)

        # 'raw_data' and 'job' must be unchanged
        exp = [(5, _path('raw_data'))]
        self.assertEqual(get_mountpoint("raw_data"), exp)
        exp = [(2, _path('job'))]
        self.assertEqual(get_mountpoint("job"), exp)

        # retrieve_all=True also returns the inactive 'raw_data' row
        exp = [(5, _path('raw_data')),
               (new_raw_data_id, _path('raw_data_tmp'))]
        self.assertEqual(get_mountpoint("raw_data", retrieve_all=True), exp)

        # retrieve_subdir=True appends the subdirectory flag to each entry
        exp = [(5, _path('raw_data'), False),
               (new_raw_data_id, _path('raw_data_tmp'), True)]
        obs = get_mountpoint(
            "raw_data", retrieve_all=True, retrieve_subdir=True)
        self.assertEqual(obs, exp)
547
548
    def test_get_mountpoint_path_by_id(self):
        """Check get_mountpoint_path_by_id before and after new inserts

        Data directory 1 is deactivated and two rows are inserted; the
        first inserted row must resolve to its own path and the paths of
        ids 5 and 2 must stay the same.
        """
        def _path(mountpoint):
            return join(qdb.util.get_db_files_base_dir(), mountpoint)

        path_by_id = qdb.util.get_mountpoint_path_by_id

        # initial state
        for dd_id, mountpoint in ((5, 'raw_data'), (1, 'analysis'),
                                  (2, 'job')):
            exp = _path(mountpoint)
            self.assertEqual(path_by_id(dd_id), exp)

        # inserting new ones so we can test that it retrieves these and
        # doesn't alter other ones
        qdb.sql_connection.perform_as_transaction(
            "UPDATE qiita.data_directory SET active=false WHERE "
            "data_directory_id=1")
        count = qdb.util.get_count('qiita.data_directory')
        sql = """INSERT INTO qiita.data_directory (data_type, mountpoint,
                                                   subdirectory, active)
                 VALUES ('analysis', 'analysis_tmp', true, true),
                        ('raw_data', 'raw_data_tmp', true, false)"""
        qdb.sql_connection.perform_as_transaction(sql)

        # the first inserted row resolves to the new analysis path
        exp = _path('analysis_tmp')
        self.assertEqual(path_by_id(count + 1), exp)

        # ids 5 and 2 must be unaffected
        for dd_id, mountpoint in ((5, 'raw_data'), (2, 'job')):
            exp = _path(mountpoint)
            self.assertEqual(path_by_id(dd_id), exp)
586
587
    def test_get_files_from_uploads_folders(self):
        """Check the uploads listing for a study with and without files"""
        # study "1": the uploaded file must be part of the listing
        listing = qdb.util.get_files_from_uploads_folders("1")
        self.assertIn((7, 'uploaded_file.txt', '0B'), listing)

        # study "2": nothing has been uploaded
        listing = qdb.util.get_files_from_uploads_folders("2")
        self.assertEqual(listing, [])
598
599
    def test_move_upload_files_to_trash(self):
        """Check that a trashed upload disappears from the uploads listing

        A file is created in the ``1`` subfolder of the uploads
        mountpoint, moved to the trash and must then be missing from
        get_files_from_uploads_folders("1"). Moving a file that no longer
        exists must not raise, while an unknown study (100) or an unknown
        mountpoint id (10) must raise QiitaDBError.
        """
        test_filename = 'this_is_a_test_file.txt'

        # create file to move to trash
        fid, folder = qdb.util.get_mountpoint("uploads")[0]
        test_fp = join(folder, '1', test_filename)
        with open(test_fp, 'w') as f:
            f.write('test')

        self.files_to_remove.append(test_fp)

        exp = (fid, test_filename, '4B')
        obs = qdb.util.get_files_from_uploads_folders("1")
        self.assertIn(exp, obs)

        # move file
        qdb.util.move_upload_files_to_trash(1, [(fid, test_filename)])
        obs = qdb.util.get_files_from_uploads_folders("1")
        # assertNotIn(member, container): the entry must be absent from the
        # listing. With the arguments the other way round the check looks
        # for a list inside a tuple and can never fail.
        self.assertNotIn(exp, obs)

        # if the file doesn't exist, don't raise any errors
        qdb.util.move_upload_files_to_trash(1, [(fid, test_filename)])

        # testing errors
        # - study doesn't exist
        with self.assertRaises(qdb.exceptions.QiitaDBError):
            qdb.util.move_upload_files_to_trash(100, [(fid, test_filename)])
        # - fid doesn't exist
        with self.assertRaises(qdb.exceptions.QiitaDBError):
            qdb.util.move_upload_files_to_trash(1, [(10, test_filename)])

        # removing trash folder
        rmtree(join(folder, '1', 'trash'))
632
633
    def test_get_environmental_packages(self):
        """Check the list of environmental packages, ignoring order"""
        # (first element, second element) of every expected entry
        package_pairs = (
            ('air', 'ep_air'),
            ('built environment', 'ep_built_environment'),
            ('host-associated', 'ep_host_associated'),
            ('human-amniotic-fluid', 'ep_human_amniotic_fluid'),
            ('human-associated', 'ep_human_associated'),
            ('human-blood', 'ep_human_blood'),
            ('human-gut', 'ep_human_gut'),
            ('human-oral', 'ep_human_oral'),
            ('human-skin', 'ep_human_skin'),
            ('human-urine', 'ep_human_urine'),
            ('human-vaginal', 'ep_human_vaginal'),
            ('microbial mat/biofilm', 'ep_microbial_mat_biofilm'),
            ('miscellaneous natural or artificial environment',
             'ep_misc_artif'),
            ('plant-associated', 'ep_plant_associated'),
            ('sediment', 'ep_sediment'),
            ('soil', 'ep_soil'),
            ('wastewater/sludge', 'ep_wastewater_sludge'),
            ('water', 'ep_water'),
        )
        # entries are compared as two-element lists
        exp = [[name, table] for name, table in package_pairs]
        obs = qdb.util.get_environmental_packages()
        self.assertEqual(sorted(obs), sorted(exp))
655
656
    def test_get_timeseries_types(self):
        """Check the full list of timeseries types, in order"""
        kinds = ('real', 'pseudo', 'mixed')
        interventions = ('single intervention', 'multiple intervention',
                         'combo intervention')

        # id 1 is the 'None' entry; ids 2-10 enumerate every
        # (kind, intervention) combination, kinds varying slowest
        exp = [[1, 'None', 'None']]
        combinations = [(kind, intervention)
                        for kind in kinds
                        for intervention in interventions]
        for type_id, (kind, intervention) in enumerate(combinations, start=2):
            exp.append([type_id, kind, intervention])

        obs = qdb.util.get_timeseries_types()
        self.assertEqual(obs, exp)
669
670
    def test_get_filepath_information(self):
        """Check the information returned for filepath 1"""
        info = qdb.util.get_filepath_information(1)

        # 'fullpath' is machine specific, so it is only checked to be set
        # and is removed before the exact comparison
        fullpath = info.pop('fullpath')
        self.assertIsNotNone(fullpath)

        self.assertEqual(info, {
            'filepath_id': 1,
            'filepath': '1_s_G1_L001_sequences.fastq.gz',
            'filepath_type': 'raw_forward_seqs',
            'checksum': '2125826711',
            'data_type': 'raw_data',
            'mountpoint': 'raw_data',
            'subdirectory': False,
            'active': True,
        })
679
680
    def test_filepath_id_to_rel_path(self):
        """Check the relative path returned for existing and new filepaths"""
        # filepaths already in the test database
        known = [(1, 'raw_data/1_s_G1_L001_sequences.fastq.gz'),
                 (3, 'preprocessed_data/1_seqs.fna')]
        for filepath_id, rel_path in known:
            self.assertEqual(
                qdb.util.filepath_id_to_rel_path(filepath_id), rel_path)

        # insert a new file under "FASTQ" and link it to artifact 2
        handle, src_fp = mkstemp()
        close(handle)
        with open(src_fp, 'w') as out:
            out.write('\n')
        self.files_to_remove.append(src_fp)
        new_fp_id = qdb.util.insert_filepaths(
            [(src_fp, "raw_forward_seqs")], 2, "FASTQ")[0]
        sql = """INSERT INTO qiita.artifact_filepath
                        (artifact_id, filepath_id)
                    VALUES (%s, %s)"""
        qdb.sql_connection.perform_as_transaction(sql, [2, new_fp_id])

        # the expected relative path contains the artifact id as a folder
        self.assertEqual(
            qdb.util.filepath_id_to_rel_path(new_fp_id),
            'FASTQ/2/%s' % basename(src_fp))
704
705
    def test_filepath_ids_to_rel_paths(self):
        """Multiple filepath ids map to their relative paths"""
        # create a new file, insert it and link it to artifact 2
        handle, tmp_fp = mkstemp()
        close(handle)
        with open(tmp_fp, 'w') as out:
            out.write('\n')
        self.files_to_remove.append(tmp_fp)
        inserted = qdb.util.insert_filepaths(
            [(tmp_fp, "raw_forward_seqs")], 2, "FASTQ")
        new_id = inserted[0]
        link_sql = """INSERT INTO qiita.artifact_filepath
                        (artifact_id, filepath_id)
                    VALUES (%s, %s)"""
        qdb.sql_connection.perform_as_transaction(link_sql, [2, new_id])

        observed = qdb.util.filepath_ids_to_rel_paths([1, 3, new_id])
        expected = {
            1: 'raw_data/1_s_G1_L001_sequences.fastq.gz',
            3: 'preprocessed_data/1_seqs.fna',
            new_id: 'FASTQ/2/%s' % basename(tmp_fp)}
        self.assertEqual(observed, expected)
724
725
    def test_add_message(self):
        """A message added for a user is listed in that user's messages"""
        # the test expects the new message id to be the current count + 1
        next_id = qdb.util.get_count('qiita.message') + 1
        recipient = qdb.user.User.create('new@test.bar', 'password')
        qdb.util.add_message("TEST MESSAGE", [recipient])

        # only the first two fields of each message are compared
        observed = [[msg[0], msg[1]] for msg in recipient.messages()]
        self.assertEqual(observed, [[next_id, 'TEST MESSAGE']])
734
735
    def test_add_system_message(self):
        """A system message is listed for users and keeps its expiration"""
        expiration = datetime(2015, 8, 5, 19, 41)
        msg_id = qdb.util.get_count('qiita.message') + 1
        qdb.util.add_system_message("SYS MESSAGE", expiration)

        def first_two_fields(email):
            # only the first two fields of each message are compared
            return [[m[0], m[1]] for m in qdb.user.User(email).messages()]

        # shared@foo.bar is also expected to list [1, 'message 1']
        self.assertEqual(first_two_fields('shared@foo.bar'),
                         [[msg_id, 'SYS MESSAGE'], [1, 'message 1']])
        self.assertEqual(first_two_fields('admin@foo.bar'),
                         [[msg_id, 'SYS MESSAGE']])

        # the expiration given above must be what was stored
        with qdb.sql_connection.TRN:
            qdb.sql_connection.TRN.add(
                "SELECT expiration from qiita.message WHERE message_id = %s",
                [msg_id])
            stored = qdb.sql_connection.TRN.execute_fetchindex()
        self.assertEqual(stored, [[expiration]])
754
755
    def test_clear_system_messages(self):
        """Clearing system messages removes them from a user's messages"""
        next_id = qdb.util.get_count('qiita.message') + 1
        new_user = qdb.user.User.create('csm@test.bar', 'password')

        def listed():
            # only the first two fields of each message are compared
            return [[m[0], m[1]] for m in new_user.messages()]

        # a freshly created user has no messages
        self.assertEqual(listed(), [])

        qdb.util.add_system_message(
            "SYS MESSAGE", datetime(2015, 8, 5, 19, 41))
        self.assertCountEqual(listed(), [[next_id, 'SYS MESSAGE']])

        qdb.util.clear_system_messages()
        self.assertEqual(listed(), [])

        # Run again with no system messages to make sure no errors
        qdb.util.clear_system_messages()
775
776
    def test_supported_filepath_types(self):
        """The supported filepath types of "FASTQ" and "BIOM" are listed"""
        expected = {
            "FASTQ": [["raw_forward_seqs", True], ["raw_reverse_seqs", False],
                      ["raw_barcodes", True]],
            "BIOM": [["biom", True], ["directory", False], ["log", False]]}
        # the order of the returned rows is not part of the check
        for name in ("FASTQ", "BIOM"):
            self.assertCountEqual(
                qdb.util.supported_filepath_types(name), expected[name])
785
786
    def test_generate_analysis_list(self):
        """Analyses are summarized for the given list of analysis ids"""
        # no ids, no analyses
        self.assertEqual(qdb.util.generate_analysis_list([]), [])

        observed = qdb.util.generate_analysis_list([1, 2, 3, 5])
        # removing timestamp for testing
        for analysis in observed:
            del analysis['timestamp']

        mapping_fp = qdb.util.get_filepath_information(16)['fullpath']
        first = {
            'analysis_id': 1,
            'name': 'SomeAnalysis',
            'description': 'A test analysis',
            'owner': 'test@foo.bar',
            'visibility': 'private',
            'artifacts': [8, 9],
            'mapping_files': [(16, mapping_fp)]}
        second = {
            'analysis_id': 2,
            'name': 'SomeSecondAnalysis',
            'description': 'Another test analysis',
            'owner': 'admin@foo.bar',
            'visibility': 'private',
            'artifacts': [],
            'mapping_files': []}
        self.assertEqual(observed, [first, second])

        # with the second argument set to True nothing is returned
        self.assertEqual(
            qdb.util.generate_analysis_list([1, 2, 3, 5], True), [])
806
807
808
@qiita_test_checker()
class UtilTests(TestCase):
    """Tests for assorted functions of qdb.util

    Note that several of these tests do reach the database through the qdb
    API (e.g. they create studies or run SQL inside a transaction), so every
    change they make is reverted before the test finishes.
    """

    def setUp(self):
        """Create a small text file used by the checksum test"""
        fh, self.filepath = mkstemp()
        close(fh)
        # Delete the temporary file once the test is done so repeated runs
        # do not pile up files in the temp directory
        self.addCleanup(remove, self.filepath)
        with open(self.filepath, "w") as f:
            f.write("Some text so we can actually compute a checksum")

    def test_compute_checksum(self):
        """Correctly returns the file checksum"""
        obs = qdb.util.compute_checksum(self.filepath)
        exp = 1719580229
        self.assertEqual(obs, exp)

    def test_scrub_data_nothing(self):
        """Returns the same string without changes"""
        self.assertEqual(qdb.util.scrub_data("nothing_changes"),
                         "nothing_changes")

    def test_scrub_data_semicolon(self):
        """Correctly removes the semicolon from the string"""
        self.assertEqual(qdb.util.scrub_data("remove_;_char"), "remove__char")

    def test_scrub_data_single_quote(self):
        """Correctly removes single quotes from the string"""
        self.assertEqual(qdb.util.scrub_data("'quotes'"), "quotes")

    def test_get_visibilities(self):
        """Returns the list of visibilities in the expected order"""
        obs = qdb.util.get_visibilities()
        exp = ['awaiting_approval', 'sandbox', 'private', 'public', 'archived']
        self.assertEqual(obs, exp)

    def test_infer_status(self):
        """Infers a single status from a list of one-element status rows"""
        # (input rows, expected status)
        cases = [
            ([], 'sandbox'),
            ([['private']], 'private'),
            ([['private'], ['public']], 'public'),
            ([['sandbox'], ['awaiting_approval']], 'awaiting_approval'),
            ([['sandbox'], ['sandbox']], 'sandbox')]
        for statuses, exp in cases:
            self.assertEqual(qdb.util.infer_status(statuses), exp)

    def test_get_pubmed_ids_from_dois(self):
        """Maps DOIs to pubmed ids; the empty DOI is not in the result"""
        exp = {'10.100/123456': '123456'}
        obs = qdb.util.get_pubmed_ids_from_dois(['', '10.100/123456'])
        self.assertEqual(obs, exp)

    def test_generate_study_list(self):
        """Study lists depend on the user, the listing and visibilities"""
        USER = qdb.user.User
        STUDY = qdb.study.Study
        PREP = qdb.metadata_template.prep_template.PrepTemplate
        UTIL = qdb.util

        # testing owner email as name
        user = USER('test@foo.bar')
        username = user.info['name']
        # test without changes
        self.assertDictEqual(
            STUDY_INFO, UTIL.generate_study_list(user, 'user')[0])
        # change user's name to None and tests again
        user.info = {'name': None}
        try:
            exp = STUDY_INFO.copy()
            exp['owner'] = 'test@foo.bar'
            self.assertDictEqual(
                exp, UTIL.generate_study_list(user, 'user')[0])
        finally:
            # returning original name, also when the assertion above fails
            user.info = {'name': username}

        # creating a new study to make sure that empty studies are also
        # returned
        info = {"timeseries_type_id": 1, "metadata_complete": True,
                "mixs_compliant": True, "study_alias": "TST",
                "study_description": "Some description of the study goes here",
                "study_abstract": "Some abstract goes here",
                "principal_investigator_id": qdb.study.StudyPerson(1),
                "lab_person_id": qdb.study.StudyPerson(1)}
        new_study = STUDY.create(
            USER('shared@foo.bar'), 'test_study_1', info=info)

        def _restore_database():
            # deleting the new study and returning artifact status
            STUDY.delete(new_study.id)
            PREP(1).artifact.visibility = 'private'
            PREP(2).artifact.visibility = 'private'

        # registered as a cleanup so the database is restored even when one
        # of the assertions below fails
        self.addCleanup(_restore_database)

        snew_info = {
            'study_title': 'test_study_1',
            'metadata_complete': True, 'publication_pid': [],
            'artifact_biom_ids': [], 'autoloaded': False,
            'study_id': new_study.id, 'ebi_study_accession': None,
            'owner': 'Shared', 'shared': [],
            'study_abstract': 'Some abstract goes here',
            'pi': ('lab_dude@foo.bar', 'LabDude'), 'publication_doi': [],
            'study_alias': 'TST', 'study_tags': None,
            'preparation_data_types': [], 'number_samples_collected': 0}
        # NOTE: exp1 and exp_both hold STUDY_INFO itself, not a copy
        exp1 = [STUDY_INFO]
        exp2 = [snew_info]
        exp_both = [STUDY_INFO, snew_info]

        # let's make sure that everything is private for study 1
        for a in STUDY(1).artifacts():
            a.visibility = 'private'

        # owner of study
        obs = UTIL.generate_study_list(USER('test@foo.bar'), 'user')
        self.assertEqual(len(obs), 1)
        self.assertDictEqual(obs[0], exp1[0])
        # shared with
        obs = UTIL.generate_study_list(USER('shared@foo.bar'), 'user')
        self.assertEqual(len(obs), 2)
        self.assertDictEqual(obs[0], exp_both[0])
        self.assertDictEqual(obs[1], exp_both[1])
        # admin
        obs = UTIL.generate_study_list(USER('admin@foo.bar'), 'user')
        self.assertEqual(obs, exp_both)
        # no access/hidden
        obs = UTIL.generate_study_list(USER('demo@microbio.me'), 'user')
        self.assertEqual(obs, [])
        # public - none for everyone
        for email in ('test@foo.bar', 'shared@foo.bar', 'admin@foo.bar',
                      'demo@microbio.me'):
            obs = UTIL.generate_study_list(USER(email), 'public')
            self.assertEqual(obs, [])

        def _avoid_duplicated_tests(all_artifacts=False):
            # nothing should change for owner, shared
            obs = UTIL.generate_study_list(USER('test@foo.bar'), 'user')
            self.assertEqual(obs, exp1)
            obs = UTIL.generate_study_list(USER('shared@foo.bar'), 'user')
            self.assertEqual(obs, exp_both)
            # for admin it should be shown in public and user cause there are
            # 2 preps and only one is public
            obs = UTIL.generate_study_list(USER('admin@foo.bar'), 'user')
            if not all_artifacts:
                self.assertEqual(obs, exp_both)
            else:
                self.assertEqual(obs, exp2)
            obs = UTIL.generate_study_list(USER('demo@microbio.me'), 'user')
            self.assertEqual(obs, [])
            # for the public query, everything should be same for owner, share
            # and admin but demo should now see it as public but with limited
            # artifacts
            obs = UTIL.generate_study_list(USER('test@foo.bar'), 'public')
            self.assertEqual(obs, [])
            obs = UTIL.generate_study_list(USER('shared@foo.bar'), 'public')
            self.assertEqual(obs, [])
            obs = UTIL.generate_study_list(USER('admin@foo.bar'), 'public')
            # exp1[0] is the module level STUDY_INFO, so the temporary change
            # has to be undone no matter how the assertions end
            try:
                if not all_artifacts:
                    exp1[0]['artifact_biom_ids'] = [7]
                self.assertEqual(obs, exp1)
                obs = UTIL.generate_study_list(
                    USER('demo@microbio.me'), 'public')
                self.assertEqual(obs, exp1)
            finally:
                # returning artifacts
                exp1[0]['artifact_biom_ids'] = [4, 5, 6, 7]

        # make artifacts of prep 2 public
        PREP(2).artifact.visibility = 'public'
        _avoid_duplicated_tests()

        # make artifacts of prep 1 awaiting_approval
        PREP(1).artifact.visibility = 'awaiting_approval'
        _avoid_duplicated_tests()

        # making all studies public
        PREP(1).artifact.visibility = 'public'
        _avoid_duplicated_tests(True)

    def test_generate_study_list_errors(self):
        """An unknown listing name raises a ValueError"""
        with self.assertRaises(ValueError):
            qdb.util.generate_study_list(qdb.user.User('test@foo.bar'), 'bad')

    def test_generate_study_list_without_artifacts(self):
        """Studies are listed without their artifact information"""
        # creating a new study to make sure that empty studies are also
        # returned
        info = {"timeseries_type_id": 1, "metadata_complete": True,
                "mixs_compliant": True, "study_alias": "TST",
                "study_description": "Some description of the study goes here",
                "study_abstract": "Some abstract goes here",
                "principal_investigator_id": qdb.study.StudyPerson(1),
                "lab_person_id": qdb.study.StudyPerson(1)}
        new_study = qdb.study.Study.create(
            qdb.user.User('shared@foo.bar'), 'test_study_1', info=info)
        # deleting the new study when the test ends, even if it fails
        self.addCleanup(qdb.study.Study.delete, new_study.id)

        exp_info = [
            {'study_title': (
                'Identification of the Microbiomes for Cannabis Soils'),
             'metadata_complete': True, 'publication_pid': [
                '123456', '7891011'],
             'study_id': 1, 'ebi_study_accession': 'EBI123456-BB',
             'autoloaded': False,
             'study_abstract': (
                'This is a preliminary study to examine the microbiota '
                'associated with the Cannabis plant. Soils samples from '
                'the bulk soil, soil associated with the roots, and the '
                'rhizosphere were extracted and the DNA sequenced. Roots '
                'from three independent plants of different strains were '
                'examined. These roots were obtained November 11, 2011 from '
                'plants that had been harvested in the summer. Future studies '
                'will attempt to analyze the soils and rhizospheres from the '
                'same location at different time points in the plant '
                'lifecycle.'), 'pi': ('PI_dude@foo.bar', 'PIDude'),
             'publication_doi': ['10.100/123456', '10.100/7891011'],
             'study_alias': 'Cannabis Soils', 'number_samples_collected': 27},
            {'study_title': 'test_study_1',
             'metadata_complete': True, 'publication_pid': [],
             'autoloaded': False,
             'study_id': new_study.id, 'ebi_study_accession': None,
             'study_abstract': 'Some abstract goes here',
             'pi': ('lab_dude@foo.bar', 'LabDude'), 'publication_doi': [],
             'study_alias': 'TST', 'number_samples_collected': 0}]
        obs_info = qdb.util.generate_study_list_without_artifacts([1, 2, 3, 4])
        self.assertEqual(obs_info, exp_info)

        # nothing is expected when 'EMP' is given as second argument
        obs_info = qdb.util.generate_study_list_without_artifacts(
            [1, 2, 3, 4], 'EMP')
        self.assertEqual(obs_info, [])

    def test_get_artifacts_information(self):
        """Artifact information follows the command/parameter settings"""
        def _information(artifact_ids):
            # not testing timestamp, so it is dropped from every entry
            infos = qdb.util.get_artifacts_information(artifact_ids)
            for artifact_info in infos:
                del artifact_info['timestamp']
            return infos

        # we are going to test that it ignores 1 and 2 cause they are not biom,
        # 4 has all information and 7 and 8 don't
        obs = _information([1, 2, 4, 6, 7, 8])

        exp = [
            {'artifact_id': 6, 'target_subfragment': ['V4'],
             'prep_samples': 27, 'platform': 'Illumina',
             'target_gene': '16S rRNA', 'name': 'BIOM', 'data_type': '16S',
             'parameters': {'reference': '2', 'similarity': '0.97',
                            'sortmerna_e_value': '1',
                            'sortmerna_max_pos': '10000', 'threads': '1',
                            'sortmerna_coverage': '0.97'},
             'algorithm': 'Pick closed-reference OTUs | Split libraries FASTQ',
             'algorithm_az': 'd480799a0a7a2fbe0e9022bc9c602018',
             'deprecated': False, 'active': True,
             'files': ['1_study_1001_closed_reference_otu_table_Silva.biom']},
            {'artifact_id': 4, 'target_subfragment': ['V4'],
             'prep_samples': 27, 'platform': 'Illumina',
             'target_gene': '16S rRNA', 'name': 'BIOM', 'data_type': '18S',
             'parameters': {'reference': '1', 'similarity': '0.97',
                            'sortmerna_e_value': '1',
                            'sortmerna_max_pos': '10000', 'threads': '1',
                            'sortmerna_coverage': '0.97'},
             'algorithm': 'Pick closed-reference OTUs | Split libraries FASTQ',
             'algorithm_az': 'd480799a0a7a2fbe0e9022bc9c602018',
             'deprecated': False, 'active': True,
             'files': ['1_study_1001_closed_reference_otu_table.biom']},
            {'artifact_id': 7, 'target_subfragment': ['V4'],
             'prep_samples': 27, 'platform': 'Illumina',
             'target_gene': '16S rRNA', 'name': 'BIOM', 'data_type': '16S',
             'parameters': {}, 'algorithm': '', 'algorithm_az': '',
             'deprecated': False, 'active': True,
             'files': ['biom_table.biom']},
            {'artifact_id': 8, 'target_subfragment': [], 'prep_samples': 0,
             'platform': 'not provided', 'target_gene': 'not provided', 'name':
             'noname', 'data_type': '18S', 'parameters': {}, 'algorithm': '',
             'algorithm_az': '', 'deprecated': False, 'active': True,
             'files': ['biom_table.biom']}]
        self.assertCountEqual(obs, exp)
        # artifact 6 is not requested below, so exp[0] becomes artifact 4
        exp = exp[1:]

        # now let's test that the order given by the commands actually give the
        # correct results
        with qdb.sql_connection.TRN:
            # setting up database changes for just checking commands
            qdb.sql_connection.TRN.add(
                """UPDATE qiita.command_parameter SET check_biom_merge = True
                   WHERE parameter_name = 'reference'""")
            qdb.sql_connection.TRN.execute()

            # testing that it works as expected
            obs = _information([1, 2, 4, 7, 8])
            exp[0]['algorithm'] = ('Pick closed-reference OTUs (reference: 1) '
                                   '| Split libraries FASTQ')
            exp[0]['algorithm_az'] = '33fed1b35728417d7ba4139b8f817d44'
            self.assertCountEqual(obs, exp)

            # setting up database changes for also command output
            qdb.sql_connection.TRN.add(
                "UPDATE qiita.command_output SET check_biom_merge = True")
            qdb.sql_connection.TRN.execute()
            obs = _information([1, 2, 4, 7, 8])
            exp[0]['algorithm'] = ('Pick closed-reference OTUs (reference: 1, '
                                   'BIOM: 1_study_1001_closed_reference_'
                                   'otu_table.biom) | Split libraries FASTQ')
            exp[0]['algorithm_az'] = 'de5b794a2cacd428f36fea86df196bfd'
            self.assertCountEqual(obs, exp)

            # let's test that we ignore the parent_info (this check used to
            # be duplicated verbatim; running it once is enough)
            qdb.sql_connection.TRN.add("""UPDATE qiita.software_command
                                          SET ignore_parent_command = True""")
            qdb.sql_connection.TRN.execute()
            obs = _information([1, 2, 4, 7, 8])
            exp[0]['algorithm'] = ('Pick closed-reference OTUs (reference: 1, '
                                   'BIOM: 1_study_1001_closed_reference_'
                                   'otu_table.biom)')
            exp[0]['algorithm_az'] = '7f59a45b2f0d30cd1ed1929391c26e07'
            self.assertCountEqual(obs, exp)

            # returning database as it was
            qdb.sql_connection.TRN.add(
                "UPDATE qiita.command_output SET check_biom_merge = False")
            qdb.sql_connection.TRN.add("""UPDATE qiita.software_command
                                          SET ignore_parent_command = False""")
            qdb.sql_connection.TRN.add(
                """UPDATE qiita.command_parameter SET check_biom_merge = False
                   WHERE parameter_name = 'reference'""")
            qdb.sql_connection.TRN.execute()
1156
1157
1158
class TestFilePathOpening(TestCase):
    """Tests adapted from scikit-bio's skbio.io.util tests"""

    def test_is_string_or_bytes(self):
        """str and bytes are accepted; StringIO and list objects are not"""
        self.assertTrue(qdb.util._is_string_or_bytes('foo'))
        self.assertTrue(qdb.util._is_string_or_bytes(u'foo'))
        self.assertTrue(qdb.util._is_string_or_bytes(b'foo'))
        self.assertFalse(qdb.util._is_string_or_bytes(StringIO('bar')))
        self.assertFalse(qdb.util._is_string_or_bytes([1]))

    def test_file_closed(self):
        """File gets closed in decorator"""
        # the with block closes the temporary file instead of leaving the
        # open handle to the garbage collector
        with NamedTemporaryFile('r') as f:
            with qdb.util.open_file(f.name) as fh:
                pass
            self.assertTrue(fh.closed)

    def test_file_closed_harder(self):
        """File gets closed in decorator, even if exceptions happen."""
        with NamedTemporaryFile('r') as f:
            # assertRaises fails the test if the context manager swallows
            # the exception instead of propagating it
            with self.assertRaises(TypeError):
                with qdb.util.open_file(f.name) as fh:
                    raise TypeError
            self.assertTrue(fh.closed)

    def test_filehandle(self):
        """Filehandles slip through untouched"""
        with TemporaryFile('r') as fh:
            with qdb.util.open_file(fh) as ffh:
                self.assertIs(fh, ffh)
            # And it doesn't close the file-handle
            self.assertFalse(fh.closed)

    def test_StringIO(self):
        """StringIO (useful e.g. for testing) slips through."""
        f = StringIO("File contents")
        with qdb.util.open_file(f) as fh:
            self.assertIs(fh, f)

    def test_BytesIO(self):
        """BytesIO (useful e.g. for testing) slips through."""
        f = BytesIO(b"File contents")
        with qdb.util.open_file(f) as fh:
            self.assertIs(fh, f)

    def test_hdf5IO(self):
        """This tests that if we send a file handler it returns it"""
        f = h5py.File('test', driver='core', backing_store=False, mode='w')
        # open_file is expected to hand the handle back without closing it
        # (see test_filehandle), so it is released here once the test ends
        self.addCleanup(f.close)
        with qdb.util.open_file(f) as fh:
            self.assertIs(fh, f)

    def test_hdf5IO_open(self):
        """A path to an HDF5 file is opened as an h5py.File"""
        # only the name is needed; delete=False keeps the file on disk after
        # the handle is closed
        with NamedTemporaryFile(delete=False) as fh:
            name = fh.name
        # remove the file even when the assertion below fails
        self.addCleanup(remove, name)

        # turn the empty file into a valid (empty) HDF5 file
        h5file = h5py.File(name, 'w')
        h5file.close()

        with qdb.util.open_file(name) as fh_inner:
            self.assertIsInstance(fh_inner, h5py.File)
1228
1229
1230
class PurgeFilepathsTests(DBUtilTestsBase):
    """Tests for qdb.util.purge_filepaths and qdb.util.quick_mounts_purge"""

    def _get_current_filepaths(self):
        """Return the 'fullpath' of every filepath_id in qiita.filepath"""
        sql_fp = "SELECT filepath_id FROM qiita.filepath"
        with qdb.sql_connection.TRN:
            qdb.sql_connection.TRN.add(sql_fp)
            results = qdb.sql_connection.TRN.execute_fetchflatten()
        return [qdb.util.get_filepath_information(_id)['fullpath']
                for _id in results]

    def _create_files(self, files):
        """Insert one qiita.filepath row per entry of `files`

        Parameters
        ----------
        files : list of [mp_id, fp_type_id, file_name]
            The values for data_directory_id, filepath_type_id and filepath
            of each new row; checksum and checksum algorithm are fixed.
        """
        # format is: [mp_id, fp_type_id, file_name]
        sql = """INSERT INTO qiita.filepath (
                    data_directory_id, filepath_type_id, filepath, checksum,
                    checksum_algorithm_id)
                 VALUES (%s, %s, %s, '852952723', 1) RETURNING filepath_id"""
        with qdb.sql_connection.TRN:
            for f in files:
                qdb.sql_connection.TRN.add(sql, tuple(f))
                fid = qdb.sql_connection.TRN.execute_fetchflatten()[0]
                # the result is not used; the call only has to succeed for
                # the newly inserted id
                qdb.util.get_filepath_information(fid)

    def test_purge_filepaths_test(self):
        """Filepaths that are not linked to anything get purged"""
        # Get all the filepaths so we can test if they've been removed or not
        fps_expected = self._get_current_filepaths()
        # Make sure that the files exist - specially for travis
        for fp in fps_expected:
            if not exists(fp):
                with open(fp, 'w') as f:
                    f.write('\n')
                self.files_to_remove.append(fp)

        # nothing should be removed
        qdb.util.purge_filepaths()
        fps_viewed = self._get_current_filepaths()
        self.assertCountEqual(fps_expected, fps_viewed)

        # testing study filepath delete by inserting a new study sample info
        # and make sure it gets deleted
        mp_id, _ = qdb.util.get_mountpoint('templates')[0]
        txt_id = qdb.util.convert_to_id('sample_template', "filepath_type")
        self._create_files([[mp_id, txt_id, '100_filepath.txt']])
        qdb.util.purge_filepaths()
        fps_viewed = self._get_current_filepaths()
        self.assertCountEqual(fps_expected, fps_viewed)

        # testing artifact [A], creating a folder with an artifact that
        # doesn't exist
        _, mp = qdb.util.get_mountpoint('per_sample_FASTQ')[0]
        not_an_artifact_fp = join(mp, '10000')
        mkdir(not_an_artifact_fp)
        # if the purge does not remove the folder, get rid of it anyway so
        # the next run does not fail on mkdir; a missing folder is ignored
        self.addCleanup(rmtree, not_an_artifact_fp, ignore_errors=True)
        # now let's add test for [B] by creating 2 filepaths without a
        # link to the artifacts tables
        mp_id, _ = qdb.util.get_mountpoint('BIOM')[0]
        biom_id = qdb.util.convert_to_id('biom', "filepath_type")
        self._create_files([
            [mp_id, txt_id, 'artifact_filepath.txt'],
            [mp_id, biom_id, 'my_biom.biom']
        ])
        # adding files to tests
        qdb.util.purge_filepaths()
        fps_viewed = self._get_current_filepaths()
        self.assertCountEqual(fps_expected, fps_viewed)
        self.assertFalse(exists(not_an_artifact_fp))

        # testing analysis filepath delete by filepaths for 2 different files
        # and making sure they get deleted
        mp_id, _ = qdb.util.get_mountpoint('analysis')[0]
        self._create_files([
            [mp_id, txt_id, '10000_my_analysis_map.txt'],
            [mp_id, biom_id, '10000_my_analysis_biom.biom']
        ])
        qdb.util.purge_filepaths()
        fps_viewed = self._get_current_filepaths()
        self.assertCountEqual(fps_expected, fps_viewed)

    def test_quick_mounts_purge(self):
        """quick_mounts_purge raises a ValueError in the test environment"""
        # one of the tests creates a conflicting artifact_type so this test
        # will always raise this ValueError
        with self.assertRaises(ValueError):
            qdb.util.quick_mounts_purge()
1312
1313
1314
class ResourceAllocationPlotTests(TestCase):
    """Tests for the resource-allocation helpers in ``qdb.util``.

    Every test starts from ``self.df``, the frame returned by
    ``qdb.util.retrieve_resource_data`` for the command, software and version
    configured in ``setUp``.
    """

    def setUp(self):
        """Load the resource data shared by all tests in this class."""
        self.cname = "Split libraries FASTQ"
        self.sname = "QIIMEq2"
        self.version = "1.9.1"
        self.col_name = 'samples * columns'
        self.columns = [
            "sName", "sVersion", "cID", "cName", "processing_job_id",
            "parameters", "samples", "columns", "input_size", "extra_info",
            "MaxRSSRaw", "ElapsedRaw", "Start", "node_name", "node_model"]

        # df is a dataframe that represents a table with columns specified in
        # self.columns
        self.df = qdb.util.retrieve_resource_data(
            self.cname, self.sname, self.version, self.columns)

    def test_plot_return(self):
        """resource_allocation_plot returns a Figure and iterable of Axes."""
        # check the plot returns correct objects
        fig1, axs1 = qdb.util.resource_allocation_plot(self.df, self.col_name)
        self.assertIsInstance(
            fig1, Figure,
            "Returned object fig1 is not a Matplotlib Figure")
        # Release the figure once the test ends (pass or fail) so figures do
        # not pile up in pyplot across the test run.
        self.addCleanup(plt.close, fig1)
        for ax in axs1:
            self.assertIsInstance(
                ax, Axes,
                "Returned object axs1 is not a single Matplotlib Axes object")

    def test_minimize_const(self):
        """The plot helper selects model 4 for memory and time, 0 failures."""
        selected = (
            (self.df.cName == self.cname) & (self.df.sName == self.sname))
        # Work on an explicit copy: adding a column to a filtered slice is the
        # chained-assignment pattern pandas warns about.
        df = self.df[selected].dropna(subset=['samples', 'columns']).copy()
        df[self.col_name] = df.samples * df['columns']
        fig, axs = plt.subplots(ncols=2, figsize=(10, 4), sharey=False)
        # Close the figure even if an assertion below fails.
        self.addCleanup(plt.close, fig)

        mem_models, time_models = qdb.util.retrieve_equations()
        bm_name, bm, options = qdb.util._resource_allocation_plot_helper(
            df, axs[0], 'MaxRSSRaw', mem_models, self.col_name)
        # check that the algorithm chooses correct model for MaxRSSRaw and
        # has 0 failures
        k, a, b = options.x
        # the failures frame is the last element of the returned sequence
        failures_df = qdb.util._resource_allocation_success_failures(
            df, k, a, b, bm, self.col_name, 'MaxRSSRaw')[-1]
        failures = failures_df.shape[0]

        self.assertEqual(bm_name, 'mem_model4',
                         msg=f"""Best memory model
                         doesn't match
                         {bm_name} != 'mem_model4'""")
        self.assertEqual(bm, mem_models['mem_model4']['equation'],
                         msg=f"""Best memory model
                                 doesn't match
                                 Coefficients:{k} {a} {b}
                            """)
        self.assertEqual(failures, 0, "Number of failures must be 0")

        # check that the algorithm chooses correct model for ElapsedRaw and
        # has 0 failures
        bm_name, bm, options = qdb.util._resource_allocation_plot_helper(
            df, axs[1], 'ElapsedRaw', time_models, self.col_name)
        k, a, b = options.x
        failures_df = qdb.util._resource_allocation_success_failures(
            df, k, a, b, bm, self.col_name, 'ElapsedRaw')[-1]
        failures = failures_df.shape[0]
        self.assertEqual(bm_name, 'time_model4',
                         msg=f"""Best time model
                         doesn't match
                         {bm_name} != 'time_model4'""")

        # Compare against the expected model by its literal name, mirroring
        # the memory check above, rather than against whatever was returned.
        self.assertEqual(bm, time_models['time_model4']['equation'],
                         msg=f"""Best time model
                                doesn't match
                                Coefficients:{k} {a} {b}
                                """)
        self.assertEqual(failures, 0, "Number of failures must be 0")

    def test_MaxRSS_helper(self):
        """MaxRSS_helper converts K/M/G-suffixed strings to plain numbers."""
        tests = [
            ('6', 6.0),
            ('6K', 6000),
            ('6M', 6000000),
            ('6G', 6000000000),
            ('6.9', 6.9),
            ('6.9K', 6900),
            ('6.9M', 6900000),
            ('6.9G', 6900000000),
        ]
        for raw, expected in tests:
            # subTest reports which input failed and keeps checking the rest
            with self.subTest(raw=raw):
                self.assertEqual(qdb.util.MaxRSS_helper(raw), expected)

    def test_db_update(self):
        """update_resource_allocation_table makes new job ids retrievable."""
        path_to_data = './qiita_db/test/test_data/slurm_data.txt.gz'
        test_data = pd.read_csv(path_to_data, sep="|")
        # job ids, keyed by command name, expected to be retrievable only
        # after the update below
        types = {
            'Split libraries FASTQ': [
                '6d368e16-2242-4cf8-87b4-a5dc40bb890b',
                '4c7115e8-4c8e-424c-bf25-96c292ca1931',
                'b72369f9-a886-4193-8d3d-f7b504168e75',
                '46b76f74-e100-47aa-9bf2-c0208bcea52d',
                '6ad4d590-4fa3-44d3-9a8f-ddbb472b1b5f'],
            'Pick closed-reference OTUs': [
                '3c9991ab-6c14-4368-a48c-841e8837a79c',
                '80bf25f3-5f1d-4e10-9369-315e4244f6d5',
                '9ba5ae7a-41e1-4202-b396-0259aeaac366',
                'e5609746-a985-41a1-babf-6b3ebe9eb5a9',
            ],
            'Single Rarefaction': [
                '8a7a8461-e8a1-4b4e-a428-1bc2f4d3ebd0'
            ]
        }

        qdb.util.update_resource_allocation_table(test=test_data)

        # self.df was retrieved in setUp, i.e. before the update above
        previous_ids_set = set(self.df['processing_job_id'])
        for curr_cname, ids in types.items():
            updated_df = qdb.util.retrieve_resource_data(
                curr_cname, self.sname, self.version, self.columns)
            updated_ids_set = set(updated_df['processing_job_id'])
            for job_id in ids:
                self.assertIn(job_id, updated_ids_set)
                self.assertNotIn(job_id, previous_ids_set)
1434
1435
1436
# Module-level study record with literal values.
# NOTE(review): presumably an expected-value fixture for tests elsewhere in
# this module; it is not referenced in the portion of the file reviewed here.
STUDY_INFO = {
    'study_id': 1,
    'owner': 'Dude',
    'study_alias': 'Cannabis Soils',
    'study_abstract':
        'This is a preliminary study to examine the microbiota '
        'associated with the Cannabis plant. Soils samples '
        'from the bulk soil, soil associated with the roots, '
        'and the rhizosphere were extracted and the DNA '
        'sequenced. Roots from three independent plants of '
        'different strains were examined. These roots were '
        'obtained November 11, 2011 from plants that had been '
        'harvested in the summer. Future studies will attempt '
        'to analyze the soils and rhizospheres from the same '
        'location at different time points in the plant '
        'lifecycle.',
    'metadata_complete': True,
    'autoloaded': False,
    'ebi_study_accession': 'EBI123456-BB',
    'study_title':
        'Identification of the Microbiomes for Cannabis Soils',
    'number_samples_collected': 27,
    'shared': [('shared@foo.bar', 'Shared')],
    'publication_doi': ['10.100/123456', '10.100/7891011'],
    'publication_pid': ['123456', '7891011'],
    'pi': ('PI_dude@foo.bar', 'PIDude'),
    'artifact_biom_ids': [4, 5, 6, 7],
    'preparation_data_types': ['18S'],
    'study_tags': None,
}
1466
1467
1468
# Run the unittest entry point only when this file is executed as a script.
if __name__ == '__main__':
    main()