Switch to side-by-side view

--- /dev/null
+++ b/qiita_db/test/test_util.py
@@ -0,0 +1,1469 @@
+# -----------------------------------------------------------------------------
+# Copyright (c) 2014--, The Qiita Development Team.
+#
+# Distributed under the terms of the BSD 3-clause License.
+#
+# The full license is in the file LICENSE, distributed with this software.
+# -----------------------------------------------------------------------------
+
+from unittest import TestCase, main
+from tempfile import mkstemp, mkdtemp, NamedTemporaryFile, TemporaryFile
+from os import close, remove, mkdir
+from os.path import join, exists, basename
+from shutil import rmtree
+from datetime import datetime
+from functools import partial
+from string import punctuation
+import h5py
+from six import StringIO, BytesIO
+import pandas as pd
+
+from qiita_core.util import qiita_test_checker
+import qiita_db as qdb
+
+from matplotlib.figure import Figure
+from matplotlib.axes import Axes
+import matplotlib.pyplot as plt
+
+
+@qiita_test_checker()
+class DBUtilTestsBase(TestCase):
+    """Base class with shared fixtures for the qdb.util test suite."""
+
+    def setUp(self):
+        # table/columns exercised by the check_required_columns and
+        # check_table_cols tests below
+        self.table = 'study'
+        self.required = [
+            'study_title', 'mixs_compliant',
+            'metadata_complete', 'study_description', 'first_contact',
+            'reprocess', 'timeseries_type_id', 'study_alias',
+            'study_abstract', 'principal_investigator_id', 'email']
+        # any file a test appends here is removed in tearDown
+        self.files_to_remove = []
+
+    def tearDown(self):
+        # best-effort cleanup: skip paths that were already removed
+        for fp in self.files_to_remove:
+            if exists(fp):
+                remove(fp)
+
+
+class DBUtilTests(DBUtilTestsBase):
+    def test_max_preparation_samples(self):
+        """Test that we get the correct max_preparation_samples"""
+        # 800 is the value configured in the test database -- TODO confirm
+        obs = qdb.util.max_preparation_samples()
+        self.assertEqual(obs, 800)
+
+    def test_max_artifacts_in_workflow(self):
+        """Test that we get the correct max_artifacts_in_workflow"""
+        # 35 is the value configured in the test database -- TODO confirm
+        obs = qdb.util.max_artifacts_in_workflow()
+        self.assertEqual(obs, 35)
+
+    def test_filepath_id_to_object_id(self):
+        """Filepath ids map to the owning artifact/analysis id, per fixture."""
+        # filepaths 1, 2 belongs to artifact 1
+        self.assertEqual(qdb.util.filepath_id_to_object_id(1), 1)
+        self.assertEqual(qdb.util.filepath_id_to_object_id(2), 1)
+        # filepaths 3, 4 belongs to artifact 2
+        self.assertEqual(qdb.util.filepath_id_to_object_id(3), 2)
+        self.assertEqual(qdb.util.filepath_id_to_object_id(4), 2)
+        # filepaths 9 belongs to artifact 4
+        self.assertEqual(qdb.util.filepath_id_to_object_id(9), 4)
+        # filepath 16 belongs to analysis 1
+        self.assertEqual(qdb.util.filepath_id_to_object_id(16), 1)
+        # filepath 18 belongs to study 1; study-owned filepaths have no
+        # object id, hence None
+        self.assertIsNone(qdb.util.filepath_id_to_object_id(18))
+        # filepath 22 belongs to analysis/artifact 7
+        self.assertEqual(qdb.util.filepath_id_to_object_id(22), 7)
+
+    def test_check_required_columns(self):
+        # Doesn't do anything if correct info passed, only errors if wrong info
+        qdb.util.check_required_columns(self.required, self.table)
+
+    def test_check_required_columns_fail(self):
+        # dropping a required column must raise
+        self.required.remove('study_title')
+        with self.assertRaises(qdb.exceptions.QiitaDBColumnError):
+            qdb.util.check_required_columns(self.required, self.table)
+
+    def test_check_table_cols(self):
+        # Doesn't do anything if correct info passed, only errors if wrong info
+        qdb.util.check_table_cols(self.required, self.table)
+
+    def test_check_table_cols_fail(self):
+        # a column that doesn't exist in the table must raise
+        self.required.append('BADTHINGNOINHERE')
+        with self.assertRaises(qdb.exceptions.QiitaDBColumnError):
+            qdb.util.check_table_cols(self.required, self.table)
+
+    def test_get_table_cols(self):
+        """get_table_cols returns the full column set of a table."""
+        obs = qdb.util.get_table_cols("qiita_user")
+        exp = {"email", "user_level_id", "password", "name", "affiliation",
+               "address", "phone", "user_verify_code", "pass_reset_code",
+               "pass_reset_timestamp", "receive_processing_job_emails",
+               "social_orcid", "social_researchgate", "social_googlescholar",
+               "creation_timestamp"}
+        self.assertEqual(set(obs), exp)
+
+    def test_exists_table(self):
+        """Correctly checks if a table exists"""
+        # True cases
+        self.assertTrue(qdb.util.exists_table("filepath"))
+        self.assertTrue(qdb.util.exists_table("qiita_user"))
+        self.assertTrue(qdb.util.exists_table("analysis"))
+        self.assertTrue(qdb.util.exists_table("prep_1"))
+        self.assertTrue(qdb.util.exists_table("sample_1"))
+        # False cases
+        self.assertFalse(qdb.util.exists_table("sample_2"))
+        self.assertFalse(qdb.util.exists_table("prep_3"))
+        self.assertFalse(qdb.util.exists_table("foo_table"))
+        self.assertFalse(qdb.util.exists_table("bar_table"))
+
+    def test_convert_to_id(self):
+        """Tests that ids are returned correctly"""
+        self.assertEqual(
+            qdb.util.convert_to_id("directory", "filepath_type"), 8)
+        self.assertEqual(
+            qdb.util.convert_to_id("private", "visibility", "visibility"), 3)
+        self.assertEqual(
+            qdb.util.convert_to_id("EMP", "portal_type", "portal"), 2)
+
+    def test_convert_to_id_bad_value(self):
+        """Tests that an unknown value raises QiitaDBLookupError"""
+        with self.assertRaises(qdb.exceptions.QiitaDBLookupError):
+            qdb.util.convert_to_id("FAKE", "filepath_type")
+
+    def test_get_artifact_types(self):
+        """Artifact types are returned name->id, or id->name with key_by_id"""
+        obs = qdb.util.get_artifact_types()
+        exp = {'SFF': 1, 'FASTA_Sanger': 2, 'FASTQ': 3, 'FASTA': 4,
+               'per_sample_FASTQ': 5, 'Demultiplexed': 6, 'BIOM': 7,
+               'beta_div_plots': 8, 'rarefaction_curves': 9,
+               'taxa_summary': 10}
+        self.assertEqual(obs, exp)
+
+        # key_by_id=True inverts the mapping
+        obs = qdb.util.get_artifact_types(key_by_id=True)
+        exp = {v: k for k, v in exp.items()}
+        self.assertEqual(obs, exp)
+
+    def test_get_filepath_types(self):
+        """Tests that get_filepath_types works with valid arguments"""
+        obs = qdb.util.get_filepath_types()
+        # NOTE(review): this literal is immediately overwritten by the DB
+        # query below; it is kept only as documentation of the expected
+        # baseline values
+        exp = {'raw_forward_seqs': 1, 'raw_reverse_seqs': 2,
+               'raw_barcodes': 3, 'preprocessed_fasta': 4,
+               'preprocessed_fastq': 5, 'preprocessed_demux': 6, 'biom': 7,
+               'directory': 8, 'plain_text': 9, 'reference_seqs': 10,
+               'reference_tax': 11, 'reference_tree': 12, 'log': 13,
+               'sample_template': 14, 'prep_template': 15, 'qiime_map': 16,
+               'bam': 17
+               }
+        # fetch the authoritative mapping straight from the database
+        with qdb.sql_connection.TRN:
+            qdb.sql_connection.TRN.add("SELECT filepath_type,filepath_type_id "
+                                       "FROM qiita.filepath_type")
+            exp = dict(qdb.sql_connection.TRN.execute_fetchindex())
+        self.assertEqual(obs, exp)
+
+        # keying by id inverts the mapping
+        obs = qdb.util.get_filepath_types(key='filepath_type_id')
+        exp = {v: k for k, v in exp.items()}
+        self.assertEqual(obs, exp)
+
+    def test_get_filepath_types_fail(self):
+        """Tests that get_filepath_types fails with an invalid key argument"""
+        with self.assertRaises(qdb.exceptions.QiitaDBColumnError):
+            qdb.util.get_filepath_types(key='invalid')
+
+    def test_get_data_types(self):
+        """Tests that get_data_types works with valid arguments"""
+        obs = qdb.util.get_data_types()
+        exp = {'16S': 1, '18S': 2, 'ITS': 3, 'Proteomic': 4, 'Metabolomic': 5,
+               'Metagenomic': 6, 'Multiomic': 7, 'Metatranscriptomics': 8,
+               'Viromics': 9, 'Genomics': 10, 'Transcriptomics': 11,
+               'Job Output Folder': 12}
+        self.assertEqual(obs, exp)
+
+        # keying by id inverts the mapping
+        obs = qdb.util.get_data_types(key='data_type_id')
+        exp = {v: k for k, v in exp.items()}
+        self.assertEqual(obs, exp)
+
+    def test_create_rand_string(self):
+        """Random strings honor the requested length and punct flag."""
+        set_punct = set(punctuation)
+
+        # with punctuation allowed (default) some should appear;
+        # NOTE(review): probabilistic, but practically certain at length 200
+        obs = qdb.util.create_rand_string(200)
+        self.assertEqual(len(obs), 200)
+        self.assertTrue(set_punct.intersection(set(obs)))
+
+        # punct=False must exclude punctuation entirely
+        obs = qdb.util.create_rand_string(400, punct=False)
+        self.assertEqual(len(obs), 400)
+        self.assertFalse(set_punct.intersection(set(obs)))
+
+    def test_get_count(self):
+        """Checks that get_count retrieves proper count"""
+        self.assertEqual(qdb.util.get_count('qiita.study_person'), 3)
+
+    def test_check_count(self):
+        """Checks that check_count returns True and False appropriately"""
+        self.assertTrue(qdb.util.check_count('qiita.study_person', 3))
+        self.assertFalse(qdb.util.check_count('qiita.study_person', 2))
+
+    def test_insert_filepaths(self):
+        """insert_filepaths moves the file into the mount and records it."""
+        fd, fp = mkstemp()
+        close(fd)
+        with open(fp, "w") as f:
+            f.write("\n")
+        self.files_to_remove.append(fp)
+
+        # predict the id the sequence will hand out next
+        with qdb.sql_connection.TRN:
+            qdb.sql_connection.TRN.add(
+                "SELECT last_value FROM qiita.filepath_filepath_id_seq")
+            exp_new_id = 1 + qdb.sql_connection.TRN.execute_fetchflatten()[0]
+        obs = qdb.util.insert_filepaths([(fp, 1)], 2, "raw_data")
+        self.assertEqual(obs, [exp_new_id])
+
+        # Check that the files have been copied correctly; default behavior
+        # moves (not copies) the source file
+        exp_fp = join(qdb.util.get_db_files_base_dir(), "raw_data",
+                      "2_%s" % basename(fp))
+        self.assertTrue(exists(exp_fp))
+        self.assertFalse(exists(fp))
+        self.files_to_remove.append(exp_fp)
+
+        # Check that the filepaths have been added to the DB
+        with qdb.sql_connection.TRN:
+            qdb.sql_connection.TRN.add("SELECT * FROM qiita.filepath "
+                                       "WHERE filepath_id=%d" % exp_new_id)
+            obs = qdb.sql_connection.TRN.execute_fetchindex()
+        exp_fp = "2_%s" % basename(fp)
+        # '852952723' is the checksum of the single-newline file
+        exp = [[exp_new_id, exp_fp, 1, '852952723', 1, 5, 1]]
+        self.assertEqual(obs, exp)
+
+        qdb.util.purge_filepaths()
+
+    def test_insert_filepaths_copy(self):
+        """With copy=True the source file is preserved, not moved."""
+        fd, fp = mkstemp()
+        close(fd)
+        with open(fp, "w") as f:
+            f.write("\n")
+        self.files_to_remove.append(fp)
+
+        # The id's in the database are bigserials, i.e. they get
+        # autoincremented for each element introduced.
+        with qdb.sql_connection.TRN:
+            qdb.sql_connection.TRN.add(
+                "SELECT last_value FROM qiita.filepath_filepath_id_seq")
+            exp_new_id = 1 + qdb.sql_connection.TRN.execute_fetchflatten()[0]
+        obs = qdb.util.insert_filepaths([(fp, 1)], 2, "raw_data",
+                                        move_files=False, copy=True)
+        self.assertEqual(obs, [exp_new_id])
+
+        # Check that the files have been copied correctly; source survives
+        exp_fp = join(qdb.util.get_db_files_base_dir(), "raw_data",
+                      "2_%s" % basename(fp))
+        self.assertTrue(exists(exp_fp))
+        self.assertTrue(exists(fp))
+        self.files_to_remove.append(exp_fp)
+
+        # Check that the filepaths have been added to the DB
+        with qdb.sql_connection.TRN:
+            qdb.sql_connection.TRN.add("SELECT * FROM qiita.filepath "
+                                       "WHERE filepath_id=%d" % exp_new_id)
+            obs = qdb.sql_connection.TRN.execute_fetchindex()
+        exp_fp = "2_%s" % basename(fp)
+        exp = [[exp_new_id, exp_fp, 1, '852952723', 1, 5, 1]]
+        self.assertEqual(obs, exp)
+
+        # let's do that again but with move_files = True; copy=True should
+        # still take precedence and leave the source in place
+        exp_new_id += 1
+        obs = qdb.util.insert_filepaths([(fp, 1)], 2, "raw_data",
+                                        move_files=True, copy=True)
+        self.assertEqual(obs, [exp_new_id])
+
+        # Check that the files have been copied correctly
+        exp_fp = join(qdb.util.get_db_files_base_dir(), "raw_data",
+                      "2_%s" % basename(fp))
+        self.assertTrue(exists(exp_fp))
+        self.assertTrue(exists(fp))
+        self.files_to_remove.append(exp_fp)
+
+        qdb.util.purge_filepaths()
+
+    def test_insert_filepaths_string(self):
+        """The filepath type may be passed by name instead of numeric id."""
+        fd, fp = mkstemp()
+        close(fd)
+        with open(fp, "w") as f:
+            f.write("\n")
+        self.files_to_remove.append(fp)
+
+        # predict the id the sequence will hand out next
+        with qdb.sql_connection.TRN:
+            qdb.sql_connection.TRN.add(
+                "SELECT last_value FROM qiita.filepath_filepath_id_seq")
+            exp_new_id = 1 + qdb.sql_connection.TRN.execute_fetchflatten()[0]
+        obs = qdb.util.insert_filepaths(
+            [(fp, "raw_forward_seqs")], 2, "raw_data")
+        self.assertEqual(obs, [exp_new_id])
+
+        # Check that the files have been copied correctly
+        exp_fp = join(qdb.util.get_db_files_base_dir(), "raw_data",
+                      "2_%s" % basename(fp))
+        self.assertTrue(exists(exp_fp))
+        self.files_to_remove.append(exp_fp)
+
+        # Check that the filepaths have been added to the DB
+        with qdb.sql_connection.TRN:
+            qdb.sql_connection.TRN.add("SELECT * FROM qiita.filepath "
+                                       "WHERE filepath_id=%d" % exp_new_id)
+            obs = qdb.sql_connection.TRN.execute_fetchindex()
+        exp_fp = "2_%s" % basename(fp)
+        exp = [[exp_new_id, exp_fp, 1, '852952723', 1, 5, 1]]
+        self.assertEqual(obs, exp)
+
+        qdb.util.purge_filepaths()
+
+    def test_retrieve_filepaths(self):
+        """retrieve_filepaths returns dicts ordered by filepath id (asc)."""
+        obs = qdb.util.retrieve_filepaths('artifact_filepath',
+                                          'artifact_id', 1)
+        path_builder = partial(
+            join, qdb.util.get_db_files_base_dir(), "raw_data")
+        exp = [{'fp_id': 1,
+                'fp': path_builder("1_s_G1_L001_sequences.fastq.gz"),
+                'fp_type': "raw_forward_seqs",
+                'checksum': '2125826711',
+                'fp_size': 58},
+               {'fp_id': 2,
+                'fp': path_builder("1_s_G1_L001_sequences_barcodes.fastq.gz"),
+                'fp_type': "raw_barcodes",
+                'checksum': '2125826711',
+                'fp_size': 58}]
+        self.assertEqual(obs, exp)
+
+    def test_retrieve_filepaths_sort(self):
+        """sort='descending' reverses the ordering of the results."""
+        obs = qdb.util.retrieve_filepaths(
+            'artifact_filepath', 'artifact_id', 1, sort='descending')
+        path_builder = partial(
+            join, qdb.util.get_db_files_base_dir(), "raw_data")
+        exp = [{'fp_id': 2,
+                'fp': path_builder("1_s_G1_L001_sequences_barcodes.fastq.gz"),
+                'fp_type': "raw_barcodes",
+                'checksum': '2125826711',
+                'fp_size': 58},
+               {'fp_id': 1,
+                'fp': path_builder("1_s_G1_L001_sequences.fastq.gz"),
+                'fp_type': "raw_forward_seqs",
+                'checksum': '2125826711',
+                'fp_size': 58}]
+        self.assertEqual(obs, exp)
+
+    def test_retrieve_filepaths_type(self):
+        """fp_type filters results; non-matching types yield an empty list."""
+        obs = qdb.util.retrieve_filepaths(
+            'artifact_filepath', 'artifact_id', 1, sort='descending',
+            fp_type='raw_barcodes')
+        path_builder = partial(
+            join, qdb.util.get_db_files_base_dir(), "raw_data")
+        exp = [{'fp_id': 2,
+                'fp': path_builder("1_s_G1_L001_sequences_barcodes.fastq.gz"),
+                'fp_type': "raw_barcodes",
+                'checksum': '2125826711',
+                'fp_size': 58}]
+        self.assertEqual(obs, exp)
+
+        # same filter without an explicit sort
+        obs = qdb.util.retrieve_filepaths(
+            'artifact_filepath', 'artifact_id', 1, fp_type='raw_barcodes')
+        path_builder = partial(
+            join, qdb.util.get_db_files_base_dir(), "raw_data")
+        exp = [{'fp_id': 2,
+                'fp': path_builder("1_s_G1_L001_sequences_barcodes.fastq.gz"),
+                'fp_type': "raw_barcodes",
+                'checksum': '2125826711',
+                'fp_size': 58}]
+        self.assertEqual(obs, exp)
+
+        # artifact 1 has no biom files, so the result is empty
+        obs = qdb.util.retrieve_filepaths(
+            'artifact_filepath', 'artifact_id', 1, fp_type='biom')
+        # NOTE(review): path_builder is unused here since no paths are
+        # expected in the result
+        path_builder = partial(
+            join, qdb.util.get_db_files_base_dir(), "raw_data")
+        self.assertEqual(obs, [])
+
+    def test_retrieve_filepaths_error(self):
+        """An unknown sort value raises QiitaDBError."""
+        with self.assertRaises(qdb.exceptions.QiitaDBError):
+            qdb.util.retrieve_filepaths('artifact_filepath', 'artifact_id', 1,
+                                        sort='Unknown')
+
+    def test_empty_trash_upload_folder(self):
+        """Files placed in an upload trash folder are deleted."""
+        # creating file to delete so we know it actually works
+        study_id = '1'
+        uploads_fp = join(qdb.util.get_mountpoint("uploads")[0][1], study_id)
+        trash = join(uploads_fp, 'trash')
+        if not exists(trash):
+            mkdir(trash)
+        fp = join(trash, 'my_file_to_delete.txt')
+        open(fp, 'w').close()
+
+        self.assertTrue(exists(fp))
+        qdb.util.empty_trash_upload_folder()
+        self.assertFalse(exists(fp))
+
+    def test_move_filepaths_to_upload_folder(self):
+        # we are going to test the move_filepaths_to_upload_folder indirectly
+        # by creating an artifact and deleting it. To accomplish this we need
+        # to create a new prep info file, attach a biom with html_summary and
+        # then delete it. However, we will do this twice to assure that
+        # there are no conflicts with this
+        study_id = 1
+        # creating the 2 sets of files for the 2 artifacts
+        fd, seqs_fp1 = mkstemp(suffix='_seqs.fastq')
+        close(fd)
+
+        html_fp1 = mkdtemp()
+        html_fp1 = join(html_fp1, 'support_files')
+        mkdir(html_fp1)
+        with open(join(html_fp1, 'index.html'), 'w') as fp:
+            fp.write(">AAA\nAAA")
+        fd, seqs_fp2 = mkstemp(suffix='_seqs.fastq')
+        close(fd)
+
+        html_fp2 = mkdtemp()
+        html_fp2 = join(html_fp2, 'support_files')
+        mkdir(html_fp2)
+        with open(join(html_fp2, 'index.html'), 'w') as fp:
+            fp.write(">AAA\nAAA")
+
+        # creating new prep info file
+        metadata_dict = {
+            'SKB8.640193': {'center_name': 'ANL',
+                            'primer': 'GTGCCAGCMGCCGCGGTAA',
+                            'barcode': 'GTCCGCAAGTTA',
+                            'run_prefix': "s_G1_L001_sequences",
+                            'platform': 'Illumina',
+                            'instrument_model': 'Illumina MiSeq',
+                            'library_construction_protocol': 'AAAA',
+                            'experiment_design_description': 'BBBB'}}
+        metadata = pd.DataFrame.from_dict(
+            metadata_dict, orient='index', dtype=str)
+        pt1 = qdb.metadata_template.prep_template.PrepTemplate.create(
+            metadata, qdb.study.Study(study_id), "16S")
+        pt2 = qdb.metadata_template.prep_template.PrepTemplate.create(
+            metadata, qdb.study.Study(study_id), "16S")
+
+        # inserting artifact 1
+        artifact1 = qdb.artifact.Artifact.create(
+            [(seqs_fp1, 1), (html_fp1, 'html_summary')], "FASTQ",
+            prep_template=pt1)
+        filepaths = artifact1.filepaths
+        # inserting artifact 2
+        artifact2 = qdb.artifact.Artifact.create(
+            [(seqs_fp2, 1), (html_fp2, 'html_summary')], "FASTQ",
+            prep_template=pt2)
+        filepaths.extend(artifact2.filepaths)
+
+        # get before delete files in upload folders
+        GUPLOADS = qdb.util.get_files_from_uploads_folders
+        upload_files = set(GUPLOADS("1"))
+
+        # delete artifact 1
+        qdb.artifact.Artifact.delete(artifact1.id)
+
+        # confirm that _only_ the fastq from the file is recovered; this means
+        # that all the extra files/folders were ignored
+        diff_upload = set(GUPLOADS("1")) - set(upload_files)
+        self.assertEqual(len(diff_upload), 1)
+        self.assertEqual(diff_upload.pop()[1], basename(seqs_fp1))
+
+        # finish deleting artifacts :: there should be a new fastq
+        qdb.artifact.Artifact.delete(artifact2.id)
+        diff_upload = set(GUPLOADS("1")) - set(upload_files)
+        self.assertEqual(len(diff_upload), 2)
+        self.assertCountEqual(
+            [x[1] for x in diff_upload],
+            [basename(seqs_fp1), basename(seqs_fp2)])
+
+        # now let's create another artifact with the same filenames that
+        # artifact1 so we can test successful overlapping of names
+        with open(seqs_fp1, 'w') as fp:
+            fp.write(">AAA\nAAA")
+        mkdir(html_fp1)
+        with open(join(html_fp1, 'index.html'), 'w') as fp:
+            fp.write(">AAA\nAAA")
+        artifact3 = qdb.artifact.Artifact.create(
+            [(seqs_fp1, 1), (html_fp1, 'html_summary')], "FASTQ",
+            prep_template=pt1)
+        filepaths.extend(artifact3.filepaths)
+        qdb.artifact.Artifact.delete(artifact3.id)
+
+        # files should be the same as the previous test
+        diff_upload = set(GUPLOADS("1")) - set(upload_files)
+        self.assertEqual(len(diff_upload), 2)
+        self.assertCountEqual(
+            [x[1] for x in diff_upload],
+            [basename(seqs_fp1), basename(seqs_fp2)])
+
+        # register everything that landed in the uploads folder for cleanup
+        bd = qdb.util.get_mountpoint("uploads")[0][1]
+        for x in filepaths:
+            self.files_to_remove.append(join(bd, "1", basename(x['fp'])))
+
+    def test_get_mountpoint(self):
+        """get_mountpoint returns the active mount(s) for a data type."""
+        exp = [(5, join(qdb.util.get_db_files_base_dir(), 'raw_data'))]
+        obs = qdb.util.get_mountpoint("raw_data")
+        self.assertEqual(obs, exp)
+
+        exp = [(1, join(qdb.util.get_db_files_base_dir(), 'analysis'))]
+        obs = qdb.util.get_mountpoint("analysis")
+        self.assertEqual(obs, exp)
+
+        exp = [(2, join(qdb.util.get_db_files_base_dir(), 'job'))]
+        obs = qdb.util.get_mountpoint("job")
+        self.assertEqual(obs, exp)
+
+        # inserting new ones so we can test that it retrieves these and
+        # doesn't alter other ones
+        qdb.sql_connection.perform_as_transaction(
+            "UPDATE qiita.data_directory SET active=false WHERE "
+            "data_directory_id=1")
+        count = qdb.util.get_count('qiita.data_directory')
+        sql = """INSERT INTO qiita.data_directory (data_type, mountpoint,
+                                                   subdirectory, active)
+                 VALUES ('analysis', 'analysis_tmp', true, true),
+                        ('raw_data', 'raw_data_tmp', true, false)"""
+        qdb.sql_connection.perform_as_transaction(sql)
+
+        # this should have been updated
+        exp = [(count + 1, join(qdb.util.get_db_files_base_dir(),
+                'analysis_tmp'))]
+        obs = qdb.util.get_mountpoint("analysis")
+        self.assertEqual(obs, exp)
+
+        # these 2 shouldn't
+        exp = [(5, join(qdb.util.get_db_files_base_dir(), 'raw_data'))]
+        obs = qdb.util.get_mountpoint("raw_data")
+        self.assertEqual(obs, exp)
+
+        exp = [(2, join(qdb.util.get_db_files_base_dir(), 'job'))]
+        obs = qdb.util.get_mountpoint("job")
+        self.assertEqual(obs, exp)
+
+        # testing multi returns; retrieve_all includes inactive mounts
+        exp = [(5, join(qdb.util.get_db_files_base_dir(), 'raw_data')),
+               (count + 2, join(qdb.util.get_db_files_base_dir(),
+                'raw_data_tmp'))]
+        obs = qdb.util.get_mountpoint("raw_data", retrieve_all=True)
+        self.assertEqual(obs, exp)
+
+        # testing retrieve subdirectory (adds the subdirectory flag)
+        exp = [
+            (5, join(qdb.util.get_db_files_base_dir(), 'raw_data'), False),
+            (count + 2, join(qdb.util.get_db_files_base_dir(), 'raw_data_tmp'),
+             True)]
+        obs = qdb.util.get_mountpoint("raw_data", retrieve_all=True,
+                                      retrieve_subdir=True)
+        self.assertEqual(obs, exp)
+
+    def test_get_mountpoint_path_by_id(self):
+        """Mountpoint paths can be resolved from their data_directory id."""
+        exp = join(qdb.util.get_db_files_base_dir(), 'raw_data')
+        obs = qdb.util.get_mountpoint_path_by_id(5)
+        self.assertEqual(obs, exp)
+
+        exp = join(qdb.util.get_db_files_base_dir(), 'analysis')
+        obs = qdb.util.get_mountpoint_path_by_id(1)
+        self.assertEqual(obs, exp)
+
+        exp = join(qdb.util.get_db_files_base_dir(), 'job')
+        obs = qdb.util.get_mountpoint_path_by_id(2)
+        self.assertEqual(obs, exp)
+
+        # inserting new ones so we can test that it retrieves these and
+        # doesn't alter other ones
+        qdb.sql_connection.perform_as_transaction(
+            "UPDATE qiita.data_directory SET active=false WHERE "
+            "data_directory_id=1")
+        count = qdb.util.get_count('qiita.data_directory')
+        sql = """INSERT INTO qiita.data_directory (data_type, mountpoint,
+                                                   subdirectory, active)
+                 VALUES ('analysis', 'analysis_tmp', true, true),
+                        ('raw_data', 'raw_data_tmp', true, false)"""
+        qdb.sql_connection.perform_as_transaction(sql)
+
+        # this should have been updated
+        exp = join(qdb.util.get_db_files_base_dir(), 'analysis_tmp')
+        obs = qdb.util.get_mountpoint_path_by_id(count + 1)
+        self.assertEqual(obs, exp)
+
+        # these 2 shouldn't
+        exp = join(qdb.util.get_db_files_base_dir(), 'raw_data')
+        obs = qdb.util.get_mountpoint_path_by_id(5)
+        self.assertEqual(obs, exp)
+
+        exp = join(qdb.util.get_db_files_base_dir(), 'job')
+        obs = qdb.util.get_mountpoint_path_by_id(2)
+        self.assertEqual(obs, exp)
+
+    def test_get_files_from_uploads_folders(self):
+        """Upload listings contain (folder_id, name, size) tuples."""
+        # something has been uploaded and ignoring hidden files/folders
+        # and folders
+        exp = (7, 'uploaded_file.txt', '0B')
+        obs = qdb.util.get_files_from_uploads_folders("1")
+        self.assertIn(exp, obs)
+
+        # nothing has been uploaded
+        exp = []
+        obs = qdb.util.get_files_from_uploads_folders("2")
+        self.assertEqual(obs, exp)
+
+    def test_move_upload_files_to_trash(self):
+        test_filename = 'this_is_a_test_file.txt'
+
+        # create file to move to trash
+        fid, folder = qdb.util.get_mountpoint("uploads")[0]
+        test_fp = join(folder, '1', test_filename)
+        with open(test_fp, 'w') as f:
+            f.write('test')
+
+        self.files_to_remove.append(test_fp)
+
+        exp = (fid, 'this_is_a_test_file.txt', '4B')
+        obs = qdb.util.get_files_from_uploads_folders("1")
+        self.assertIn(exp, obs)
+
+        # move file
+        qdb.util.move_upload_files_to_trash(1, [(fid, test_filename)])
+        obs = qdb.util.get_files_from_uploads_folders("1")
+        self.assertNotIn(obs, exp)
+
+        # if the file doesn't exist, don't raise any errors
+        qdb.util.move_upload_files_to_trash(1, [(fid, test_filename)])
+
+        # testing errors
+        # - study doesn't exist
+        with self.assertRaises(qdb.exceptions.QiitaDBError):
+            qdb.util.move_upload_files_to_trash(100, [(fid, test_filename)])
+        # - fid doen't exist
+        with self.assertRaises(qdb.exceptions.QiitaDBError):
+            qdb.util.move_upload_files_to_trash(1, [(10, test_filename)])
+
+        # removing trash folder
+        rmtree(join(folder, '1', 'trash'))
+
+    def test_get_environmental_packages(self):
+        """All environmental packages and their table names are returned."""
+        obs = qdb.util.get_environmental_packages()
+        exp = [['air', 'ep_air'],
+               ['built environment', 'ep_built_environment'],
+               ['host-associated', 'ep_host_associated'],
+               ['human-amniotic-fluid', 'ep_human_amniotic_fluid'],
+               ['human-associated', 'ep_human_associated'],
+               ['human-blood', 'ep_human_blood'],
+               ['human-gut', 'ep_human_gut'],
+               ['human-oral', 'ep_human_oral'],
+               ['human-skin', 'ep_human_skin'],
+               ['human-urine', 'ep_human_urine'],
+               ['human-vaginal', 'ep_human_vaginal'],
+               ['microbial mat/biofilm', 'ep_microbial_mat_biofilm'],
+               ['miscellaneous natural or artificial environment',
+                'ep_misc_artif'],
+               ['plant-associated', 'ep_plant_associated'],
+               ['sediment', 'ep_sediment'],
+               ['soil', 'ep_soil'],
+               ['wastewater/sludge', 'ep_wastewater_sludge'],
+               ['water', 'ep_water']]
+        # sorted comparison: the DB does not guarantee row order
+        self.assertEqual(sorted(obs), sorted(exp))
+
+    def test_get_timeseries_types(self):
+        """All timeseries types are returned in id order."""
+        obs = qdb.util.get_timeseries_types()
+        exp = [[1, 'None', 'None'],
+               [2, 'real', 'single intervention'],
+               [3, 'real', 'multiple intervention'],
+               [4, 'real', 'combo intervention'],
+               [5, 'pseudo', 'single intervention'],
+               [6, 'pseudo', 'multiple intervention'],
+               [7, 'pseudo', 'combo intervention'],
+               [8, 'mixed', 'single intervention'],
+               [9, 'mixed', 'multiple intervention'],
+               [10, 'mixed', 'combo intervention']]
+        self.assertEqual(obs, exp)
+
+    def test_get_filepath_information(self):
+        """get_filepath_information returns the full filepath record."""
+        obs = qdb.util.get_filepath_information(1)
+        # This path is machine specific. Just checking that is not empty
+        self.assertIsNotNone(obs.pop('fullpath'))
+        exp = {'filepath_id': 1, 'filepath': '1_s_G1_L001_sequences.fastq.gz',
+               'filepath_type': 'raw_forward_seqs', 'checksum': '2125826711',
+               'data_type': 'raw_data', 'mountpoint': 'raw_data',
+               'subdirectory': False, 'active': True}
+        self.assertEqual(obs, exp)
+
+    def test_filepath_id_to_rel_path(self):
+        """Relative paths are mountpoint/[subdir/]filename."""
+        obs = qdb.util.filepath_id_to_rel_path(1)
+        exp = 'raw_data/1_s_G1_L001_sequences.fastq.gz'
+        self.assertEqual(obs, exp)
+
+        obs = qdb.util.filepath_id_to_rel_path(3)
+        exp = 'preprocessed_data/1_seqs.fna'
+        self.assertEqual(obs, exp)
+
+        # insert a new filepath on a subdirectory mount and link it to
+        # artifact 2; the artifact id becomes part of the relative path
+        fd, fp = mkstemp()
+        close(fd)
+        with open(fp, 'w') as f:
+            f.write('\n')
+        self.files_to_remove.append(fp)
+        test = qdb.util.insert_filepaths(
+            [(fp, "raw_forward_seqs")], 2, "FASTQ")[0]
+        sql = """INSERT INTO qiita.artifact_filepath
+                        (artifact_id, filepath_id)
+                    VALUES (%s, %s)"""
+        qdb.sql_connection.perform_as_transaction(sql, [2, test])
+
+        obs = qdb.util.filepath_id_to_rel_path(test)
+        exp = 'FASTQ/2/%s' % basename(fp)
+        self.assertEqual(obs, exp)
+
+    def test_filepath_ids_to_rel_paths(self):
+        """The plural variant maps each id to its relative path."""
+        fd, fp = mkstemp()
+        close(fd)
+        with open(fp, 'w') as f:
+            f.write('\n')
+        self.files_to_remove.append(fp)
+        test = qdb.util.insert_filepaths(
+            [(fp, "raw_forward_seqs")], 2, "FASTQ")[0]
+        sql = """INSERT INTO qiita.artifact_filepath
+                        (artifact_id, filepath_id)
+                    VALUES (%s, %s)"""
+        qdb.sql_connection.perform_as_transaction(sql, [2, test])
+
+        obs = qdb.util.filepath_ids_to_rel_paths([1, 3, test])
+        exp = {1: 'raw_data/1_s_G1_L001_sequences.fastq.gz',
+               3: 'preprocessed_data/1_seqs.fna',
+               test: 'FASTQ/2/%s' % basename(fp)}
+
+        self.assertEqual(obs, exp)
+
+    def test_add_message(self):
+        """A message added for specific users appears in their inbox."""
+        count = qdb.util.get_count('qiita.message') + 1
+        user = qdb.user.User.create('new@test.bar', 'password')
+        users = [user]
+        qdb.util.add_message("TEST MESSAGE", users)
+
+        obs = [[x[0], x[1]] for x in user.messages()]
+        exp = [[count, 'TEST MESSAGE']]
+        self.assertEqual(obs, exp)
+
+    def test_add_system_message(self):
+        """System messages reach every user and store their expiration."""
+        count = qdb.util.get_count('qiita.message') + 1
+        qdb.util.add_system_message("SYS MESSAGE",
+                                    datetime(2015, 8, 5, 19, 41))
+
+        obs = [[x[0], x[1]]
+               for x in qdb.user.User('shared@foo.bar').messages()]
+        exp = [[count, 'SYS MESSAGE'], [1, 'message 1']]
+        self.assertEqual(obs, exp)
+        obs = [[x[0], x[1]] for x in qdb.user.User('admin@foo.bar').messages()]
+        exp = [[count, 'SYS MESSAGE']]
+        self.assertEqual(obs, exp)
+
+        # the expiration timestamp must be persisted with the message
+        sql = "SELECT expiration from qiita.message WHERE message_id = %s"
+        with qdb.sql_connection.TRN:
+            qdb.sql_connection.TRN.add(sql, [count])
+            obs = qdb.sql_connection.TRN.execute_fetchindex()
+        exp = [[datetime(2015, 8, 5, 19, 41)]]
+        self.assertEqual(obs, exp)
+
+    def test_clear_system_messages(self):
+        message_id = qdb.util.get_count('qiita.message') + 1
+        user = qdb.user.User.create('csm@test.bar', 'password')
+        obs = [[x[0], x[1]] for x in user.messages()]
+        exp = []
+        self.assertEqual(obs, exp)
+
+        qdb.util.add_system_message("SYS MESSAGE",
+                                    datetime(2015, 8, 5, 19, 41))
+        obs = [[x[0], x[1]] for x in user.messages()]
+        exp = [[message_id, 'SYS MESSAGE']]
+        self.assertCountEqual(obs, exp)
+
+        qdb.util.clear_system_messages()
+        obs = [[x[0], x[1]] for x in user.messages()]
+        exp = []
+        self.assertEqual(obs, exp)
+
+        # Run again with no system messages to make sure no errors
+        qdb.util.clear_system_messages()
+
+    def test_supported_filepath_types(self):
+        obs = qdb.util.supported_filepath_types("FASTQ")
+        exp = [["raw_forward_seqs", True], ["raw_reverse_seqs", False],
+               ["raw_barcodes", True]]
+        self.assertCountEqual(obs, exp)
+
+        obs = qdb.util.supported_filepath_types("BIOM")
+        exp = [["biom", True], ["directory", False], ["log", False]]
+        self.assertCountEqual(obs, exp)
+
+    def test_generate_analysis_list(self):
+        self.assertEqual(qdb.util.generate_analysis_list([]), [])
+
+        obs = qdb.util.generate_analysis_list([1, 2, 3, 5])
+        exp = [{'mapping_files': [
+                (16, qdb.util.get_filepath_information(16)['fullpath'])],
+                'description': 'A test analysis', 'artifacts': [8, 9], 'name':
+                'SomeAnalysis', 'owner': 'test@foo.bar', 'analysis_id': 1,
+                'visibility': 'private'},
+               {'mapping_files': [], 'description': 'Another test analysis',
+                'artifacts': [], 'name': 'SomeSecondAnalysis',
+                'owner': 'admin@foo.bar',
+                'analysis_id': 2, 'visibility': 'private'}]
+        # removing timestamp for testing
+        for i in range(len(obs)):
+            del obs[i]['timestamp']
+        self.assertEqual(obs, exp)
+
+        self.assertEqual(
+            qdb.util.generate_analysis_list([1, 2, 3, 5], True), [])
+
+
@qiita_test_checker()
class UtilTests(TestCase):
    """Tests for the util functions.

    NOTE: despite the original description, several of these tests do hit
    the test database (the class is decorated with qiita_test_checker).
    """

    def setUp(self):
        # a small file with known contents so checksums are deterministic
        fh, self.filepath = mkstemp()
        close(fh)
        with open(self.filepath, "w") as f:
            f.write("Some text so we can actually compute a checksum")

    def tearDown(self):
        # remove the temporary file created in setUp (previously leaked)
        if exists(self.filepath):
            remove(self.filepath)

    def test_compute_checksum(self):
        """Correctly returns the file checksum"""
        obs = qdb.util.compute_checksum(self.filepath)
        exp = 1719580229
        self.assertEqual(obs, exp)

    def test_scrub_data_nothing(self):
        """Returns the same string without changes"""
        self.assertEqual(qdb.util.scrub_data("nothing_changes"),
                         "nothing_changes")

    def test_scrub_data_semicolon(self):
        """Correctly removes the semicolon from the string"""
        self.assertEqual(qdb.util.scrub_data("remove_;_char"), "remove__char")

    def test_scrub_data_single_quote(self):
        """Correctly removes single quotes from the string"""
        self.assertEqual(qdb.util.scrub_data("'quotes'"), "quotes")

    def test_get_visibilities(self):
        """Returns all visibility levels known to the system"""
        obs = qdb.util.get_visibilities()
        exp = ['awaiting_approval', 'sandbox', 'private', 'public', 'archived']
        self.assertEqual(obs, exp)

    def test_infer_status(self):
        """The most visible status among the inputs wins"""
        obs = qdb.util.infer_status([])
        self.assertEqual(obs, 'sandbox')

        obs = qdb.util.infer_status([['private']])
        self.assertEqual(obs, 'private')

        obs = qdb.util.infer_status([['private'], ['public']])
        self.assertEqual(obs, 'public')

        obs = qdb.util.infer_status([['sandbox'], ['awaiting_approval']])
        self.assertEqual(obs, 'awaiting_approval')

        obs = qdb.util.infer_status([['sandbox'], ['sandbox']])
        self.assertEqual(obs, 'sandbox')

    def test_get_pubmed_ids_from_dois(self):
        """DOIs map to their pubmed ids; unknown values are skipped"""
        exp = {'10.100/123456': '123456'}
        obs = qdb.util.get_pubmed_ids_from_dois(['', '10.100/123456'])
        self.assertEqual(obs, exp)

    def test_generate_study_list(self):
        """Study listings respect ownership, sharing and visibility"""
        USER = qdb.user.User
        STUDY = qdb.study.Study
        PREP = qdb.metadata_template.prep_template.PrepTemplate
        UTIL = qdb.util

        # testing owner email as name
        user = USER('test@foo.bar')
        username = user.info['name']
        # test without changes
        self.assertDictEqual(
            STUDY_INFO, UTIL.generate_study_list(user, 'user')[0])
        # change user's name to None and tests again
        user.info = {'name': None}
        exp = STUDY_INFO.copy()
        exp['owner'] = 'test@foo.bar'
        self.assertDictEqual(
            exp, qdb.util.generate_study_list(user, 'user')[0])

        # returning original name
        user.info = {'name': username}

        # creating a new study to make sure that empty studies are also
        # returned
        info = {"timeseries_type_id": 1, "metadata_complete": True,
                "mixs_compliant": True, "study_alias": "TST",
                "study_description": "Some description of the study goes here",
                "study_abstract": "Some abstract goes here",
                "principal_investigator_id": qdb.study.StudyPerson(1),
                "lab_person_id": qdb.study.StudyPerson(1)}
        new_study = STUDY.create(
            USER('shared@foo.bar'), 'test_study_1', info=info)

        snew_info = {
            'study_title': 'test_study_1',
            'metadata_complete': True, 'publication_pid': [],
            'artifact_biom_ids': [], 'autoloaded': False,
            'study_id': new_study.id, 'ebi_study_accession': None,
            'owner': 'Shared', 'shared': [],
            'study_abstract': 'Some abstract goes here',
            'pi': ('lab_dude@foo.bar', 'LabDude'), 'publication_doi': [],
            'study_alias': 'TST', 'study_tags': None,
            'preparation_data_types': [], 'number_samples_collected': 0}
        exp1 = [STUDY_INFO]
        exp2 = [snew_info]
        exp_both = [STUDY_INFO, snew_info]

        # let's make sure that everything is private for study 1
        for a in STUDY(1).artifacts():
            a.visibility = 'private'

        # owner of study
        obs = UTIL.generate_study_list(USER('test@foo.bar'), 'user')
        self.assertEqual(len(obs), 1)
        self.assertDictEqual(obs[0], exp1[0])
        # shared with
        obs = UTIL.generate_study_list(USER('shared@foo.bar'), 'user')
        self.assertEqual(len(obs), 2)
        self.assertDictEqual(obs[0], exp_both[0])
        self.assertDictEqual(obs[1], exp_both[1])
        # admin
        obs = UTIL.generate_study_list(USER('admin@foo.bar'), 'user')
        self.assertEqual(obs, exp_both)
        # no access/hidden
        obs = UTIL.generate_study_list(USER('demo@microbio.me'), 'user')
        self.assertEqual(obs, [])
        # public - none for everyone
        obs = UTIL.generate_study_list(USER('test@foo.bar'), 'public')
        self.assertEqual(obs, [])
        obs = UTIL.generate_study_list(USER('shared@foo.bar'), 'public')
        self.assertEqual(obs, [])
        obs = UTIL.generate_study_list(USER('admin@foo.bar'), 'public')
        self.assertEqual(obs, [])
        obs = UTIL.generate_study_list(USER('demo@microbio.me'), 'public')
        self.assertEqual(obs, [])

        def _avoid_duplicated_tests(all_artifacts=False):
            # nothing should change for owner, shared
            obs = UTIL.generate_study_list(USER('test@foo.bar'), 'user')
            self.assertEqual(obs, exp1)
            obs = UTIL.generate_study_list(USER('shared@foo.bar'), 'user')
            self.assertEqual(obs, exp_both)
            # for admin it should be shown in public and user cause there are
            # 2 preps and only one is public
            obs = UTIL.generate_study_list(USER('admin@foo.bar'), 'user')
            if not all_artifacts:
                self.assertEqual(obs, exp_both)
            else:
                self.assertEqual(obs, exp2)
            obs = UTIL.generate_study_list(USER('demo@microbio.me'), 'user')
            self.assertEqual(obs, [])
            # for the public query, everything should be same for owner, share
            # and admin but demo should now see it as public but with limited
            # artifacts
            obs = UTIL.generate_study_list(USER('test@foo.bar'), 'public')
            self.assertEqual(obs, [])
            obs = UTIL.generate_study_list(USER('shared@foo.bar'), 'public')
            self.assertEqual(obs, [])
            obs = UTIL.generate_study_list(USER('admin@foo.bar'), 'public')
            if not all_artifacts:
                exp1[0]['artifact_biom_ids'] = [7]
            self.assertEqual(obs, exp1)
            obs = UTIL.generate_study_list(USER('demo@microbio.me'), 'public')
            self.assertEqual(obs, exp1)

            # returning artifacts (restores the module-level STUDY_INFO dict,
            # which exp1 aliases)
            exp1[0]['artifact_biom_ids'] = [4, 5, 6, 7]

        # make artifacts of prep 2 public
        PREP(2).artifact.visibility = 'public'
        _avoid_duplicated_tests()

        # make artifacts of prep 1 awaiting_approval
        PREP(1).artifact.visibility = 'awaiting_approval'
        _avoid_duplicated_tests()

        # making all studies public
        PREP(1).artifact.visibility = 'public'
        _avoid_duplicated_tests(True)

        # deleting the new study and returning artifact status
        qdb.study.Study.delete(new_study.id)
        PREP(1).artifact.visibility = 'private'
        PREP(2).artifact.visibility = 'private'

    def test_generate_study_list_errors(self):
        """An unknown visibility keyword raises ValueError"""
        with self.assertRaises(ValueError):
            qdb.util.generate_study_list(qdb.user.User('test@foo.bar'), 'bad')

    def test_generate_study_list_without_artifacts(self):
        """Study listings without artifact info include empty studies"""
        # creating a new study to make sure that empty studies are also
        # returned
        info = {"timeseries_type_id": 1, "metadata_complete": True,
                "mixs_compliant": True, "study_alias": "TST",
                "study_description": "Some description of the study goes here",
                "study_abstract": "Some abstract goes here",
                "principal_investigator_id": qdb.study.StudyPerson(1),
                "lab_person_id": qdb.study.StudyPerson(1)}
        new_study = qdb.study.Study.create(
            qdb.user.User('shared@foo.bar'), 'test_study_1', info=info)

        exp_info = [
            {'study_title': (
                'Identification of the Microbiomes for Cannabis Soils'),
             'metadata_complete': True, 'publication_pid': [
                '123456', '7891011'],
             'study_id': 1, 'ebi_study_accession': 'EBI123456-BB',
             'autoloaded': False,
             'study_abstract': (
                'This is a preliminary study to examine the microbiota '
                'associated with the Cannabis plant. Soils samples from '
                'the bulk soil, soil associated with the roots, and the '
                'rhizosphere were extracted and the DNA sequenced. Roots '
                'from three independent plants of different strains were '
                'examined. These roots were obtained November 11, 2011 from '
                'plants that had been harvested in the summer. Future studies '
                'will attempt to analyze the soils and rhizospheres from the '
                'same location at different time points in the plant '
                'lifecycle.'), 'pi': ('PI_dude@foo.bar', 'PIDude'),
             'publication_doi': ['10.100/123456', '10.100/7891011'],
             'study_alias': 'Cannabis Soils', 'number_samples_collected': 27},
            {'study_title': 'test_study_1',
             'metadata_complete': True, 'publication_pid': [],
             'autoloaded': False,
             'study_id': new_study.id, 'ebi_study_accession': None,
             'study_abstract': 'Some abstract goes here',
             'pi': ('lab_dude@foo.bar', 'LabDude'), 'publication_doi': [],
             'study_alias': 'TST', 'number_samples_collected': 0}]
        obs_info = qdb.util.generate_study_list_without_artifacts([1, 2, 3, 4])
        self.assertEqual(obs_info, exp_info)

        obs_info = qdb.util.generate_study_list_without_artifacts(
            [1, 2, 3, 4], 'EMP')
        self.assertEqual(obs_info, [])

        # deleting the newly created study
        qdb.study.Study.delete(new_study.id)

    def test_get_artifacts_information(self):
        """Artifact summaries respect the check_biom_merge DB flags"""
        # we are going to test that it ignores 1 and 2 cause they are not biom,
        # 4 has all information and 7 and 8 don't
        obs = qdb.util.get_artifacts_information([1, 2, 4, 6, 7, 8])
        # not testing timestamp
        for i in range(len(obs)):
            del obs[i]['timestamp']

        exp = [
            {'artifact_id': 6, 'target_subfragment': ['V4'],
             'prep_samples': 27, 'platform': 'Illumina',
             'target_gene': '16S rRNA', 'name': 'BIOM', 'data_type': '16S',
             'parameters': {'reference': '2', 'similarity': '0.97',
                            'sortmerna_e_value': '1',
                            'sortmerna_max_pos': '10000', 'threads': '1',
                            'sortmerna_coverage': '0.97'},
             'algorithm': 'Pick closed-reference OTUs | Split libraries FASTQ',
             'algorithm_az': 'd480799a0a7a2fbe0e9022bc9c602018',
             'deprecated': False, 'active': True,
             'files': ['1_study_1001_closed_reference_otu_table_Silva.biom']},
            {'artifact_id': 4, 'target_subfragment': ['V4'],
             'prep_samples': 27, 'platform': 'Illumina',
             'target_gene': '16S rRNA', 'name': 'BIOM', 'data_type': '18S',
             'parameters': {'reference': '1', 'similarity': '0.97',
                            'sortmerna_e_value': '1',
                            'sortmerna_max_pos': '10000', 'threads': '1',
                            'sortmerna_coverage': '0.97'},
             'algorithm': 'Pick closed-reference OTUs | Split libraries FASTQ',
             'algorithm_az': 'd480799a0a7a2fbe0e9022bc9c602018',
             'deprecated': False, 'active': True,
             'files': ['1_study_1001_closed_reference_otu_table.biom']},
            {'artifact_id': 7, 'target_subfragment': ['V4'],
             'prep_samples': 27, 'platform': 'Illumina',
             'target_gene': '16S rRNA', 'name': 'BIOM', 'data_type': '16S',
             'parameters': {}, 'algorithm': '', 'algorithm_az': '',
             'deprecated': False, 'active': True,
             'files': ['biom_table.biom']},
            {'artifact_id': 8, 'target_subfragment': [], 'prep_samples': 0,
             'platform': 'not provided', 'target_gene': 'not provided', 'name':
             'noname', 'data_type': '18S', 'parameters': {}, 'algorithm': '',
             'algorithm_az': '', 'deprecated': False, 'active': True,
             'files': ['biom_table.biom']}]
        self.assertCountEqual(obs, exp)
        exp = exp[1:]

        # now let's test that the order given by the commands actually give the
        # correct results
        with qdb.sql_connection.TRN:
            # setting up database changes for just checking commands
            qdb.sql_connection.TRN.add(
                """UPDATE qiita.command_parameter SET check_biom_merge = True
                   WHERE parameter_name = 'reference'""")
            qdb.sql_connection.TRN.execute()

            # testing that it works as expected
            obs = qdb.util.get_artifacts_information([1, 2, 4, 7, 8])
            # not testing timestamp
            for i in range(len(obs)):
                del obs[i]['timestamp']
            exp[0]['algorithm'] = ('Pick closed-reference OTUs (reference: 1) '
                                   '| Split libraries FASTQ')
            exp[0]['algorithm_az'] = '33fed1b35728417d7ba4139b8f817d44'
            self.assertCountEqual(obs, exp)

            # setting up database changes for also command output
            qdb.sql_connection.TRN.add(
                "UPDATE qiita.command_output SET check_biom_merge = True")
            qdb.sql_connection.TRN.execute()
            obs = qdb.util.get_artifacts_information([1, 2, 4, 7, 8])
            # not testing timestamp
            for i in range(len(obs)):
                del obs[i]['timestamp']
            exp[0]['algorithm'] = ('Pick closed-reference OTUs (reference: 1, '
                                   'BIOM: 1_study_1001_closed_reference_'
                                   'otu_table.biom) | Split libraries FASTQ')
            exp[0]['algorithm_az'] = 'de5b794a2cacd428f36fea86df196bfd'
            self.assertCountEqual(obs, exp)

            # let's test that we ignore the parent_info
            # (a second, byte-identical copy of this check was removed)
            qdb.sql_connection.TRN.add("""UPDATE qiita.software_command
                                          SET ignore_parent_command = True""")
            qdb.sql_connection.TRN.execute()
            obs = qdb.util.get_artifacts_information([1, 2, 4, 7, 8])
            # not testing timestamp
            for i in range(len(obs)):
                del obs[i]['timestamp']
            exp[0]['algorithm'] = ('Pick closed-reference OTUs (reference: 1, '
                                   'BIOM: 1_study_1001_closed_reference_'
                                   'otu_table.biom)')
            exp[0]['algorithm_az'] = '7f59a45b2f0d30cd1ed1929391c26e07'
            self.assertCountEqual(obs, exp)

            # returning database as it was
            qdb.sql_connection.TRN.add(
                "UPDATE qiita.command_output SET check_biom_merge = False")
            qdb.sql_connection.TRN.add("""UPDATE qiita.software_command
                                          SET ignore_parent_command = False""")
            qdb.sql_connection.TRN.add(
                """UPDATE qiita.command_parameter SET check_biom_merge = False
                   WHERE parameter_name = 'reference'""")
            qdb.sql_connection.TRN.execute()
+
+
class TestFilePathOpening(TestCase):
    """Tests adapted from scikit-bio's skbio.io.util tests"""
    def test_is_string_or_bytes(self):
        """Strings/bytes are detected; file objects and lists are not."""
        for value in ('foo', u'foo', b'foo'):
            self.assertTrue(qdb.util._is_string_or_bytes(value))
        self.assertFalse(qdb.util._is_string_or_bytes(StringIO('bar')))
        self.assertFalse(qdb.util._is_string_or_bytes([1]))

    def test_file_closed(self):
        """File gets closed in decorator"""
        tmp = NamedTemporaryFile('r')
        with qdb.util.open_file(tmp.name) as fh:
            pass
        self.assertTrue(fh.closed)

    def test_file_closed_harder(self):
        """File gets closed in decorator, even if exceptions happen."""
        tmp = NamedTemporaryFile('r')
        try:
            with qdb.util.open_file(tmp.name) as fh:
                raise TypeError
        except TypeError:
            self.assertTrue(fh.closed)
        else:
            # Reaching this branch means the context manager swallowed the
            # exception instead of propagating it. No good.
            raise Exception("`open_file` didn't propagate exceptions")

    def test_filehandle(self):
        """Filehandles slip through untouched"""
        with TemporaryFile('r') as fh:
            with qdb.util.open_file(fh) as wrapped:
                self.assertTrue(wrapped is fh)
            # And it doesn't close the file-handle
            self.assertFalse(fh.closed)

    def test_StringIO(self):
        """StringIO (useful e.g. for testing) slips through."""
        buf = StringIO("File contents")
        with qdb.util.open_file(buf) as fh:
            self.assertTrue(fh is buf)

    def test_BytesIO(self):
        """BytesIO (useful e.g. for testing) slips through."""
        buf = BytesIO(b"File contents")
        with qdb.util.open_file(buf) as fh:
            self.assertTrue(fh is buf)

    def test_hdf5IO(self):
        """This tests that if we send a file handler it returns it"""
        h5 = h5py.File('test', driver='core', backing_store=False, mode='w')
        with qdb.util.open_file(h5) as fh:
            self.assertTrue(fh is h5)

    def test_hdf5IO_open(self):
        """A path pointing at an HDF5 file is opened as an h5py File."""
        with NamedTemporaryFile(delete=False) as fh:
            name = fh.name
            fh.close()

            h5file = h5py.File(name, 'w')
            h5file.close()

            with qdb.util.open_file(name) as fh_inner:
                self.assertTrue(isinstance(fh_inner, h5py.File))

        remove(name)
+
+
class PurgeFilepathsTests(DBUtilTestsBase):
    """Tests for purge_filepaths and quick_mounts_purge."""

    def _get_current_filepaths(self):
        # resolve every row in qiita.filepath to its full on-disk path
        sql_fp = "SELECT filepath_id FROM qiita.filepath"
        with qdb.sql_connection.TRN:
            qdb.sql_connection.TRN.add(sql_fp)
            fids = qdb.sql_connection.TRN.execute_fetchflatten()
        return [qdb.util.get_filepath_information(fid)['fullpath']
                for fid in fids]

    def _create_files(self, files):
        # format is: [mp_id, fp_type_id, file_name]
        sql = """INSERT INTO qiita.filepath (
                    data_directory_id, filepath_type_id, filepath, checksum,
                    checksum_algorithm_id)
                 VALUES (%s, %s, %s, '852952723', 1) RETURNING filepath_id"""
        with qdb.sql_connection.TRN:
            for finfo in files:
                qdb.sql_connection.TRN.add(sql, tuple(finfo))
                fid = qdb.sql_connection.TRN.execute_fetchflatten()[0]
                qdb.util.get_filepath_information(fid)

    def test_purge_filepaths_test(self):
        # Get all the filepaths so we can test if they've been removed or not
        exp_fps = self._get_current_filepaths()
        # Make sure that the files exist - specially for travis
        for fp in exp_fps:
            if not exists(fp):
                with open(fp, 'w') as f:
                    f.write('\n')
                self.files_to_remove.append(fp)

        # nothing should be removed
        qdb.util.purge_filepaths()
        self.assertCountEqual(exp_fps, self._get_current_filepaths())

        # testing study filepath delete by inserting a new study sample info
        # and make sure it gets deleted
        mp_id, mp = qdb.util.get_mountpoint('templates')[0]
        txt_id = qdb.util.convert_to_id('sample_template', "filepath_type")
        self._create_files([[mp_id, txt_id, '100_filepath.txt']])
        qdb.util.purge_filepaths()
        self.assertCountEqual(exp_fps, self._get_current_filepaths())

        # testing artifact [A], creating a folder with an artifact that
        # doesn't exist
        _, mp = qdb.util.get_mountpoint('per_sample_FASTQ')[0]
        not_an_artifact_fp = join(mp, '10000')
        mkdir(not_an_artifact_fp)
        # now let's add test for [B] by creating 2 filepaths without a
        # link to the artifacts tables
        mp_id, mp = qdb.util.get_mountpoint('BIOM')[0]
        biom_id = qdb.util.convert_to_id('biom', "filepath_type")
        self._create_files([
            [mp_id, txt_id, 'artifact_filepath.txt'],
            [mp_id, biom_id, 'my_biom.biom']
        ])
        # adding files to tests
        qdb.util.purge_filepaths()
        self.assertCountEqual(exp_fps, self._get_current_filepaths())
        self.assertFalse(exists(not_an_artifact_fp))

        # testing analysis filepath delete by filepaths for 2 different files
        # and making sure they get deleted
        mp_id, mp = qdb.util.get_mountpoint('analysis')[0]
        biom_id = qdb.util.convert_to_id('biom', "filepath_type")
        self._create_files([
            [mp_id, txt_id, '10000_my_analysis_map.txt'],
            [mp_id, biom_id, '10000_my_analysis_biom.biom']
        ])
        qdb.util.purge_filepaths()
        self.assertCountEqual(exp_fps, self._get_current_filepaths())

    def test_quick_mounts_purge(self):
        # one of the tests creates a conflicting artifact_type so this test
        # will always raise this ValueError
        with self.assertRaises(ValueError):
            qdb.util.quick_mounts_purge()
+
+
class ResourceAllocationPlotTests(TestCase):
    """Tests for the resource-allocation model-fitting/plotting helpers."""

    def setUp(self):
        # fixed command/software identifiers used to query resource data
        self.cname = "Split libraries FASTQ"
        self.sname = "QIIMEq2"
        self.version = "1.9.1"
        self.col_name = 'samples * columns'
        self.columns = [
                "sName", "sVersion", "cID", "cName", "processing_job_id",
                "parameters", "samples", "columns", "input_size", "extra_info",
                "MaxRSSRaw", "ElapsedRaw", "Start", "node_name", "node_model"]

        # df is a dataframe that represents a table with columns specified in
        # self.columns
        self.df = qdb.util.retrieve_resource_data(
                self.cname, self.sname, self.version, self.columns)

    def test_plot_return(self):
        # check the plot returns correct objects: a Figure plus an iterable
        # of Axes
        fig1, axs1 = qdb.util.resource_allocation_plot(self.df, self.col_name)
        self.assertIsInstance(
            fig1, Figure,
            "Returned object fig1 is not a Matplotlib Figure")
        for ax in axs1:
            self.assertIsInstance(
                ax, Axes,
                "Returned object axs1 is not a single Matplotlib Axes object")

    def test_minimize_const(self):
        # NOTE(review): filtering then mutating with inplace ops operates on
        # a slice of the original frame (pandas SettingWithCopy territory);
        # it works here because self.df is rebound first — confirm if changed.
        self.df = self.df[
            (self.df.cName == self.cname) & (self.df.sName == self.sname)]
        self.df.dropna(subset=['samples', 'columns'], inplace=True)
        self.df[self.col_name] = self.df.samples * self.df['columns']
        fig, axs = plt.subplots(ncols=2, figsize=(10, 4), sharey=False)

        mem_models, time_models = qdb.util.retrieve_equations()
        bm_name, bm, options = qdb.util._resource_allocation_plot_helper(
            self.df, axs[0], 'MaxRSSRaw', mem_models, self.col_name)
        # check that the algorithm chooses correct model for MaxRSSRaw and
        # has 0 failures
        k, a, b = options.x
        failures_df = qdb.util._resource_allocation_success_failures(
            self.df, k, a, b, bm, self.col_name, 'MaxRSSRaw')[-1]
        failures = failures_df.shape[0]

        self.assertEqual(bm_name, 'mem_model4',
                         msg=f"""Best memory model
                         doesn't match
                         {bm_name} != 'mem_model4'""")
        self.assertEqual(bm, mem_models['mem_model4']['equation'],
                         msg=f"""Best memory model
                                 doesn't match
                                 Coefficients:{k} {a} {b}
                            """)
        self.assertEqual(failures, 0, "Number of failures must be 0")

        # check that the algorithm chooses correct model for ElapsedRaw and
        # has 0 failures (comment previously said 1, but the assertion below
        # expects 0)
        bm_name, bm, options = qdb.util._resource_allocation_plot_helper(
            self.df, axs[1], 'ElapsedRaw', time_models, self.col_name)
        k, a, b = options.x
        failures_df = qdb.util._resource_allocation_success_failures(
            self.df, k, a, b, bm, self.col_name, 'ElapsedRaw')[-1]
        failures = failures_df.shape[0]
        self.assertEqual(bm_name, 'time_model4',
                         msg=f"""Best time model
                         doesn't match
                         {bm_name} != 'time_model4'""")

        self.assertEqual(bm, time_models[bm_name]['equation'],
                         msg=f"""Best time model
                                doesn't match
                                Coefficients:{k} {a} {b}
                                """)
        self.assertEqual(failures, 0, "Number of failures must be 0")

    def test_MaxRSS_helper(self):
        # slurm MaxRSS strings with K/M/G suffixes are converted to raw bytes
        tests = [
            ('6', 6.0),
            ('6K', 6000),
            ('6M', 6000000),
            ('6G', 6000000000),
            ('6.9', 6.9),
            ('6.9K', 6900),
            ('6.9M', 6900000),
            ('6.9G', 6900000000),
        ]
        for x, y in tests:
            self.assertEqual(qdb.util.MaxRSS_helper(x), y)

    def test_db_update(self):
        # loads slurm accounting data and checks the listed job ids appear
        # only after update_resource_allocation_table runs
        path_to_data = './qiita_db/test/test_data/slurm_data.txt.gz'
        test_data = pd.read_csv(path_to_data, sep="|")
        types = {
            'Split libraries FASTQ': [
                '6d368e16-2242-4cf8-87b4-a5dc40bb890b',
                '4c7115e8-4c8e-424c-bf25-96c292ca1931',
                'b72369f9-a886-4193-8d3d-f7b504168e75',
                '46b76f74-e100-47aa-9bf2-c0208bcea52d',
                '6ad4d590-4fa3-44d3-9a8f-ddbb472b1b5f'],
            'Pick closed-reference OTUs': [
                '3c9991ab-6c14-4368-a48c-841e8837a79c',
                '80bf25f3-5f1d-4e10-9369-315e4244f6d5',
                '9ba5ae7a-41e1-4202-b396-0259aeaac366',
                'e5609746-a985-41a1-babf-6b3ebe9eb5a9',
            ],
            'Single Rarefaction': [
                '8a7a8461-e8a1-4b4e-a428-1bc2f4d3ebd0'
            ]
        }

        qdb.util.update_resource_allocation_table(test=test_data)

        for curr_cname, ids in types.items():
            updated_df = qdb.util.retrieve_resource_data(
                    curr_cname, self.sname, self.version, self.columns)
            updated_ids_set = set(updated_df['processing_job_id'])
            previous_ids_set = set(self.df['processing_job_id'])
            for id in ids:
                self.assertTrue(id in updated_ids_set)
                self.assertFalse(id in previous_ids_set)
+
+
# Expected listing entry for the canned test study (id 1) as produced by
# qdb.util.generate_study_list; shared by several tests above. Note that
# test_generate_study_list temporarily mutates 'artifact_biom_ids' and
# restores it before finishing.
STUDY_INFO = {
    'study_id': 1,
    'owner': 'Dude',
    'study_alias': 'Cannabis Soils',
    'study_abstract':
        'This is a preliminary study to examine the microbiota '
        'associated with the Cannabis plant. Soils samples '
        'from the bulk soil, soil associated with the roots, '
        'and the rhizosphere were extracted and the DNA '
        'sequenced. Roots from three independent plants of '
        'different strains were examined. These roots were '
        'obtained November 11, 2011 from plants that had been '
        'harvested in the summer. Future studies will attempt '
        'to analyze the soils and rhizospheres from the same '
        'location at different time points in the plant '
        'lifecycle.',
    'metadata_complete': True,
    'autoloaded': False,
    'ebi_study_accession': 'EBI123456-BB',
    'study_title':
        'Identification of the Microbiomes for Cannabis Soils',
    'number_samples_collected': 27,
    'shared': [('shared@foo.bar', 'Shared')],
    'publication_doi': ['10.100/123456', '10.100/7891011'],
    'publication_pid': ['123456', '7891011'],
    'pi': ('PI_dude@foo.bar', 'PIDude'),
    'artifact_biom_ids': [4, 5, 6, 7],
    'preparation_data_types': ['18S'],
    'study_tags': None,
}
+
+
# Run the full test suite when this module is executed directly.
if __name__ == '__main__':
    main()