# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from unittest import TestCase, main, skipIf
from os.path import join, basename, exists
from tempfile import mkdtemp
import pandas as pd
from datetime import datetime
from shutil import rmtree, copyfile
from os import path
from glob import glob
from paramiko.ssh_exception import AuthenticationException
from h5py import File
from qiita_files.demux import to_hdf5
from qiita_ware.exceptions import ComputeError
from qiita_ware.commands import submit_EBI, list_remote, download_remote
from qiita_db.util import get_mountpoint
from qiita_db.study import Study, StudyPerson
from qiita_db.software import DefaultParameters, Parameters
from qiita_db.artifact import Artifact
from qiita_db.metadata_template.prep_template import PrepTemplate
from qiita_db.metadata_template.sample_template import SampleTemplate
from qiita_db.user import User
from qiita_core.util import qiita_test_checker
from qiita_core.qiita_settings import qiita_config
@qiita_test_checker()
class SSHTests(TestCase):
def setUp(self):
self.self_dir_path = path.dirname(path.abspath(__file__))
self.remote_dir_path = join(self.self_dir_path,
'test_data/test_remote_dir/')
self.test_ssh_key = join(self.self_dir_path, 'test_data/test_key')
self.test_wrong_key = join(self.self_dir_path, 'test_data/random_key')
self.temp_local_dir = mkdtemp()
self.exp_files = ['test_0.fastq.gz', 'test_1.txt']
def tearDown(self):
rmtree(self.temp_local_dir)
def _get_valid_files(self, folder):
files = []
for x in qiita_config.valid_upload_extension:
files.extend([basename(f) for f in glob(join(folder, '*.%s' % x))])
return files
def test_list_scp_wrong_key(self):
with self.assertRaises(AuthenticationException):
list_remote('scp://runner@localhost:'+self.remote_dir_path,
self.test_wrong_key)
def test_list_scp_nonexist_key(self):
with self.assertRaises(IOError):
list_remote('scp://runner@localhost:'+self.remote_dir_path,
join(self.self_dir_path, 'nokey'))
def test_list_scp(self):
kpath = join(self.temp_local_dir, 'tmp-key')
copyfile(self.test_ssh_key, kpath)
read_file_list = list_remote(
'scp://runner@localhost:'+self.remote_dir_path, kpath)
self.assertCountEqual(read_file_list, self.exp_files)
def test_download_scp(self):
kpath = join(self.temp_local_dir, 'tmp-key')
copyfile(self.test_ssh_key, kpath)
download_remote('scp://runner@localhost:'+self.remote_dir_path,
kpath, self.temp_local_dir)
local_files = self._get_valid_files(self.temp_local_dir)
self.assertCountEqual(local_files, self.exp_files)
self.assertFalse(exists(kpath))
class CommandsTests(TestCase):
def setUp(self):
self.files_to_remove = []
self.temp_dir = mkdtemp()
self.files_to_remove.append(self.temp_dir)
_, self.base_fp = get_mountpoint("preprocessed_data")[0]
def write_demux_files(self, prep_template, generate_hdf5=True):
"""Writes a demux test file to avoid duplication of code"""
fna_fp = join(self.temp_dir, 'seqs.fna')
demux_fp = join(self.temp_dir, 'demux.seqs')
if generate_hdf5:
with open(fna_fp, 'w') as f:
f.write(FASTA_EXAMPLE)
with File(demux_fp, "w") as f:
to_hdf5(fna_fp, f)
else:
with open(demux_fp, 'w') as f:
f.write('')
if prep_template.artifact is None:
ppd = Artifact.create(
[(demux_fp, 6)], "Demultiplexed", prep_template=prep_template)
else:
params = Parameters.from_default_params(
DefaultParameters(1),
{'input_data': prep_template.artifact.id})
ppd = Artifact.create(
[(demux_fp, 6)], "Demultiplexed",
parents=[prep_template.artifact], processing_parameters=params)
return ppd
def generate_new_study_with_preprocessed_data(self):
"""Creates a new study up to the processed data for testing"""
info = {
"timeseries_type_id": 1,
"metadata_complete": True,
"mixs_compliant": True,
"study_alias": "Test EBI",
"study_description": "Study for testing EBI",
"study_abstract": "Study for testing EBI",
"principal_investigator_id": StudyPerson(3),
"lab_person_id": StudyPerson(1)
}
study = Study.create(User('test@foo.bar'), "Test EBI study", info)
metadata_dict = {
'Sample1': {'collection_timestamp': datetime(2015, 6, 1, 7, 0, 0),
'physical_specimen_location': 'location1',
'taxon_id': 9606,
'scientific_name': 'homo sapiens',
'Description': 'Test Sample 1'},
'Sample2': {'collection_timestamp': datetime(2015, 6, 2, 7, 0, 0),
'physical_specimen_location': 'location1',
'taxon_id': 9606,
'scientific_name': 'homo sapiens',
'Description': 'Test Sample 2'},
'Sample3': {'collection_timestamp': datetime(2015, 6, 3, 7, 0, 0),
'physical_specimen_location': 'location1',
'taxon_id': 9606,
'scientific_name': 'homo sapiens',
'Description': 'Test Sample 3'}
}
metadata = pd.DataFrame.from_dict(metadata_dict, orient='index',
dtype=str)
SampleTemplate.create(metadata, study)
metadata_dict = {
'Sample1': {'primer': 'GTGCCAGCMGCCGCGGTAA',
'barcode': 'CGTAGAGCTCTC',
'center_name': 'KnightLab',
'platform': 'Illumina',
'instrument_model': 'Illumina MiSeq',
'library_construction_protocol': 'Protocol ABC',
'experiment_design_description': "Random value 1"},
'Sample2': {'primer': 'GTGCCAGCMGCCGCGGTAA',
'barcode': 'CGTAGAGCTCTA',
'center_name': 'KnightLab',
'platform': 'Illumina',
'instrument_model': 'Illumina MiSeq',
'library_construction_protocol': 'Protocol ABC',
'experiment_design_description': "Random value 2"},
'Sample3': {'primer': 'GTGCCAGCMGCCGCGGTAA',
'barcode': 'CGTAGAGCTCTT',
'center_name': 'KnightLab',
'platform': 'Illumina',
'instrument_model': 'Illumina MiSeq',
'library_construction_protocol': 'Protocol ABC',
'experiment_design_description': "Random value 3"},
}
metadata = pd.DataFrame.from_dict(metadata_dict, orient='index',
dtype=str)
pt = PrepTemplate.create(metadata, study, "16S", 'Metagenomics')
fna_fp = join(self.temp_dir, 'seqs.fna')
demux_fp = join(self.temp_dir, 'demux.seqs')
with open(fna_fp, 'w') as f:
f.write(FASTA_EXAMPLE_2.format(study.id))
with File(demux_fp, 'w') as f:
to_hdf5(fna_fp, f)
ppd = Artifact.create(
[(demux_fp, 6)], "Demultiplexed", prep_template=pt)
return ppd
def test_submit_EBI_step_2_failure(self):
ppd = self.write_demux_files(PrepTemplate(1), True)
pid = ppd.id
with self.assertRaises(ComputeError):
submit_EBI(pid, 'VALIDATE', True)
rmtree(join(self.base_fp, '%d_ebi_submission' % pid), True)
@skipIf(
qiita_config.ebi_seq_xfer_pass == '', 'skip: ascp not configured')
def test_submit_EBI_parse_EBI_reply_failure(self):
ppd = self.write_demux_files(PrepTemplate(1))
pid = ppd.id
with self.assertRaises(ComputeError) as error:
submit_EBI(pid, 'VALIDATE', True)
error = str(error.exception)
self.assertIn('EBI Submission failed! Log id:', error)
self.assertIn('The EBI submission failed:', error)
rmtree(join(self.base_fp, '%d_ebi_submission' % pid), True)
@skipIf(
qiita_config.ebi_seq_xfer_pass == '', 'skip: ascp not configured')
def test_full_submission(self):
artifact = self.generate_new_study_with_preprocessed_data()
self.assertEqual(
artifact.study.ebi_submission_status, 'not submitted')
aid = artifact.id
submit_EBI(aid, 'VALIDATE', True, test=True)
self.assertEqual(artifact.study.ebi_submission_status, 'submitted')
rmtree(join(self.base_fp, '%d_ebi_submission' % aid), True)
def test_max_ebiena_curl_error(self):
artifact = self.generate_new_study_with_preprocessed_data()
self.assertEqual(
artifact.study.ebi_submission_status, 'not submitted')
aid = artifact.id
with self.assertRaises(ComputeError) as error:
submit_EBI(aid, 'VALIDATE', True, test_size=True)
error = str(error.exception)
self.assertIn('is too large. Before cleaning:', error)
rmtree(join(self.base_fp, '%d_ebi_submission' % aid), True)
FASTA_EXAMPLE = """>1.SKB2.640194_1 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>1.SKB2.640194_2 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>1.SKB2.640194_3 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>1.SKM4.640180_4 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>1.SKM4.640180_5 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>1.SKB3.640195_6 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>1.SKB6.640176_7 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>1.SKD6.640190_8 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>1.SKM6.640187_9 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>1.SKD9.640182_10 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>1.SKM8.640201_11 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>1.SKM2.640199_12 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
"""
FASTA_EXAMPLE_2 = """>{0}.Sample1_1 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>{0}.Sample1_2 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>{0}.Sample1_3 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>{0}.Sample2_4 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>{0}.Sample2_5 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>{0}.Sample2_6 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>{0}.Sample3_7 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>{0}.Sample3_8 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
>{0}.Sample3_9 X orig_bc=X new_bc=X bc_diffs=0
CCACCCAGTAAC
"""
if __name__ == '__main__':
main()