# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from unittest import TestCase, main
from tempfile import mkstemp, mkdtemp, NamedTemporaryFile, TemporaryFile
from os import close, remove, mkdir
from os.path import join, exists, basename
from shutil import rmtree
from datetime import datetime
from functools import partial
from string import punctuation
import h5py
from six import StringIO, BytesIO
import pandas as pd
from qiita_core.util import qiita_test_checker
import qiita_db as qdb
from matplotlib.figure import Figure
from matplotlib.axes import Axes
import matplotlib.pyplot as plt
@qiita_test_checker()
class DBUtilTestsBase(TestCase):
    """Common fixture shared by the DB-backed ``qdb.util`` test cases."""

    def setUp(self):
        """Store the table name, its required columns and a cleanup list."""
        self.table = 'study'
        self.required = [
            'study_title',
            'mixs_compliant',
            'metadata_complete',
            'study_description',
            'first_contact',
            'reprocess',
            'timeseries_type_id',
            'study_alias',
            'study_abstract',
            'principal_investigator_id',
            'email',
        ]
        # Tests append paths here; tearDown deletes the ones still present
        self.files_to_remove = []

    def tearDown(self):
        """Remove every registered path that still exists on disk."""
        for leftover in filter(exists, self.files_to_remove):
            remove(leftover)
class DBUtilTests(DBUtilTestsBase):
def test_max_preparation_samples(self):
    """max_preparation_samples is expected to return 800"""
    self.assertEqual(qdb.util.max_preparation_samples(), 800)
def test_max_artifacts_in_workflow(self):
    """max_artifacts_in_workflow is expected to return 35"""
    self.assertEqual(qdb.util.max_artifacts_in_workflow(), 35)
def test_filepath_id_to_object_id(self):
    """Checks the object id returned for a selection of filepath ids"""
    # (filepath id, expected object id): filepaths 1 and 2 belong to
    # artifact 1, 3 and 4 to artifact 2, 9 to artifact 4 and 16 to
    # analysis 1
    owned = ((1, 1), (2, 1), (3, 2), (4, 2), (9, 4), (16, 1))
    for fp_id, object_id in owned:
        self.assertEqual(
            qdb.util.filepath_id_to_object_id(fp_id), object_id)
    # filepath 18 belongs to study 1; None is expected for it
    self.assertIsNone(qdb.util.filepath_id_to_object_id(18))
    # filepath 22 belongs to analysis/artifact 7
    self.assertEqual(qdb.util.filepath_id_to_object_id(22), 7)
def test_check_required_columns(self):
    """check_required_columns must not raise for the full column list"""
    # No assertion needed: the call only errors when info is missing
    table, columns = self.table, self.required
    qdb.util.check_required_columns(columns, table)
def test_check_required_columns_fail(self):
    """Dropping 'study_title' must raise QiitaDBColumnError"""
    self.required.remove('study_title')
    expected_error = qdb.exceptions.QiitaDBColumnError
    with self.assertRaises(expected_error):
        qdb.util.check_required_columns(self.required, self.table)
def test_check_table_cols(self):
    """check_table_cols must not raise when every column is known"""
    # No assertion needed: the call only errors on unknown columns
    table, columns = self.table, self.required
    qdb.util.check_table_cols(columns, table)
def test_check_table_cols_fail(self):
    """An extra, unknown column must raise QiitaDBColumnError"""
    self.required.append('BADTHINGNOINHERE')
    expected_error = qdb.exceptions.QiitaDBColumnError
    with self.assertRaises(expected_error):
        qdb.util.check_table_cols(self.required, self.table)
def test_get_table_cols(self):
    """get_table_cols returns the expected columns for qiita_user"""
    expected_columns = {
        "email",
        "user_level_id",
        "password",
        "name",
        "affiliation",
        "address",
        "phone",
        "user_verify_code",
        "pass_reset_code",
        "pass_reset_timestamp",
        "receive_processing_job_emails",
        "social_orcid",
        "social_researchgate",
        "social_googlescholar",
        "creation_timestamp",
    }
    # Column order is irrelevant, hence the set comparison
    observed = set(qdb.util.get_table_cols("qiita_user"))
    self.assertEqual(observed, expected_columns)
def test_exists_table(self):
    """Correctly checks if a table exists"""
    present = ("filepath", "qiita_user", "analysis", "prep_1", "sample_1")
    absent = ("sample_2", "prep_3", "foo_table", "bar_table")
    # True cases
    for table in present:
        self.assertTrue(qdb.util.exists_table(table))
    # False cases
    for table in absent:
        self.assertFalse(qdb.util.exists_table(table))
def test_convert_to_id(self):
    """Tests that ids are returned correctly"""
    # (positional arguments for convert_to_id, expected id)
    cases = (
        (("directory", "filepath_type"), 8),
        (("private", "visibility", "visibility"), 3),
        (("EMP", "portal_type", "portal"), 2),
    )
    for args, expected_id in cases:
        self.assertEqual(qdb.util.convert_to_id(*args), expected_id)
def test_convert_to_id_bad_value(self):
    """Tests that an unknown value raises QiitaDBLookupError"""
    expected_error = qdb.exceptions.QiitaDBLookupError
    with self.assertRaises(expected_error):
        qdb.util.convert_to_id("FAKE", "filepath_type")
def test_get_artifact_types(self):
    """get_artifact_types maps name->id, or id->name with key_by_id"""
    by_name = {
        'SFF': 1,
        'FASTA_Sanger': 2,
        'FASTQ': 3,
        'FASTA': 4,
        'per_sample_FASTQ': 5,
        'Demultiplexed': 6,
        'BIOM': 7,
        'beta_div_plots': 8,
        'rarefaction_curves': 9,
        'taxa_summary': 10,
    }
    self.assertEqual(qdb.util.get_artifact_types(), by_name)

    # Same mapping, inverted
    by_id = {type_id: name for name, type_id in by_name.items()}
    self.assertEqual(qdb.util.get_artifact_types(key_by_id=True), by_id)
def test_get_filepath_types(self):
    """Tests that get_filepath_types works with valid arguments

    The expected mapping is read directly from qiita.filepath_type, so
    no hard-coded copy of the table contents is kept in this test.
    """
    obs = qdb.util.get_filepath_types()
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add("SELECT filepath_type,filepath_type_id "
                                   "FROM qiita.filepath_type")
        exp = dict(qdb.sql_connection.TRN.execute_fetchindex())
    self.assertEqual(obs, exp)

    # Keyed by id, the result must be the inverse mapping
    obs = qdb.util.get_filepath_types(key='filepath_type_id')
    exp = {v: k for k, v in exp.items()}
    self.assertEqual(obs, exp)
def test_get_filepath_types_fail(self):
    """Tests that get_filepath_types fails with an invalid key"""
    expected_error = qdb.exceptions.QiitaDBColumnError
    with self.assertRaises(expected_error):
        qdb.util.get_filepath_types(key='invalid')
def test_get_data_types(self):
    """Tests that get_data_types works with valid arguments"""
    by_name = {
        '16S': 1,
        '18S': 2,
        'ITS': 3,
        'Proteomic': 4,
        'Metabolomic': 5,
        'Metagenomic': 6,
        'Multiomic': 7,
        'Metatranscriptomics': 8,
        'Viromics': 9,
        'Genomics': 10,
        'Transcriptomics': 11,
        'Job Output Folder': 12,
    }
    self.assertEqual(qdb.util.get_data_types(), by_name)

    # Keyed by id, the result must be the inverse mapping
    by_id = {type_id: name for name, type_id in by_name.items()}
    self.assertEqual(qdb.util.get_data_types(key='data_type_id'), by_id)
def test_create_rand_string(self):
    """Checks the length and punctuation content of random strings"""
    punct_chars = set(punctuation)

    # Default call: punctuation is expected to show up
    with_punct = qdb.util.create_rand_string(200)
    self.assertEqual(len(with_punct), 200)
    self.assertTrue(punct_chars & set(with_punct))

    # punct=False: no punctuation character may appear
    without_punct = qdb.util.create_rand_string(400, punct=False)
    self.assertEqual(len(without_punct), 400)
    self.assertFalse(punct_chars & set(without_punct))
def test_get_count(self):
    """Checks that get_count retrieves proper count"""
    n_rows = qdb.util.get_count('qiita.study_person')
    self.assertEqual(n_rows, 3)
def test_check_count(self):
    """Checks that check_count returns True and False appropriately"""
    table = 'qiita.study_person'
    matches = qdb.util.check_count(table, 3)
    self.assertTrue(matches)
    mismatches = qdb.util.check_count(table, 2)
    self.assertFalse(mismatches)
def test_insert_filepaths(self):
    """Checks the id, the on-disk move and the DB row of an insert"""
    handle, src_fp = mkstemp()
    close(handle)
    with open(src_fp, "w") as out:
        out.write("\n")
    self.files_to_remove.append(src_fp)

    # The new row is expected to get the next value of the id sequence
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add(
            "SELECT last_value FROM qiita.filepath_filepath_id_seq")
        exp_new_id = 1 + qdb.sql_connection.TRN.execute_fetchflatten()[0]

    new_ids = qdb.util.insert_filepaths([(src_fp, 1)], 2, "raw_data")
    self.assertEqual(new_ids, [exp_new_id])

    # The file must now live under raw_data and be gone from its origin
    stored_fp = join(qdb.util.get_db_files_base_dir(), "raw_data",
                     "2_%s" % basename(src_fp))
    self.assertTrue(exists(stored_fp))
    self.assertFalse(exists(src_fp))
    self.files_to_remove.append(stored_fp)

    # Check that the filepath has been added to the DB
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add("SELECT * FROM qiita.filepath "
                                   "WHERE filepath_id=%d" % exp_new_id)
        db_rows = qdb.sql_connection.TRN.execute_fetchindex()
    stored_name = "2_%s" % basename(src_fp)
    self.assertEqual(
        db_rows, [[exp_new_id, stored_name, 1, '852952723', 1, 5, 1]])

    qdb.util.purge_filepaths()
def test_insert_filepaths_copy(self):
    """Checks that copy=True leaves the source file in place"""
    handle, src_fp = mkstemp()
    close(handle)
    with open(src_fp, "w") as out:
        out.write("\n")
    self.files_to_remove.append(src_fp)

    # The id's in the database are bigserials, i.e. they get
    # autoincremented for each element introduced.
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add(
            "SELECT last_value FROM qiita.filepath_filepath_id_seq")
        exp_new_id = 1 + qdb.sql_connection.TRN.execute_fetchflatten()[0]

    new_ids = qdb.util.insert_filepaths(
        [(src_fp, 1)], 2, "raw_data", move_files=False, copy=True)
    self.assertEqual(new_ids, [exp_new_id])

    # Both the stored copy and the source must exist
    stored_fp = join(qdb.util.get_db_files_base_dir(), "raw_data",
                     "2_%s" % basename(src_fp))
    self.assertTrue(exists(stored_fp))
    self.assertTrue(exists(src_fp))
    self.files_to_remove.append(stored_fp)

    # Check that the filepath has been added to the DB
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add("SELECT * FROM qiita.filepath "
                                   "WHERE filepath_id=%d" % exp_new_id)
        db_rows = qdb.sql_connection.TRN.execute_fetchindex()
    stored_name = "2_%s" % basename(src_fp)
    self.assertEqual(
        db_rows, [[exp_new_id, stored_name, 1, '852952723', 1, 5, 1]])

    # let's do that again but with move_files = True
    exp_new_id += 1
    new_ids = qdb.util.insert_filepaths(
        [(src_fp, 1)], 2, "raw_data", move_files=True, copy=True)
    self.assertEqual(new_ids, [exp_new_id])

    # Again both the stored copy and the source must exist
    stored_fp = join(qdb.util.get_db_files_base_dir(), "raw_data",
                     "2_%s" % basename(src_fp))
    self.assertTrue(exists(stored_fp))
    self.assertTrue(exists(src_fp))
    self.files_to_remove.append(stored_fp)

    qdb.util.purge_filepaths()
def test_insert_filepaths_string(self):
    """Checks an insert whose filepath type is given by name"""
    handle, src_fp = mkstemp()
    close(handle)
    with open(src_fp, "w") as out:
        out.write("\n")
    self.files_to_remove.append(src_fp)

    # The new row is expected to get the next value of the id sequence
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add(
            "SELECT last_value FROM qiita.filepath_filepath_id_seq")
        exp_new_id = 1 + qdb.sql_connection.TRN.execute_fetchflatten()[0]

    new_ids = qdb.util.insert_filepaths(
        [(src_fp, "raw_forward_seqs")], 2, "raw_data")
    self.assertEqual(new_ids, [exp_new_id])

    # The file must now live under raw_data
    stored_fp = join(qdb.util.get_db_files_base_dir(), "raw_data",
                     "2_%s" % basename(src_fp))
    self.assertTrue(exists(stored_fp))
    self.files_to_remove.append(stored_fp)

    # Check that the filepath has been added to the DB
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add("SELECT * FROM qiita.filepath "
                                   "WHERE filepath_id=%d" % exp_new_id)
        db_rows = qdb.sql_connection.TRN.execute_fetchindex()
    stored_name = "2_%s" % basename(src_fp)
    self.assertEqual(
        db_rows, [[exp_new_id, stored_name, 1, '852952723', 1, 5, 1]])

    qdb.util.purge_filepaths()
def test_retrieve_filepaths(self):
    """Checks the filepaths retrieved for artifact 1"""
    observed = qdb.util.retrieve_filepaths(
        'artifact_filepath', 'artifact_id', 1)
    raw_data = partial(join, qdb.util.get_db_files_base_dir(), "raw_data")
    forward = dict(
        fp_id=1,
        fp=raw_data("1_s_G1_L001_sequences.fastq.gz"),
        fp_type="raw_forward_seqs",
        checksum='2125826711',
        fp_size=58)
    barcodes = dict(
        fp_id=2,
        fp=raw_data("1_s_G1_L001_sequences_barcodes.fastq.gz"),
        fp_type="raw_barcodes",
        checksum='2125826711',
        fp_size=58)
    self.assertEqual(observed, [forward, barcodes])
def test_retrieve_filepaths_sort(self):
    """Checks the order returned with sort='descending'"""
    observed = qdb.util.retrieve_filepaths(
        'artifact_filepath', 'artifact_id', 1, sort='descending')
    raw_data = partial(join, qdb.util.get_db_files_base_dir(), "raw_data")
    barcodes = dict(
        fp_id=2,
        fp=raw_data("1_s_G1_L001_sequences_barcodes.fastq.gz"),
        fp_type="raw_barcodes",
        checksum='2125826711',
        fp_size=58)
    forward = dict(
        fp_id=1,
        fp=raw_data("1_s_G1_L001_sequences.fastq.gz"),
        fp_type="raw_forward_seqs",
        checksum='2125826711',
        fp_size=58)
    # Highest filepath id first
    self.assertEqual(observed, [barcodes, forward])
def test_retrieve_filepaths_type(self):
    """Checks that fp_type restricts the retrieved filepaths

    Covers three calls for artifact 1: 'raw_barcodes' with and without
    an explicit sort, and 'biom', for which an empty list is expected.
    """
    path_builder = partial(
        join, qdb.util.get_db_files_base_dir(), "raw_data")
    # The same single entry is expected with or without sorting
    exp = [{'fp_id': 2,
            'fp': path_builder("1_s_G1_L001_sequences_barcodes.fastq.gz"),
            'fp_type': "raw_barcodes",
            'checksum': '2125826711',
            'fp_size': 58}]

    obs = qdb.util.retrieve_filepaths(
        'artifact_filepath', 'artifact_id', 1, sort='descending',
        fp_type='raw_barcodes')
    self.assertEqual(obs, exp)

    obs = qdb.util.retrieve_filepaths(
        'artifact_filepath', 'artifact_id', 1, fp_type='raw_barcodes')
    self.assertEqual(obs, exp)

    obs = qdb.util.retrieve_filepaths(
        'artifact_filepath', 'artifact_id', 1, fp_type='biom')
    self.assertEqual(obs, [])
def test_retrieve_filepaths_error(self):
    """An unrecognized sort value must raise QiitaDBError"""
    expected_error = qdb.exceptions.QiitaDBError
    with self.assertRaises(expected_error):
        qdb.util.retrieve_filepaths(
            'artifact_filepath', 'artifact_id', 1, sort='Unknown')
def test_empty_trash_upload_folder(self):
    """A file placed in a study's trash folder must be deleted"""
    study_id = '1'
    uploads_root = qdb.util.get_mountpoint("uploads")[0][1]
    trash_dir = join(uploads_root, study_id, 'trash')
    if not exists(trash_dir):
        mkdir(trash_dir)

    # creating file to delete so we know it actually works
    doomed_fp = join(trash_dir, 'my_file_to_delete.txt')
    with open(doomed_fp, 'w'):
        pass
    self.assertTrue(exists(doomed_fp))

    qdb.util.empty_trash_upload_folder()
    self.assertFalse(exists(doomed_fp))
def test_move_filepaths_to_upload_folder(self):
    """Checks which files reach the uploads folder on artifact deletion

    move_filepaths_to_upload_folder is tested indirectly: artifacts made
    of a fastq plus an html_summary directory are created on new prep
    templates and then deleted. This is done more than once, reusing
    file names, to make sure there are no conflicts.
    """
    study_id = 1

    def _new_artifact_files():
        # One empty fastq plus a 'support_files' dir with an index.html
        handle, seqs_fp = mkstemp(suffix='_seqs.fastq')
        close(handle)
        html_dir = join(mkdtemp(), 'support_files')
        mkdir(html_dir)
        with open(join(html_dir, 'index.html'), 'w') as out:
            out.write(">AAA\nAAA")
        return seqs_fp, html_dir

    # creating the 2 sets of files for the 2 artifacts
    seqs_fp1, html_fp1 = _new_artifact_files()
    seqs_fp2, html_fp2 = _new_artifact_files()

    # creating new prep info file
    sample_info = {
        'center_name': 'ANL',
        'primer': 'GTGCCAGCMGCCGCGGTAA',
        'barcode': 'GTCCGCAAGTTA',
        'run_prefix': "s_G1_L001_sequences",
        'platform': 'Illumina',
        'instrument_model': 'Illumina MiSeq',
        'library_construction_protocol': 'AAAA',
        'experiment_design_description': 'BBBB'}
    metadata = pd.DataFrame.from_dict(
        {'SKB8.640193': sample_info}, orient='index', dtype=str)
    prep_cls = qdb.metadata_template.prep_template.PrepTemplate
    pt1 = prep_cls.create(metadata, qdb.study.Study(study_id), "16S")
    pt2 = prep_cls.create(metadata, qdb.study.Study(study_id), "16S")

    # inserting artifact 1
    artifact1 = qdb.artifact.Artifact.create(
        [(seqs_fp1, 1), (html_fp1, 'html_summary')], "FASTQ",
        prep_template=pt1)
    filepaths = artifact1.filepaths
    # inserting artifact 2
    artifact2 = qdb.artifact.Artifact.create(
        [(seqs_fp2, 1), (html_fp2, 'html_summary')], "FASTQ",
        prep_template=pt2)
    filepaths.extend(artifact2.filepaths)

    # get before delete files in upload folders
    list_uploads = qdb.util.get_files_from_uploads_folders
    before = set(list_uploads("1"))

    # delete artifact 1
    qdb.artifact.Artifact.delete(artifact1.id)

    # confirm that _only_ the fastq from the file is recovered; this means
    # that all the extra files/folders were ignored
    recovered = set(list_uploads("1")) - before
    self.assertEqual(len(recovered), 1)
    self.assertEqual(next(iter(recovered))[1], basename(seqs_fp1))

    # finish deleting artifacts :: there should be a new fastq
    qdb.artifact.Artifact.delete(artifact2.id)
    recovered = set(list_uploads("1")) - before
    self.assertEqual(len(recovered), 2)
    self.assertCountEqual(
        [entry[1] for entry in recovered],
        [basename(seqs_fp1), basename(seqs_fp2)])

    # now let's create another artifact with the same filenames as
    # artifact1 so we can test successful overlapping of names
    with open(seqs_fp1, 'w') as out:
        out.write(">AAA\nAAA")
    mkdir(html_fp1)
    with open(join(html_fp1, 'index.html'), 'w') as out:
        out.write(">AAA\nAAA")
    artifact3 = qdb.artifact.Artifact.create(
        [(seqs_fp1, 1), (html_fp1, 'html_summary')], "FASTQ",
        prep_template=pt1)
    filepaths.extend(artifact3.filepaths)
    qdb.artifact.Artifact.delete(artifact3.id)

    # files should be the same as the previous test
    recovered = set(list_uploads("1")) - before
    self.assertEqual(len(recovered), 2)
    self.assertCountEqual(
        [entry[1] for entry in recovered],
        [basename(seqs_fp1), basename(seqs_fp2)])

    # Register whatever landed in the study's uploads folder for cleanup
    uploads_root = qdb.util.get_mountpoint("uploads")[0][1]
    self.files_to_remove.extend(
        join(uploads_root, "1", basename(info['fp'])) for info in filepaths)
def test_get_mountpoint(self):
    """Checks get_mountpoint before and after adding data directories"""
    def _path(mountpoint):
        # Absolute location of a mountpoint under the DB base directory
        return join(qdb.util.get_db_files_base_dir(), mountpoint)

    initial = (("raw_data", 5), ("analysis", 1), ("job", 2))
    for data_type, dd_id in initial:
        expected = [(dd_id, _path(data_type))]
        self.assertEqual(qdb.util.get_mountpoint(data_type), expected)

    # inserting new ones so we can test that it retrieves these and
    # doesn't alter other ones
    qdb.sql_connection.perform_as_transaction(
        "UPDATE qiita.data_directory SET active=false WHERE "
        "data_directory_id=1")
    count = qdb.util.get_count('qiita.data_directory')
    sql = """INSERT INTO qiita.data_directory (data_type, mountpoint,
                                                  subdirectory, active)
                 VALUES ('analysis', 'analysis_tmp', true, true),
                        ('raw_data', 'raw_data_tmp', true, false)"""
    qdb.sql_connection.perform_as_transaction(sql)

    # this should have been updated
    expected = [(count + 1, _path('analysis_tmp'))]
    self.assertEqual(qdb.util.get_mountpoint("analysis"), expected)

    # these 2 shouldn't
    for data_type, dd_id in (("raw_data", 5), ("job", 2)):
        expected = [(dd_id, _path(data_type))]
        self.assertEqual(qdb.util.get_mountpoint(data_type), expected)

    # testing multi returns
    expected = [(5, _path('raw_data')),
                (count + 2, _path('raw_data_tmp'))]
    observed = qdb.util.get_mountpoint("raw_data", retrieve_all=True)
    self.assertEqual(observed, expected)

    # testing retrieve subdirectory
    expected = [(5, _path('raw_data'), False),
                (count + 2, _path('raw_data_tmp'), True)]
    observed = qdb.util.get_mountpoint(
        "raw_data", retrieve_all=True, retrieve_subdir=True)
    self.assertEqual(observed, expected)
def test_get_mountpoint_path_by_id(self):
    """Checks get_mountpoint_path_by_id before and after new entries"""
    def _path(mountpoint):
        # Absolute location of a mountpoint under the DB base directory
        return join(qdb.util.get_db_files_base_dir(), mountpoint)

    initial = ((5, 'raw_data'), (1, 'analysis'), (2, 'job'))
    for dd_id, mountpoint in initial:
        expected = _path(mountpoint)
        self.assertEqual(
            qdb.util.get_mountpoint_path_by_id(dd_id), expected)

    # inserting new ones so we can test that it retrieves these and
    # doesn't alter other ones
    qdb.sql_connection.perform_as_transaction(
        "UPDATE qiita.data_directory SET active=false WHERE "
        "data_directory_id=1")
    count = qdb.util.get_count('qiita.data_directory')
    sql = """INSERT INTO qiita.data_directory (data_type, mountpoint,
                                                  subdirectory, active)
                 VALUES ('analysis', 'analysis_tmp', true, true),
                        ('raw_data', 'raw_data_tmp', true, false)"""
    qdb.sql_connection.perform_as_transaction(sql)

    # this should have been updated
    expected = _path('analysis_tmp')
    self.assertEqual(
        qdb.util.get_mountpoint_path_by_id(count + 1), expected)

    # these 2 shouldn't
    for dd_id, mountpoint in ((5, 'raw_data'), (2, 'job')):
        expected = _path(mountpoint)
        self.assertEqual(
            qdb.util.get_mountpoint_path_by_id(dd_id), expected)
def test_get_files_from_uploads_folders(self):
    """Checks the uploads listing for studies "1" and "2" """
    # something has been uploaded and ignoring hidden files/folders
    # and folders
    study_1_files = qdb.util.get_files_from_uploads_folders("1")
    self.assertIn((7, 'uploaded_file.txt', '0B'), study_1_files)

    # nothing has been uploaded
    study_2_files = qdb.util.get_files_from_uploads_folders("2")
    self.assertEqual(study_2_files, [])
def test_move_upload_files_to_trash(self):
    """Checks that a trashed upload leaves the study's uploads listing

    Also checks that trashing a missing file is silently accepted and
    that an unknown study or an unknown mountpoint id raises
    QiitaDBError.
    """
    test_filename = 'this_is_a_test_file.txt'

    # create file to move to trash
    fid, folder = qdb.util.get_mountpoint("uploads")[0]
    test_fp = join(folder, '1', test_filename)
    with open(test_fp, 'w') as f:
        f.write('test')

    self.files_to_remove.append(test_fp)

    exp = (fid, test_filename, '4B')
    obs = qdb.util.get_files_from_uploads_folders("1")
    self.assertIn(exp, obs)

    # move file
    qdb.util.move_upload_files_to_trash(1, [(fid, test_filename)])
    obs = qdb.util.get_files_from_uploads_folders("1")
    # assertNotIn takes (member, container): the entry must be gone
    # from the listing
    self.assertNotIn(exp, obs)

    # if the file doesn't exist, don't raise any errors
    qdb.util.move_upload_files_to_trash(1, [(fid, test_filename)])

    # testing errors
    # - study doesn't exist
    with self.assertRaises(qdb.exceptions.QiitaDBError):
        qdb.util.move_upload_files_to_trash(100, [(fid, test_filename)])
    # - fid doesn't exist
    with self.assertRaises(qdb.exceptions.QiitaDBError):
        qdb.util.move_upload_files_to_trash(1, [(10, test_filename)])

    # removing trash folder
    rmtree(join(folder, '1', 'trash'))
def test_get_environmental_packages(self):
    """Checks the [name, table] pairs of the environmental packages"""
    expected_packages = [
        ['air', 'ep_air'],
        ['built environment', 'ep_built_environment'],
        ['host-associated', 'ep_host_associated'],
        ['human-amniotic-fluid', 'ep_human_amniotic_fluid'],
        ['human-associated', 'ep_human_associated'],
        ['human-blood', 'ep_human_blood'],
        ['human-gut', 'ep_human_gut'],
        ['human-oral', 'ep_human_oral'],
        ['human-skin', 'ep_human_skin'],
        ['human-urine', 'ep_human_urine'],
        ['human-vaginal', 'ep_human_vaginal'],
        ['microbial mat/biofilm', 'ep_microbial_mat_biofilm'],
        ['miscellaneous natural or artificial environment',
         'ep_misc_artif'],
        ['plant-associated', 'ep_plant_associated'],
        ['sediment', 'ep_sediment'],
        ['soil', 'ep_soil'],
        ['wastewater/sludge', 'ep_wastewater_sludge'],
        ['water', 'ep_water'],
    ]
    observed_packages = qdb.util.get_environmental_packages()
    # Order is not part of the check, so both sides are sorted
    self.assertEqual(sorted(observed_packages), sorted(expected_packages))
def test_get_timeseries_types(self):
    """Checks the [id, type, intervention] rows of timeseries types"""
    kinds = ('real', 'pseudo', 'mixed')
    interventions = ('single intervention', 'multiple intervention',
                     'combo intervention')
    # Id 1 is the 'None' entry; ids 2-10 enumerate every kind with every
    # intervention type, in that order
    combos = [(kind, interv) for kind in kinds for interv in interventions]
    expected = [[1, 'None', 'None']]
    expected += [[ts_id, kind, interv]
                 for ts_id, (kind, interv) in enumerate(combos, 2)]
    self.assertEqual(qdb.util.get_timeseries_types(), expected)
def test_get_filepath_information(self):
    """Checks the information returned for filepath 1"""
    info = qdb.util.get_filepath_information(1)
    # This path is machine specific. Just checking that is not empty
    fullpath = info.pop('fullpath')
    self.assertIsNotNone(fullpath)
    expected = dict(
        filepath_id=1,
        filepath='1_s_G1_L001_sequences.fastq.gz',
        filepath_type='raw_forward_seqs',
        checksum='2125826711',
        data_type='raw_data',
        mountpoint='raw_data',
        subdirectory=False,
        active=True)
    self.assertEqual(info, expected)
def test_filepath_id_to_rel_path(self):
    """Checks relative paths for existing and newly inserted filepaths"""
    known = ((1, 'raw_data/1_s_G1_L001_sequences.fastq.gz'),
             (3, 'preprocessed_data/1_seqs.fna'))
    for fp_id, rel_path in known:
        self.assertEqual(qdb.util.filepath_id_to_rel_path(fp_id), rel_path)

    # Insert a new file under FASTQ and link it to artifact 2
    handle, src_fp = mkstemp()
    close(handle)
    with open(src_fp, 'w') as out:
        out.write('\n')
    self.files_to_remove.append(src_fp)
    new_id = qdb.util.insert_filepaths(
        [(src_fp, "raw_forward_seqs")], 2, "FASTQ")[0]
    sql = """INSERT INTO qiita.artifact_filepath
                    (artifact_id, filepath_id)
                 VALUES (%s, %s)"""
    qdb.sql_connection.perform_as_transaction(sql, [2, new_id])

    expected = 'FASTQ/2/%s' % basename(src_fp)
    self.assertEqual(qdb.util.filepath_id_to_rel_path(new_id), expected)
def test_filepath_ids_to_rel_paths(self):
    """Checks the id -> relative path mapping for several filepaths"""
    # Insert a new file under FASTQ and link it to artifact 2
    handle, src_fp = mkstemp()
    close(handle)
    with open(src_fp, 'w') as out:
        out.write('\n')
    self.files_to_remove.append(src_fp)
    new_id = qdb.util.insert_filepaths(
        [(src_fp, "raw_forward_seqs")], 2, "FASTQ")[0]
    sql = """INSERT INTO qiita.artifact_filepath
                    (artifact_id, filepath_id)
                 VALUES (%s, %s)"""
    qdb.sql_connection.perform_as_transaction(sql, [2, new_id])

    observed = qdb.util.filepath_ids_to_rel_paths([1, 3, new_id])
    expected = {
        1: 'raw_data/1_s_G1_L001_sequences.fastq.gz',
        3: 'preprocessed_data/1_seqs.fna',
        new_id: 'FASTQ/2/%s' % basename(src_fp),
    }
    self.assertEqual(observed, expected)
def test_add_message(self):
    """A message added for a new user shows up in that user's messages"""
    # Expected id of the message, derived from the current row count
    next_id = qdb.util.get_count('qiita.message') + 1
    recipient = qdb.user.User.create('new@test.bar', 'password')
    qdb.util.add_message("TEST MESSAGE", [recipient])
    # Only the first two fields of each message are compared
    received = [[msg[0], msg[1]] for msg in recipient.messages()]
    self.assertEqual(received, [[next_id, 'TEST MESSAGE']])
def test_add_system_message(self):
    """A system message reaches users and stores its expiration"""
    # Expected id of the message, derived from the current row count
    next_id = qdb.util.get_count('qiita.message') + 1
    expiration = datetime(2015, 8, 5, 19, 41)
    qdb.util.add_system_message("SYS MESSAGE", expiration)

    def _id_and_text(email):
        # First two fields of every message of the given user
        return [[msg[0], msg[1]]
                for msg in qdb.user.User(email).messages()]

    self.assertEqual(_id_and_text('shared@foo.bar'),
                     [[next_id, 'SYS MESSAGE'], [1, 'message 1']])
    self.assertEqual(_id_and_text('admin@foo.bar'),
                     [[next_id, 'SYS MESSAGE']])

    sql = "SELECT expiration from qiita.message WHERE message_id = %s"
    with qdb.sql_connection.TRN:
        qdb.sql_connection.TRN.add(sql, [next_id])
        stored = qdb.sql_connection.TRN.execute_fetchindex()
    self.assertEqual(stored, [[datetime(2015, 8, 5, 19, 41)]])
def test_clear_system_messages(self):
    """clear_system_messages empties a user's system messages"""
    # Expected id of the message, derived from the current row count
    message_id = qdb.util.get_count('qiita.message') + 1
    user = qdb.user.User.create('csm@test.bar', 'password')

    def _id_and_text():
        # First two fields of every message of the new user
        return [[msg[0], msg[1]] for msg in user.messages()]

    # A brand new user starts without messages
    self.assertEqual(_id_and_text(), [])

    qdb.util.add_system_message("SYS MESSAGE",
                                datetime(2015, 8, 5, 19, 41))
    self.assertCountEqual(_id_and_text(), [[message_id, 'SYS MESSAGE']])

    qdb.util.clear_system_messages()
    self.assertEqual(_id_and_text(), [])

    # Run again with no system messages to make sure no errors
    qdb.util.clear_system_messages()
def test_supported_filepath_types(self):
    """Checks the [filepath type, flag] pairs per artifact type"""
    expected_by_type = (
        ("FASTQ", [["raw_forward_seqs", True],
                   ["raw_reverse_seqs", False],
                   ["raw_barcodes", True]]),
        ("BIOM", [["biom", True], ["directory", False], ["log", False]]),
    )
    for artifact_type, expected in expected_by_type:
        observed = qdb.util.supported_filepath_types(artifact_type)
        # Order is not part of the check
        self.assertCountEqual(observed, expected)
def test_generate_analysis_list(self):
    """Checks generate_analysis_list for empty, mixed and flagged calls"""
    self.assertEqual(qdb.util.generate_analysis_list([]), [])

    observed = qdb.util.generate_analysis_list([1, 2, 3, 5])
    mapping_fp = qdb.util.get_filepath_information(16)['fullpath']
    first = {
        'mapping_files': [(16, mapping_fp)],
        'description': 'A test analysis',
        'artifacts': [8, 9],
        'name': 'SomeAnalysis',
        'owner': 'test@foo.bar',
        'analysis_id': 1,
        'visibility': 'private'}
    second = {
        'mapping_files': [],
        'description': 'Another test analysis',
        'artifacts': [],
        'name': 'SomeSecondAnalysis',
        'owner': 'admin@foo.bar',
        'analysis_id': 2,
        'visibility': 'private'}

    # removing timestamp for testing
    for analysis in observed:
        del analysis['timestamp']
    self.assertEqual(observed, [first, second])

    # With True as second argument an empty list is expected
    self.assertEqual(
        qdb.util.generate_analysis_list([1, 2, 3, 5], True), [])
@qiita_test_checker()
class UtilTests(TestCase):
"""Tests for the util functions that do not need to access the DB"""
def setUp(self):
    """Create a temporary file with known content in self.filepath"""
    handle, self.filepath = mkstemp()
    # mkstemp returns an open descriptor; the file is reopened by name
    close(handle)
    with open(self.filepath, "w") as out:
        out.write("Some text so we can actually compute a checksum")
def test_compute_checksum(self):
    """Correctly returns the file checksum"""
    checksum = qdb.util.compute_checksum(self.filepath)
    self.assertEqual(checksum, 1719580229)
def test_scrub_data_nothing(self):
    """Returns the same string without changes"""
    scrubbed = qdb.util.scrub_data("nothing_changes")
    self.assertEqual(scrubbed, "nothing_changes")
def test_scrub_data_semicolon(self):
    """Correctly removes the semicolon from the string"""
    scrubbed = qdb.util.scrub_data("remove_;_char")
    self.assertEqual(scrubbed, "remove__char")
def test_scrub_data_single_quote(self):
    """Correctly removes single quotes from the string"""
    scrubbed = qdb.util.scrub_data("'quotes'")
    self.assertEqual(scrubbed, "quotes")
def test_get_visibilities(self):
    """get_visibilities returns the expected names in order"""
    expected = [
        'awaiting_approval',
        'sandbox',
        'private',
        'public',
        'archived',
    ]
    self.assertEqual(qdb.util.get_visibilities(), expected)
def test_infer_status(self):
    """Checks the status inferred from lists of visibility rows"""
    # (visibility rows passed in, expected inferred status)
    cases = (
        ([], 'sandbox'),
        ([['private']], 'private'),
        ([['private'], ['public']], 'public'),
        ([['sandbox'], ['awaiting_approval']], 'awaiting_approval'),
        ([['sandbox'], ['sandbox']], 'sandbox'),
    )
    for rows, status in cases:
        self.assertEqual(qdb.util.infer_status(rows), status)
def test_get_pubmed_ids_from_dois(self):
    """Checks the DOI -> PubMed id mapping; the empty DOI has no entry"""
    dois = ['', '10.100/123456']
    observed = qdb.util.get_pubmed_ids_from_dois(dois)
    self.assertEqual(observed, {'10.100/123456': '123456'})
def test_generate_study_list(self):
USER = qdb.user.User
STUDY = qdb.study.Study
PREP = qdb.metadata_template.prep_template.PrepTemplate
UTIL = qdb.util
# testing owner email as name
user = USER('test@foo.bar')
username = user.info['name']
# test without changes
self.assertDictEqual(
STUDY_INFO, UTIL.generate_study_list(user, 'user')[0])
# change user's name to None and tests again
user.info = {'name': None}
exp = STUDY_INFO.copy()
exp['owner'] = 'test@foo.bar'
self.assertDictEqual(
exp, qdb.util.generate_study_list(user, 'user')[0])
# returning original name
user.info = {'name': username}
# creating a new study to make sure that empty studies are also
# returned
info = {"timeseries_type_id": 1, "metadata_complete": True,
"mixs_compliant": True, "study_alias": "TST",
"study_description": "Some description of the study goes here",
"study_abstract": "Some abstract goes here",
"principal_investigator_id": qdb.study.StudyPerson(1),
"lab_person_id": qdb.study.StudyPerson(1)}
new_study = STUDY.create(
USER('shared@foo.bar'), 'test_study_1', info=info)
snew_info = {
'study_title': 'test_study_1',
'metadata_complete': True, 'publication_pid': [],
'artifact_biom_ids': [], 'autoloaded': False,
'study_id': new_study.id, 'ebi_study_accession': None,
'owner': 'Shared', 'shared': [],
'study_abstract': 'Some abstract goes here',
'pi': ('lab_dude@foo.bar', 'LabDude'), 'publication_doi': [],
'study_alias': 'TST', 'study_tags': None,
'preparation_data_types': [], 'number_samples_collected': 0}
exp1 = [STUDY_INFO]
exp2 = [snew_info]
exp_both = [STUDY_INFO, snew_info]
# let's make sure that everything is private for study 1
for a in STUDY(1).artifacts():
a.visibility = 'private'
# owner of study
obs = UTIL.generate_study_list(USER('test@foo.bar'), 'user')
self.assertEqual(len(obs), 1)
self.assertDictEqual(obs[0], exp1[0])
# shared with
obs = UTIL.generate_study_list(USER('shared@foo.bar'), 'user')
self.assertEqual(len(obs), 2)
self.assertDictEqual(obs[0], exp_both[0])
self.assertDictEqual(obs[1], exp_both[1])
# admin
obs = UTIL.generate_study_list(USER('admin@foo.bar'), 'user')
self.assertEqual(obs, exp_both)
# no access/hidden
obs = UTIL.generate_study_list(USER('demo@microbio.me'), 'user')
self.assertEqual(obs, [])
# public - none for everyone
obs = UTIL.generate_study_list(USER('test@foo.bar'), 'public')
self.assertEqual(obs, [])
obs = UTIL.generate_study_list(USER('shared@foo.bar'), 'public')
self.assertEqual(obs, [])
obs = UTIL.generate_study_list(USER('admin@foo.bar'), 'public')
self.assertEqual(obs, [])
obs = UTIL.generate_study_list(USER('demo@microbio.me'), 'public')
self.assertEqual(obs, [])
def _avoid_duplicated_tests(all_artifacts=False):
# nothing should change for owner, shared
obs = UTIL.generate_study_list(USER('test@foo.bar'), 'user')
self.assertEqual(obs, exp1)
obs = UTIL.generate_study_list(USER('shared@foo.bar'), 'user')
self.assertEqual(obs, exp_both)
# for admin it should be shown in public and user cause there are
# 2 preps and only one is public
obs = UTIL.generate_study_list(USER('admin@foo.bar'), 'user')
if not all_artifacts:
self.assertEqual(obs, exp_both)
else:
self.assertEqual(obs, exp2)
obs = UTIL.generate_study_list(USER('demo@microbio.me'), 'user')
self.assertEqual(obs, [])
# for the public query, everything should be same for owner, share
# and admin but demo should now see it as public but with limited
# artifacts
obs = UTIL.generate_study_list(USER('test@foo.bar'), 'public')
self.assertEqual(obs, [])
obs = UTIL.generate_study_list(USER('shared@foo.bar'), 'public')
self.assertEqual(obs, [])
obs = UTIL.generate_study_list(USER('admin@foo.bar'), 'public')
if not all_artifacts:
exp1[0]['artifact_biom_ids'] = [7]
self.assertEqual(obs, exp1)
obs = UTIL.generate_study_list(USER('demo@microbio.me'), 'public')
self.assertEqual(obs, exp1)
# returning artifacts
exp1[0]['artifact_biom_ids'] = [4, 5, 6, 7]
# make artifacts of prep 2 public
PREP(2).artifact.visibility = 'public'
_avoid_duplicated_tests()
# make artifacts of prep 1 awaiting_approval
PREP(1).artifact.visibility = 'awaiting_approval'
_avoid_duplicated_tests()
# making all studies public
PREP(1).artifact.visibility = 'public'
_avoid_duplicated_tests(True)
# deleting the new study and returning artifact status
qdb.study.Study.delete(new_study.id)
PREP(1).artifact.visibility = 'private'
PREP(2).artifact.visibility = 'private'
def test_generate_study_list_errors(self):
    """Passing an unsupported second argument ('bad') raises ValueError."""
    generate = qdb.util.generate_study_list
    with self.assertRaises(ValueError):
        generate(qdb.user.User('test@foo.bar'), 'bad')
def test_generate_study_list_without_artifacts(self):
    """Check generate_study_list_without_artifacts for ids [1, 2, 3, 4].

    A fresh, sample-less study is created first so that the listing is
    verified to include empty studies too.  The study is removed in a
    ``finally`` clause so that a failing assertion cannot leave it behind
    in the database for the tests that run afterwards.
    """
    # creating a new study to make sure that empty studies are also
    # returned
    info = {"timeseries_type_id": 1, "metadata_complete": True,
            "mixs_compliant": True, "study_alias": "TST",
            "study_description": "Some description of the study goes here",
            "study_abstract": "Some abstract goes here",
            "principal_investigator_id": qdb.study.StudyPerson(1),
            "lab_person_id": qdb.study.StudyPerson(1)}
    new_study = qdb.study.Study.create(
        qdb.user.User('shared@foo.bar'), 'test_study_1', info=info)
    try:
        exp_info = [
            {'study_title': (
                'Identification of the Microbiomes for Cannabis Soils'),
             'metadata_complete': True, 'publication_pid': [
                '123456', '7891011'],
             'study_id': 1, 'ebi_study_accession': 'EBI123456-BB',
             'autoloaded': False,
             'study_abstract': (
                'This is a preliminary study to examine the microbiota '
                'associated with the Cannabis plant. Soils samples from '
                'the bulk soil, soil associated with the roots, and the '
                'rhizosphere were extracted and the DNA sequenced. Roots '
                'from three independent plants of different strains were '
                'examined. These roots were obtained November 11, 2011 from '
                'plants that had been harvested in the summer. Future studies '
                'will attempt to analyze the soils and rhizospheres from the '
                'same location at different time points in the plant '
                'lifecycle.'), 'pi': ('PI_dude@foo.bar', 'PIDude'),
             'publication_doi': ['10.100/123456', '10.100/7891011'],
             'study_alias': 'Cannabis Soils', 'number_samples_collected': 27},
            {'study_title': 'test_study_1',
             'metadata_complete': True, 'publication_pid': [],
             'autoloaded': False,
             'study_id': new_study.id, 'ebi_study_accession': None,
             'study_abstract': 'Some abstract goes here',
             'pi': ('lab_dude@foo.bar', 'LabDude'), 'publication_doi': [],
             'study_alias': 'TST', 'number_samples_collected': 0}]
        obs_info = qdb.util.generate_study_list_without_artifacts(
            [1, 2, 3, 4])
        self.assertEqual(obs_info, exp_info)

        # with 'EMP' as second argument nothing is expected back
        # (presumably a portal filter -- confirm against qdb.util)
        obs_info = qdb.util.generate_study_list_without_artifacts(
            [1, 2, 3, 4], 'EMP')
        self.assertEqual(obs_info, [])
    finally:
        # deleting the study created above, even if an assertion failed
        qdb.study.Study.delete(new_study.id)
def test_get_artifacts_information(self):
    """Check get_artifacts_information and how DB flags shape 'algorithm'.

    First the plain output is compared for a mix of artifact ids.  Then,
    inside a single transaction, the ``check_biom_merge`` and
    ``ignore_parent_command`` flags are switched on one after the other
    and the expected 'algorithm' / 'algorithm_az' values of artifact 4 are
    updated accordingly.  The flags are switched back off at the end.
    """
    def fetch_info(artifact_ids):
        # timestamps are not tested, so strip them from every entry
        entries = qdb.util.get_artifacts_information(artifact_ids)
        for entry in entries:
            del entry['timestamp']
        return entries

    # we are going to test that it ignores 1 and 2 cause they are not biom,
    # 4 has all information and 7 and 8 don't
    obs = fetch_info([1, 2, 4, 6, 7, 8])
    exp = [
        {'artifact_id': 6, 'target_subfragment': ['V4'],
         'prep_samples': 27, 'platform': 'Illumina',
         'target_gene': '16S rRNA', 'name': 'BIOM', 'data_type': '16S',
         'parameters': {'reference': '2', 'similarity': '0.97',
                        'sortmerna_e_value': '1',
                        'sortmerna_max_pos': '10000', 'threads': '1',
                        'sortmerna_coverage': '0.97'},
         'algorithm': 'Pick closed-reference OTUs | Split libraries FASTQ',
         'algorithm_az': 'd480799a0a7a2fbe0e9022bc9c602018',
         'deprecated': False, 'active': True,
         'files': ['1_study_1001_closed_reference_otu_table_Silva.biom']},
        {'artifact_id': 4, 'target_subfragment': ['V4'],
         'prep_samples': 27, 'platform': 'Illumina',
         'target_gene': '16S rRNA', 'name': 'BIOM', 'data_type': '18S',
         'parameters': {'reference': '1', 'similarity': '0.97',
                        'sortmerna_e_value': '1',
                        'sortmerna_max_pos': '10000', 'threads': '1',
                        'sortmerna_coverage': '0.97'},
         'algorithm': 'Pick closed-reference OTUs | Split libraries FASTQ',
         'algorithm_az': 'd480799a0a7a2fbe0e9022bc9c602018',
         'deprecated': False, 'active': True,
         'files': ['1_study_1001_closed_reference_otu_table.biom']},
        {'artifact_id': 7, 'target_subfragment': ['V4'],
         'prep_samples': 27, 'platform': 'Illumina',
         'target_gene': '16S rRNA', 'name': 'BIOM', 'data_type': '16S',
         'parameters': {}, 'algorithm': '', 'algorithm_az': '',
         'deprecated': False, 'active': True,
         'files': ['biom_table.biom']},
        {'artifact_id': 8, 'target_subfragment': [], 'prep_samples': 0,
         'platform': 'not provided', 'target_gene': 'not provided', 'name':
         'noname', 'data_type': '18S', 'parameters': {}, 'algorithm': '',
         'algorithm_az': '', 'deprecated': False, 'active': True,
         'files': ['biom_table.biom']}]
    self.assertCountEqual(obs, exp)

    # artifact 6 is not requested from here on, so drop its expectation;
    # exp[0] now refers to artifact 4
    exp = exp[1:]
    requested = [1, 2, 4, 7, 8]

    # now let's test that the order given by the commands actually give the
    # correct results
    with qdb.sql_connection.TRN:
        # setting up database changes for just checking commands
        qdb.sql_connection.TRN.add(
            """UPDATE qiita.command_parameter SET check_biom_merge = True
               WHERE parameter_name = 'reference'""")
        qdb.sql_connection.TRN.execute()
        # testing that it works as expected
        obs = fetch_info(requested)
        exp[0]['algorithm'] = ('Pick closed-reference OTUs (reference: 1) '
                               '| Split libraries FASTQ')
        exp[0]['algorithm_az'] = '33fed1b35728417d7ba4139b8f817d44'
        self.assertCountEqual(obs, exp)

        # setting up database changes for also command output
        qdb.sql_connection.TRN.add(
            "UPDATE qiita.command_output SET check_biom_merge = True")
        qdb.sql_connection.TRN.execute()
        obs = fetch_info(requested)
        exp[0]['algorithm'] = ('Pick closed-reference OTUs (reference: 1, '
                               'BIOM: 1_study_1001_closed_reference_'
                               'otu_table.biom) | Split libraries FASTQ')
        exp[0]['algorithm_az'] = 'de5b794a2cacd428f36fea86df196bfd'
        self.assertCountEqual(obs, exp)

        # let's test that we ignore the parent_info (this stanza used to
        # be repeated twice verbatim; once is enough)
        qdb.sql_connection.TRN.add("""UPDATE qiita.software_command
                                      SET ignore_parent_command = True""")
        qdb.sql_connection.TRN.execute()
        obs = fetch_info(requested)
        exp[0]['algorithm'] = ('Pick closed-reference OTUs (reference: 1, '
                               'BIOM: 1_study_1001_closed_reference_'
                               'otu_table.biom)')
        exp[0]['algorithm_az'] = '7f59a45b2f0d30cd1ed1929391c26e07'
        self.assertCountEqual(obs, exp)

        # returning database as it was
        qdb.sql_connection.TRN.add(
            "UPDATE qiita.command_output SET check_biom_merge = False")
        qdb.sql_connection.TRN.add("""UPDATE qiita.software_command
                                      SET ignore_parent_command = False""")
        qdb.sql_connection.TRN.add(
            """UPDATE qiita.command_parameter SET check_biom_merge = False
               WHERE parameter_name = 'reference'""")
        qdb.sql_connection.TRN.execute()
class TestFilePathOpening(TestCase):
    """Tests adapted from scikit-bio's skbio.io.util tests"""

    def test_is_string_or_bytes(self):
        """Text and byte strings are accepted, other objects are not."""
        self.assertTrue(qdb.util._is_string_or_bytes('foo'))
        self.assertTrue(qdb.util._is_string_or_bytes(u'foo'))
        self.assertTrue(qdb.util._is_string_or_bytes(b'foo'))
        self.assertFalse(qdb.util._is_string_or_bytes(StringIO('bar')))
        self.assertFalse(qdb.util._is_string_or_bytes([1]))

    def test_file_closed(self):
        """File gets closed in decorator"""
        # the temporary file is managed by ``with`` so it is always released
        with NamedTemporaryFile('r') as f:
            with qdb.util.open_file(f.name) as fh:
                pass
            self.assertTrue(fh.closed)

    def test_file_closed_harder(self):
        """File gets closed in decorator, even if exceptions happen."""
        with NamedTemporaryFile('r') as f:
            # assertRaises fails the test if the context manager swallows
            # the exception instead of propagating it
            with self.assertRaises(TypeError):
                with qdb.util.open_file(f.name) as fh:
                    raise TypeError
            self.assertTrue(fh.closed)

    def test_filehandle(self):
        """Filehandles slip through untouched"""
        with TemporaryFile('r') as fh:
            with qdb.util.open_file(fh) as ffh:
                self.assertIs(fh, ffh)
            # And it doesn't close the file-handle
            self.assertFalse(fh.closed)

    def test_StringIO(self):
        """StringIO (useful e.g. for testing) slips through."""
        f = StringIO("File contents")
        with qdb.util.open_file(f) as fh:
            self.assertIs(fh, f)

    def test_BytesIO(self):
        """BytesIO (useful e.g. for testing) slips through."""
        f = BytesIO(b"File contents")
        with qdb.util.open_file(f) as fh:
            self.assertIs(fh, f)

    def test_hdf5IO(self):
        """This tests that if we send a file handler it returns it"""
        f = h5py.File('test', driver='core', backing_store=False, mode='w')
        # the handle is handed through untouched, so close it ourselves
        self.addCleanup(f.close)
        with qdb.util.open_file(f) as fh:
            self.assertIs(fh, f)

    def test_hdf5IO_open(self):
        """A path to an HDF5 file is opened as an ``h5py.File``."""
        with NamedTemporaryFile(delete=False) as fh:
            name = fh.name
        # delete=False leaves the file on disk; make sure it is removed
        # even when an assertion below fails
        self.addCleanup(remove, name)

        # turn the empty temporary file into a valid HDF5 file
        h5py.File(name, 'w').close()

        with qdb.util.open_file(name) as fh_inner:
            self.assertIsInstance(fh_inner, h5py.File)
class PurgeFilepathsTests(DBUtilTestsBase):
    """Tests for qdb.util.purge_filepaths and qdb.util.quick_mounts_purge."""

    def _get_current_filepaths(self):
        """Return the 'fullpath' of every filepath_id in qiita.filepath."""
        with qdb.sql_connection.TRN:
            qdb.sql_connection.TRN.add(
                "SELECT filepath_id FROM qiita.filepath")
            filepath_ids = qdb.sql_connection.TRN.execute_fetchflatten()
            fullpaths = []
            for filepath_id in filepath_ids:
                fp_info = qdb.util.get_filepath_information(filepath_id)
                fullpaths.append(fp_info['fullpath'])
            return fullpaths

    def _create_files(self, files):
        """Insert one qiita.filepath row per entry of ``files``.

        Each entry has the format [mp_id, fp_type_id, file_name].
        """
        insert_sql = (
            "INSERT INTO qiita.filepath (\n"
            "data_directory_id, filepath_type_id, filepath, checksum,\n"
            "checksum_algorithm_id)\n"
            "VALUES (%s, %s, %s, '852952723', 1) RETURNING filepath_id")
        with qdb.sql_connection.TRN:
            for entry in files:
                qdb.sql_connection.TRN.add(insert_sql, tuple(entry))
                new_id = qdb.sql_connection.TRN.execute_fetchflatten()[0]
                qdb.util.get_filepath_information(new_id)

    def test_purge_filepaths_test(self):
        """Rows added without a link to any object are purged again."""
        # Snapshot of the filepaths that must survive every purge
        fps_expected = self._get_current_filepaths()

        def purge_and_check():
            # after purging, the DB must list exactly the initial filepaths
            qdb.util.purge_filepaths()
            self.assertCountEqual(
                fps_expected, self._get_current_filepaths())

        # Make sure that the files exist - specially for travis
        for fp in fps_expected:
            if exists(fp):
                continue
            with open(fp, 'w') as handle:
                handle.write('\n')
            self.files_to_remove.append(fp)

        # nothing should be removed
        purge_and_check()

        # testing study filepath delete by inserting a new study sample info
        # and make sure it gets deleted
        templates_mp_id, _ = qdb.util.get_mountpoint('templates')[0]
        txt_id = qdb.util.convert_to_id('sample_template', "filepath_type")
        self._create_files([[templates_mp_id, txt_id, '100_filepath.txt']])
        purge_and_check()

        # testing artifact [A], creating a folder with an artifact that
        # doesn't exist
        _, fastq_mp = qdb.util.get_mountpoint('per_sample_FASTQ')[0]
        not_an_artifact_fp = join(fastq_mp, '10000')
        mkdir(not_an_artifact_fp)

        # now let's add test for [B] by creating 2 filepaths without a
        # link to the artifacts tables
        biom_mp_id, _ = qdb.util.get_mountpoint('BIOM')[0]
        biom_id = qdb.util.convert_to_id('biom', "filepath_type")
        unlinked_artifact_files = [
            [biom_mp_id, txt_id, 'artifact_filepath.txt'],
            [biom_mp_id, biom_id, 'my_biom.biom'],
        ]
        self._create_files(unlinked_artifact_files)
        purge_and_check()
        self.assertFalse(exists(not_an_artifact_fp))

        # testing analysis filepath delete by filepaths for 2 different files
        # and making sure they get deleted
        analysis_mp_id, _ = qdb.util.get_mountpoint('analysis')[0]
        biom_id = qdb.util.convert_to_id('biom', "filepath_type")
        unlinked_analysis_files = [
            [analysis_mp_id, txt_id, '10000_my_analysis_map.txt'],
            [analysis_mp_id, biom_id, '10000_my_analysis_biom.biom'],
        ]
        self._create_files(unlinked_analysis_files)
        purge_and_check()

    def test_quick_mounts_purge(self):
        """quick_mounts_purge is expected to raise ValueError here."""
        # one of the tests creates a conflicting artifact_type so this test
        # will always raise this ValueError
        self.assertRaises(ValueError, qdb.util.quick_mounts_purge)
class ResourceAllocationPlotTests(TestCase):
    """Tests for the resource-allocation helpers exposed by ``qdb.util``."""

    def setUp(self):
        self.cname = "Split libraries FASTQ"
        self.sname = "QIIMEq2"
        self.version = "1.9.1"
        self.col_name = 'samples * columns'
        self.columns = [
            "sName", "sVersion", "cID", "cName", "processing_job_id",
            "parameters", "samples", "columns", "input_size", "extra_info",
            "MaxRSSRaw", "ElapsedRaw", "Start", "node_name", "node_model"]

        # df is a dataframe that represents a table with columns specified in
        # self.columns
        self.df = qdb.util.retrieve_resource_data(
            self.cname, self.sname, self.version, self.columns)

    def _fit_best_model(self, df, ax, resource, models):
        """Fit ``models`` for ``resource`` and count the resulting failures.

        Returns a tuple ``(model_name, model, (k, a, b), n_failures)`` where
        ``n_failures`` is the number of rows of the last element returned by
        ``qdb.util._resource_allocation_success_failures``.
        """
        bm_name, bm, options = qdb.util._resource_allocation_plot_helper(
            df, ax, resource, models, self.col_name)
        k, a, b = options.x
        failures_df = qdb.util._resource_allocation_success_failures(
            df, k, a, b, bm, self.col_name, resource)[-1]
        return bm_name, bm, (k, a, b), failures_df.shape[0]

    def test_plot_return(self):
        """resource_allocation_plot returns a Figure and Axes objects."""
        fig1, axs1 = qdb.util.resource_allocation_plot(self.df, self.col_name)
        # release the figure once the test is over
        self.addCleanup(plt.close, fig1)
        self.assertIsInstance(
            fig1, Figure,
            "Returned object fig1 is not a Matplotlib Figure")
        for ax in axs1:
            self.assertIsInstance(
                ax, Axes,
                "Returned object axs1 is not a single Matplotlib Axes object")

    def test_minimize_const(self):
        """The expected best memory/time models are chosen with 0 failures."""
        selected = (
            (self.df.cName == self.cname) & (self.df.sName == self.sname))
        # work on an explicit copy so adding a column does not write into a
        # slice of self.df (avoids pandas' SettingWithCopyWarning)
        df = self.df[selected].dropna(subset=['samples', 'columns']).copy()
        df[self.col_name] = df.samples * df['columns']

        fig, axs = plt.subplots(ncols=2, figsize=(10, 4), sharey=False)
        # release the figure once the test is over
        self.addCleanup(plt.close, fig)

        mem_models, time_models = qdb.util.retrieve_equations()

        # check that the algorithm chooses correct model for MaxRSSRaw and
        # has 0 failures
        bm_name, bm, (k, a, b), failures = self._fit_best_model(
            df, axs[0], 'MaxRSSRaw', mem_models)
        self.assertEqual(bm_name, 'mem_model4',
                         msg=("Best memory model\n"
                              "doesn't match\n"
                              f"{bm_name} != 'mem_model4'"))
        self.assertEqual(bm, mem_models['mem_model4']['equation'],
                         msg=("Best memory model\n"
                              "doesn't match\n"
                              f"Coefficients:{k} {a} {b}\n"))
        self.assertEqual(failures, 0, "Number of failures must be 0")

        # check that the algorithm chooses correct model for ElapsedRaw and
        # has 0 failures
        bm_name, bm, (k, a, b), failures = self._fit_best_model(
            df, axs[1], 'ElapsedRaw', time_models)
        self.assertEqual(bm_name, 'time_model4',
                         msg=("Best time model\n"
                              "doesn't match\n"
                              f"{bm_name} != 'time_model4'"))
        self.assertEqual(bm, time_models['time_model4']['equation'],
                         msg=("Best time model\n"
                              "doesn't match\n"
                              f"Coefficients:{k} {a} {b}\n"))
        self.assertEqual(failures, 0, "Number of failures must be 0")

    def test_MaxRSS_helper(self):
        """MaxRSS_helper converts plain and K/M/G-suffixed strings."""
        tests = [
            ('6', 6.0),
            ('6K', 6000),
            ('6M', 6000000),
            ('6G', 6000000000),
            ('6.9', 6.9),
            ('6.9K', 6900),
            ('6.9M', 6900000),
            ('6.9G', 6900000000),
        ]

        for x, y in tests:
            self.assertEqual(qdb.util.MaxRSS_helper(x), y)

    def test_db_update(self):
        """update_resource_allocation_table adds the jobs from the test data.

        Every listed job id must be present after the update and must not
        have been present in the data retrieved in ``setUp``.
        """
        # NOTE(review): relative path, so this test presumably has to be
        # run from the repository root -- confirm
        path_to_data = './qiita_db/test/test_data/slurm_data.txt.gz'
        test_data = pd.read_csv(path_to_data, sep="|")
        expected_job_ids = {
            'Split libraries FASTQ': [
                '6d368e16-2242-4cf8-87b4-a5dc40bb890b',
                '4c7115e8-4c8e-424c-bf25-96c292ca1931',
                'b72369f9-a886-4193-8d3d-f7b504168e75',
                '46b76f74-e100-47aa-9bf2-c0208bcea52d',
                '6ad4d590-4fa3-44d3-9a8f-ddbb472b1b5f'],
            'Pick closed-reference OTUs': [
                '3c9991ab-6c14-4368-a48c-841e8837a79c',
                '80bf25f3-5f1d-4e10-9369-315e4244f6d5',
                '9ba5ae7a-41e1-4202-b396-0259aeaac366',
                'e5609746-a985-41a1-babf-6b3ebe9eb5a9',
            ],
            'Single Rarefaction': [
                '8a7a8461-e8a1-4b4e-a428-1bc2f4d3ebd0'
            ]
        }

        qdb.util.update_resource_allocation_table(test=test_data)

        # self.df does not change inside the loop, so build this set once
        previous_ids_set = set(self.df['processing_job_id'])
        for curr_cname, job_ids in expected_job_ids.items():
            updated_df = qdb.util.retrieve_resource_data(
                curr_cname, self.sname, self.version, self.columns)
            updated_ids_set = set(updated_df['processing_job_id'])
            for job_id in job_ids:
                self.assertIn(job_id, updated_ids_set)
                self.assertNotIn(job_id, previous_ids_set)
# Expected study-list entry for study 1; used as the reference value by the
# generate_study_list tests in this module.
STUDY_INFO = {
    'study_id': 1,
    'owner': 'Dude',
    'study_alias': 'Cannabis Soils',
    'study_abstract': (
        'This is a preliminary study to examine the microbiota associated '
        'with the Cannabis plant. '
        'Soils samples from the bulk soil, soil associated with the roots, '
        'and the rhizosphere were extracted and the DNA sequenced. '
        'Roots from three independent plants of different strains were '
        'examined. '
        'These roots were obtained November 11, 2011 from plants that had '
        'been harvested in the summer. '
        'Future studies will attempt to analyze the soils and rhizospheres '
        'from the same location at different time points in the plant '
        'lifecycle.'
    ),
    'metadata_complete': True,
    'autoloaded': False,
    'ebi_study_accession': 'EBI123456-BB',
    'study_title': (
        'Identification of the Microbiomes for Cannabis Soils'
    ),
    'number_samples_collected': 27,
    'shared': [
        ('shared@foo.bar', 'Shared'),
    ],
    'publication_doi': [
        '10.100/123456',
        '10.100/7891011',
    ],
    'publication_pid': [
        '123456',
        '7891011',
    ],
    'pi': ('PI_dude@foo.bar', 'PIDude'),
    'artifact_biom_ids': [4, 5, 6, 7],
    'preparation_data_types': ['18S'],
    'study_tags': None,
}
# Run the tests through unittest's ``main`` when this module is executed
# directly as a script.
if __name__ == '__main__':
    main()