--- a +++ b/qiita_db/test/test_sql.py @@ -0,0 +1,260 @@ +# ----------------------------------------------------------------------------- +# Copyright (c) 2014--, The Qiita Development Team. +# +# Distributed under the terms of the BSD 3-clause License. +# +# The full license is in the file LICENSE, distributed with this software. +# ----------------------------------------------------------------------------- + +from unittest import TestCase, main +from tempfile import mkstemp +from os import close, remove +from os.path import exists + +import pandas as pd + +from qiita_core.util import qiita_test_checker +import qiita_db as qdb + + +@qiita_test_checker() +class TestSQL(TestCase): + """Tests that the database triggers and procedures work properly""" + def setUp(self): + self._files_to_remove = [] + + def tearDown(self): + for fp in self._files_to_remove: + if exists(fp): + remove(fp) + + def test_find_artifact_roots_is_root(self): + """Correctly returns the root if the artifact is already the root""" + with qdb.sql_connection.TRN: + sql = "SELECT * FROM qiita.find_artifact_roots(%s)" + qdb.sql_connection.TRN.add(sql, [1]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [[1]] + self.assertEqual(obs, exp) + + def test_find_artifact_roots_is_child(self): + """Correctly returns the root if the artifact is a child""" + with qdb.sql_connection.TRN: + sql = "SELECT * FROM qiita.find_artifact_roots(%s)" + qdb.sql_connection.TRN.add(sql, [4]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [[1]] + self.assertEqual(obs, exp) + + def test_find_artifact_roots_is_child_multiple_parents_one_root(self): + """Correctly returns the roots if the children has multiple parents + but a single root + """ + fd, fp = mkstemp(suffix='_table.biom') + close(fd) + self._files_to_remove.append(fp) + with open(fp, 'w') as f: + f.write("test") + fp = [(fp, 7)] + params = qdb.software.Parameters.from_default_params( + qdb.software.DefaultParameters(10), {'input_data': 2}) + new = qdb.artifact.Artifact.create( + fp, "BIOM", + parents=[qdb.artifact.Artifact(2), qdb.artifact.Artifact(3)], + processing_parameters=params) + self._files_to_remove.extend([x['fp'] for x in new.filepaths]) + with qdb.sql_connection.TRN: + sql = "SELECT * FROM qiita.find_artifact_roots(%s)" + qdb.sql_connection.TRN.add(sql, [new.id]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [[1]] + self.assertEqual(obs, exp) + + def _create_root_artifact(self): + """Creates a new root artifact""" + metadata = pd.DataFrame.from_dict( + {'SKB8.640193': {'center_name': 'ANL', + 'primer': 'GTGCCAGCMGCCGCGGTAA', + 'barcode': 'GTCCGCAAGTTA', + 'run_prefix': "s_G1_L001_sequences", + 'platform': 'Illumina', + 'target_gene': '16S rRNA', + 'target_subfragment': 'V4', + 'instrument_model': 'Illumina MiSeq', + 'library_construction_protocol': 'AAAA', + 'experiment_design_description': 'BBBB'}}, + orient='index', dtype=str) + pt = qdb.metadata_template.prep_template.PrepTemplate.create( + metadata, qdb.study.Study(1), "18S") + fd, fp = mkstemp(suffix='_seqs.fastq') + close(fd) + self._files_to_remove.append(fp) + with open(fp, 'w') as f: + f.write("test") + fp = [(fp, 1)] + new_root = qdb.artifact.Artifact.create(fp, "FASTQ", prep_template=pt) + self._files_to_remove.extend([x['fp'] for x in new_root.filepaths]) + return new_root + + def _create_child_artifact(self, parents): + """Creates a new artifact with the given parents""" + # Add a child of 2 roots + fd, fp = mkstemp(suffix='_seqs.fna') + close(fd) + self._files_to_remove.append(fp) + with open(fp, 'w') as f: + f.write("test") + fp = [(fp, 4)] + params = qdb.software.Parameters.from_default_params( + qdb.software.DefaultParameters(1), {'input_data': 2}) + new = qdb.artifact.Artifact.create( + fp, "Demultiplexed", parents=parents, + processing_parameters=params) + return new + + def test_find_artifact_roots_is_root_without_children(self): + """Correctly returns the root if the artifact is already the root + and doesn't have any children + """ + sql = "SELECT * FROM qiita.find_artifact_roots(%s)" + + # Add a new root + new_root = self._create_root_artifact() + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql, [new_root.id]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [[new_root.id]] + self.assertEqual(obs, exp) + + def test_find_artifact_roots_is_child_multiple_parents_multiple_root(self): + """Correctly returns the roots if the children has multiple roots""" + sql = "SELECT * FROM qiita.find_artifact_roots(%s)" + + new_root = self._create_root_artifact() + + # Add a child of 2 roots + fd, fp = mkstemp(suffix='_seqs.fna') + close(fd) + self._files_to_remove.append(fp) + with open(fp, 'w') as f: + f.write("test") + fp = [(fp, 4)] + params = qdb.software.Parameters.from_default_params( + qdb.software.DefaultParameters(1), {'input_data': 2}) + new = qdb.artifact.Artifact.create( + fp, "Demultiplexed", parents=[qdb.artifact.Artifact(1), new_root], + processing_parameters=params) + self._files_to_remove.extend([x['fp'] for x in new.filepaths]) + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql, [new.id]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [[1], [new_root.id]] + self.assertCountEqual(obs, exp) + + def test_artifact_ancestry_root(self): + """Correctly returns the ancestry of a root artifact""" + sql = "SELECT * FROM qiita.artifact_ancestry(%s)" + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql, [1]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [] + self.assertEqual(obs, exp) + + def test_artifact_ancestry_leaf(self): + """Correctly returns the ancestry of a leaf artifact""" + with qdb.sql_connection.TRN: + sql = "SELECT * FROM qiita.artifact_ancestry(%s)" + qdb.sql_connection.TRN.add(sql, [4]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [[4, 2], [2, 1]] + self.assertCountEqual(obs, exp) + + def test_artifact_ancestry_leaf_multiple_parents(self): + """Correctly returns the ancestry of a leaf artifact w multiple parents + """ + root = self._create_root_artifact() + parent1 = self._create_child_artifact([root]) + parent2 = self._create_child_artifact([root]) + child = self._create_child_artifact([parent1, parent2]) + + with qdb.sql_connection.TRN: + sql = "SELECT * FROM qiita.artifact_ancestry(%s)" + qdb.sql_connection.TRN.add(sql, [child.id]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [[child.id, parent1.id], [child.id, parent2.id], + [parent1.id, root.id], [parent2.id, root.id]] + self.assertCountEqual(obs, exp) + + def test_artifact_ancestry_middle(self): + """Correctly returns the ancestry of an artifact in the middle of the + DAG""" + with qdb.sql_connection.TRN: + sql = "SELECT * FROM qiita.artifact_ancestry(%s)" + qdb.sql_connection.TRN.add(sql, [2]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [[2, 1]] + self.assertEqual(obs, exp) + + def test_artifact_descendants_leaf(self): + """Correctly returns the descendants of a leaf artifact""" + with qdb.sql_connection.TRN: + sql = "SELECT * FROM qiita.artifact_descendants(%s)" + qdb.sql_connection.TRN.add(sql, [4]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [] + self.assertEqual(obs, exp) + + def test_artifact_descendants_root(self): + """Correctly returns the descendants of a root artifact""" + with qdb.sql_connection.TRN: + sql = "SELECT * FROM qiita.artifact_descendants(%s)" + qdb.sql_connection.TRN.add(sql, [1]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [[2, 1], [3, 1], [4, 2], [5, 2], [6, 2]] + self.assertCountEqual(obs, exp) + + def test_artifact_descendants_middle(self): + """Correctly returns the descendants of an artifact in the middle of + the DAG""" + with qdb.sql_connection.TRN: + sql = "SELECT * FROM qiita.artifact_descendants(%s)" + qdb.sql_connection.TRN.add(sql, [2]) + obs = qdb.sql_connection.TRN.execute_fetchindex() + exp = [[4, 2], [5, 2], [6, 2]] + self.assertCountEqual(obs, exp) + + def test_isnumeric(self): + """Test SQL function isnumeric""" + exp = [['', False], ['.', False], ['.0', True], ['0.', True], + ['0', True], ['1', True], ['123', True], ['123.456', True], + ['abc', False], ['1..2', False], ['1.2.3.4', False], + ['1x234', False], ['1.234e-5', True]] + + sql = ("WITH test(x) AS (" + "VALUES (''), ('.'), ('.0'), ('0.'), ('0'), ('1'), ('123'), " + "('123.456'), ('abc'), ('1..2'), ('1.2.3.4'), ('1x234'), " + "('1.234e-5')) SELECT x, isnumeric(x) FROM test;") + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql) + obs = qdb.sql_connection.TRN.execute_fetchindex() + self.assertEqual(exp, obs) + + def test_artifact_descendants_with_jobs(self): + """Test SQL function artifact_descendants_with_jobs""" + exp = [['c350b068-add7-49a5-8846-604ac032cc88', 1, 2], + ['d883dab4-503b-45c2-815d-2126ff52dede', 1, 3], + ['a4c4b9b9-20ca-47f5-bd30-725cce71df2b', 2, 4], + ['624dce65-43a5-4156-a4b6-6c1d02114b67', 2, 5], + ['81bbe8d0-b4c2-42eb-ada9-f07c1c91e59f', 2, 6]] + sql = """SELECT * FROM qiita.artifact_descendants_with_jobs(1)""" + with qdb.sql_connection.TRN: + qdb.sql_connection.TRN.add(sql) + obs = qdb.sql_connection.TRN.execute_fetchindex() + + # lopping on results to not test the job id as is randomly generated + for e, o in zip(exp, obs): + self.assertEqual(e[1:], o[1:]) + + +if __name__ == '__main__': + main()