[879b32]: / qiita_db / test / test_sql.py

Download this file

261 lines (230 with data), 10.9 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from unittest import TestCase, main
from tempfile import mkstemp
from os import close, remove
from os.path import exists
import pandas as pd
from qiita_core.util import qiita_test_checker
import qiita_db as qdb
@qiita_test_checker()
class TestSQL(TestCase):
"""Tests that the database triggers and procedures work properly"""
def setUp(self):
self._files_to_remove = []
def tearDown(self):
for fp in self._files_to_remove:
if exists(fp):
remove(fp)
def test_find_artifact_roots_is_root(self):
"""Correctly returns the root if the artifact is already the root"""
with qdb.sql_connection.TRN:
sql = "SELECT * FROM qiita.find_artifact_roots(%s)"
qdb.sql_connection.TRN.add(sql, [1])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = [[1]]
self.assertEqual(obs, exp)
def test_find_artifact_roots_is_child(self):
"""Correctly returns the root if the artifact is a child"""
with qdb.sql_connection.TRN:
sql = "SELECT * FROM qiita.find_artifact_roots(%s)"
qdb.sql_connection.TRN.add(sql, [4])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = [[1]]
self.assertEqual(obs, exp)
def test_find_artifact_roots_is_child_multiple_parents_one_root(self):
"""Correctly returns the roots if the children has multiple parents
but a single root
"""
fd, fp = mkstemp(suffix='_table.biom')
close(fd)
self._files_to_remove.append(fp)
with open(fp, 'w') as f:
f.write("test")
fp = [(fp, 7)]
params = qdb.software.Parameters.from_default_params(
qdb.software.DefaultParameters(10), {'input_data': 2})
new = qdb.artifact.Artifact.create(
fp, "BIOM",
parents=[qdb.artifact.Artifact(2), qdb.artifact.Artifact(3)],
processing_parameters=params)
self._files_to_remove.extend([x['fp'] for x in new.filepaths])
with qdb.sql_connection.TRN:
sql = "SELECT * FROM qiita.find_artifact_roots(%s)"
qdb.sql_connection.TRN.add(sql, [new.id])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = [[1]]
self.assertEqual(obs, exp)
def _create_root_artifact(self):
"""Creates a new root artifact"""
metadata = pd.DataFrame.from_dict(
{'SKB8.640193': {'center_name': 'ANL',
'primer': 'GTGCCAGCMGCCGCGGTAA',
'barcode': 'GTCCGCAAGTTA',
'run_prefix': "s_G1_L001_sequences",
'platform': 'Illumina',
'target_gene': '16S rRNA',
'target_subfragment': 'V4',
'instrument_model': 'Illumina MiSeq',
'library_construction_protocol': 'AAAA',
'experiment_design_description': 'BBBB'}},
orient='index', dtype=str)
pt = qdb.metadata_template.prep_template.PrepTemplate.create(
metadata, qdb.study.Study(1), "18S")
fd, fp = mkstemp(suffix='_seqs.fastq')
close(fd)
self._files_to_remove.append(fp)
with open(fp, 'w') as f:
f.write("test")
fp = [(fp, 1)]
new_root = qdb.artifact.Artifact.create(fp, "FASTQ", prep_template=pt)
self._files_to_remove.extend([x['fp'] for x in new_root.filepaths])
return new_root
def _create_child_artifact(self, parents):
"""Creates a new artifact with the given parents"""
# Add a child of 2 roots
fd, fp = mkstemp(suffix='_seqs.fna')
close(fd)
self._files_to_remove.append(fp)
with open(fp, 'w') as f:
f.write("test")
fp = [(fp, 4)]
params = qdb.software.Parameters.from_default_params(
qdb.software.DefaultParameters(1), {'input_data': 2})
new = qdb.artifact.Artifact.create(
fp, "Demultiplexed", parents=parents,
processing_parameters=params)
return new
def test_find_artifact_roots_is_root_without_children(self):
"""Correctly returns the root if the artifact is already the root
and doesn't have any children
"""
sql = "SELECT * FROM qiita.find_artifact_roots(%s)"
# Add a new root
new_root = self._create_root_artifact()
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql, [new_root.id])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = [[new_root.id]]
self.assertEqual(obs, exp)
def test_find_artifact_roots_is_child_multiple_parents_multiple_root(self):
"""Correctly returns the roots if the children has multiple roots"""
sql = "SELECT * FROM qiita.find_artifact_roots(%s)"
new_root = self._create_root_artifact()
# Add a child of 2 roots
fd, fp = mkstemp(suffix='_seqs.fna')
close(fd)
self._files_to_remove.append(fp)
with open(fp, 'w') as f:
f.write("test")
fp = [(fp, 4)]
params = qdb.software.Parameters.from_default_params(
qdb.software.DefaultParameters(1), {'input_data': 2})
new = qdb.artifact.Artifact.create(
fp, "Demultiplexed", parents=[qdb.artifact.Artifact(1), new_root],
processing_parameters=params)
self._files_to_remove.extend([x['fp'] for x in new.filepaths])
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql, [new.id])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = [[1], [new_root.id]]
self.assertCountEqual(obs, exp)
def test_artifact_ancestry_root(self):
"""Correctly returns the ancestry of a root artifact"""
sql = "SELECT * FROM qiita.artifact_ancestry(%s)"
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql, [1])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = []
self.assertEqual(obs, exp)
def test_artifact_ancestry_leaf(self):
"""Correctly returns the ancestry of a leaf artifact"""
with qdb.sql_connection.TRN:
sql = "SELECT * FROM qiita.artifact_ancestry(%s)"
qdb.sql_connection.TRN.add(sql, [4])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = [[4, 2], [2, 1]]
self.assertCountEqual(obs, exp)
def test_artifact_ancestry_leaf_multiple_parents(self):
"""Correctly returns the ancestry of a leaf artifact w multiple parents
"""
root = self._create_root_artifact()
parent1 = self._create_child_artifact([root])
parent2 = self._create_child_artifact([root])
child = self._create_child_artifact([parent1, parent2])
with qdb.sql_connection.TRN:
sql = "SELECT * FROM qiita.artifact_ancestry(%s)"
qdb.sql_connection.TRN.add(sql, [child.id])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = [[child.id, parent1.id], [child.id, parent2.id],
[parent1.id, root.id], [parent2.id, root.id]]
self.assertCountEqual(obs, exp)
def test_artifact_ancestry_middle(self):
"""Correctly returns the ancestry of an artifact in the middle of the
DAG"""
with qdb.sql_connection.TRN:
sql = "SELECT * FROM qiita.artifact_ancestry(%s)"
qdb.sql_connection.TRN.add(sql, [2])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = [[2, 1]]
self.assertEqual(obs, exp)
def test_artifact_descendants_leaf(self):
"""Correctly returns the descendants of a leaf artifact"""
with qdb.sql_connection.TRN:
sql = "SELECT * FROM qiita.artifact_descendants(%s)"
qdb.sql_connection.TRN.add(sql, [4])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = []
self.assertEqual(obs, exp)
def test_artifact_descendants_root(self):
"""Correctly returns the descendants of a root artifact"""
with qdb.sql_connection.TRN:
sql = "SELECT * FROM qiita.artifact_descendants(%s)"
qdb.sql_connection.TRN.add(sql, [1])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = [[2, 1], [3, 1], [4, 2], [5, 2], [6, 2]]
self.assertCountEqual(obs, exp)
def test_artifact_descendants_middle(self):
"""Correctly returns the descendants of an artifact in the middle of
the DAG"""
with qdb.sql_connection.TRN:
sql = "SELECT * FROM qiita.artifact_descendants(%s)"
qdb.sql_connection.TRN.add(sql, [2])
obs = qdb.sql_connection.TRN.execute_fetchindex()
exp = [[4, 2], [5, 2], [6, 2]]
self.assertCountEqual(obs, exp)
def test_isnumeric(self):
"""Test SQL function isnumeric"""
exp = [['', False], ['.', False], ['.0', True], ['0.', True],
['0', True], ['1', True], ['123', True], ['123.456', True],
['abc', False], ['1..2', False], ['1.2.3.4', False],
['1x234', False], ['1.234e-5', True]]
sql = ("WITH test(x) AS ("
"VALUES (''), ('.'), ('.0'), ('0.'), ('0'), ('1'), ('123'), "
"('123.456'), ('abc'), ('1..2'), ('1.2.3.4'), ('1x234'), "
"('1.234e-5')) SELECT x, isnumeric(x) FROM test;")
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql)
obs = qdb.sql_connection.TRN.execute_fetchindex()
self.assertEqual(exp, obs)
def test_artifact_descendants_with_jobs(self):
"""Test SQL function artifact_descendants_with_jobs"""
exp = [['c350b068-add7-49a5-8846-604ac032cc88', 1, 2],
['d883dab4-503b-45c2-815d-2126ff52dede', 1, 3],
['a4c4b9b9-20ca-47f5-bd30-725cce71df2b', 2, 4],
['624dce65-43a5-4156-a4b6-6c1d02114b67', 2, 5],
['81bbe8d0-b4c2-42eb-ada9-f07c1c91e59f', 2, 6]]
sql = """SELECT * FROM qiita.artifact_descendants_with_jobs(1)"""
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql)
obs = qdb.sql_connection.TRN.execute_fetchindex()
# lopping on results to not test the job id as is randomly generated
for e, o in zip(exp, obs):
self.assertEqual(e[1:], o[1:])
if __name__ == '__main__':
main()