Diff of /qiita_ware/commands.py [000000] .. [879b32]

Switch to unified view

a b/qiita_ware/commands.py
1
# -----------------------------------------------------------------------------
2
# Copyright (c) 2014--, The Qiita Development Team.
3
#
4
# Distributed under the terms of the BSD 3-clause License.
5
#
6
# The full license is in the file LICENSE, distributed with this software.
7
# -----------------------------------------------------------------------------
8
9
from os.path import basename, isdir, join, exists
10
from shutil import rmtree
11
from tarfile import open as taropen
12
from tempfile import mkdtemp
13
from os import environ, stat, remove
14
from traceback import format_exc
15
from paramiko import AutoAddPolicy, RSAKey, SSHClient
16
from scp import SCPClient
17
from urllib.parse import urlparse
18
from functools import partial
19
import pandas as pd
20
21
from qiita_db.artifact import Artifact
22
from qiita_db.logger import LogEntry
23
from qiita_db.processing_job import _system_call as system_call
24
from qiita_core.qiita_settings import qiita_config
25
from qiita_ware.ebi import EBISubmission
26
from qiita_ware.exceptions import ComputeError, EBISubmissionError
27
28
29
def _ssh_session(p_url, private_key):
30
    """Initializes an SSH session
31
32
    Parameters
33
    ----------
34
    p_url : urlparse object
35
        a parsed url
36
    private_key : str
37
        Path to the private key used to authenticate connection
38
39
    Returns
40
    -------
41
    paramiko.SSHClient
42
        the SSH session
43
    """
44
    scheme = p_url.scheme
45
    hostname = p_url.hostname
46
47
    port = p_url.port
48
    username = p_url.username
49
50
    if scheme == 'scp':
51
        # if port not specified, use default 22 as port
52
        if port is None:
53
            port = 22
54
55
        # step 1: both schemes require an SSH connection
56
        ssh = SSHClient()
57
        ssh.load_system_host_keys()
58
        ssh.set_missing_host_key_policy(AutoAddPolicy)
59
60
        # step 2: connect to fileserver
61
        key = RSAKey.from_private_key_file(private_key)
62
        ssh.connect(hostname, port=port, username=username,
63
                    pkey=key, look_for_keys=False)
64
        return ssh
65
    else:
66
        raise ValueError(
67
            'Not valid scheme. Valid options is scp.')
68
69
70
def _list_valid_files(ssh, directory):
    """Gets a list of valid study files from ssh session

    Parameters
    ----------
    ssh : paramiko.SSHClient
        An initialized ssh session
    directory : str
        the directory to search for files

    Returns
    -------
    list of str
        list of valid study files (basenames)

    Raises
    ------
    ValueError
        If listing the remote directory produced anything on stderr
    """
    # directory comes from a user-supplied URL, so never interpolate it
    # unescaped into a shell command
    from shlex import quote

    valid_file_extensions = tuple(qiita_config.valid_upload_extension)

    _, stdout, stderr = ssh.exec_command('ls %s' % quote(directory))
    error = stderr.read().decode("utf-8")
    if error:
        raise ValueError(error)
    files = stdout.read().decode("utf-8").split('\n')

    # only keep files whose extension is allowed by the config; the empty
    # string produced by the trailing newline never matches an extension
    return [f for f in files if f.endswith(valid_file_extensions)]
96
97
98
def list_remote(URL, private_key):
    """Retrieve valid study files from a remote directory

    Parameters
    ----------
    URL : str
        The url to the remote directory
    private_key : str
        Path to the private key used to authenticate connection

    Returns
    -------
    list of str
        list of files that are valid study files

    Notes
    -----
    Only the allowed extensions described by the config file
    will be listed.
    """
    p_url = urlparse(URL)
    directory = p_url.path
    # no `except` needed: any error propagates unchanged, while the
    # `finally` still guarantees the key is removed
    try:
        ssh = _ssh_session(p_url, private_key)
        valid_files = _list_valid_files(ssh, directory)
        ssh.close()
    finally:
        # for security, remove key
        if exists(private_key):
            remove(private_key)

    return valid_files
132
133
134
def download_remote(URL, private_key, destination):
    """Add study files by specifying a remote directory to download from

    Parameters
    ----------
    URL : str
        The url to the remote directory
    private_key : str
        Path to the private key used to authenticate connection
    destination : str
        The path to the study upload folder
    """
    # step 1: initialize connection and list valid files
    p_url = urlparse(URL)
    ssh = _ssh_session(p_url, private_key)

    # the try/finally guarantees the connection is closed and the key is
    # removed even if a download fails (the original only cleaned up on
    # success, leaking the key on error)
    try:
        directory = p_url.path
        valid_files = _list_valid_files(ssh, directory)

        # step 2: download files; _ssh_session already guarantees the
        # scheme is scp, so no need to re-check it here
        scp = SCPClient(ssh.get_transport())
        for f in valid_files:
            scp.get(join(directory, f),
                    local_path=join(destination, basename(f)))
    finally:
        # step 3: close the connection
        ssh.close()

        # for security, remove key
        if exists(private_key):
            remove(private_key)
170
171
172
def submit_EBI(artifact_id, action, send, test=False, test_size=False):
    """Submit an artifact to EBI

    Parameters
    ----------
    artifact_id : int
        The artifact id
    action : %s
        The action to perform with this data
    send : bool
        True to actually send the files
    test : bool
        If True some restrictions will be ignored, only used in parse_EBI_reply
    test_size : bool
        If True the EBI-ENA restriction size will be changed to 5000

    Returns
    -------
    tuple of (str, str, str, str, str)
        The study, sample, biosample, experiment and run accessions; all
        remain None unless `send` is True and the reply was parsed

    Raises
    ------
    ComputeError
        If the submission is too large even after metadata cleaning, or if
        sending the sequences or the XMLs fails
    """
    # step 1: init and validate
    ebi_submission = EBISubmission(artifact_id, action)

    # step 2: generate demux fastq files
    try:
        ebi_submission.generate_demultiplexed_fastq()
    except Exception:
        error_msg = format_exc()
        if isdir(ebi_submission.full_ebi_dir):
            rmtree(ebi_submission.full_ebi_dir)
        LogEntry.create('Runtime', error_msg,
                        info={'ebi_submission': artifact_id})
        raise

    # step 3: generate and write xml files
    ebi_submission.generate_xml_files()

    # before we continue let's check the size of the submission
    to_review = [ebi_submission.study_xml_fp,
                 ebi_submission.sample_xml_fp,
                 ebi_submission.experiment_xml_fp,
                 ebi_submission.run_xml_fp,
                 ebi_submission.submission_xml_fp]
    total_size = sum([stat(tr).st_size for tr in to_review if tr is not None])
    # note that the max for EBI is 10M but let's play it safe
    max_size = 10e+6 if not test_size else 5000
    if total_size > max_size:
        LogEntry.create(
            'Runtime', 'The submission: %d is larger than allowed (%d), will '
            'try to fix: %d' % (artifact_id, max_size, total_size))

        def _reduce_metadata(low=0.01, high=0.5):
            # helper that shrinks the sample xml by dropping uninformative
            # metadata columns, then rewrites the xml file in place
            # transform current metadata to dataframe for easier curation
            rows = {k: dict(v) for k, v in ebi_submission.samples.items()}
            df = pd.DataFrame.from_dict(rows, orient='index')
            # remove unique columns and same value in all columns
            nunique = df.apply(pd.Series.nunique)
            nsamples = len(df.index)
            cols_to_drop = set(
                nunique[(nunique == 1) | (nunique == nsamples)].index)
            # maximize deletion by removing also columns that are almost all
            # the same or almost all unique
            # NOTE(review): this assignment *replaces* the previous one rather
            # than extending it; if the intent was to accumulate both sets,
            # this should be |= — confirm before changing behavior
            cols_to_drop = set(
                nunique[(nunique <= int(nsamples * low)) |
                        (nunique >= int(nsamples * high))].index)
            # these columns are required by EBI-ENA, never drop them
            cols_to_drop = cols_to_drop - {'taxon_id', 'scientific_name',
                                           'description', 'country',
                                           'collection_date'}
            all_samples = ebi_submission.sample_template.ebi_sample_accessions

            # ADD submits only never-submitted samples; other actions work on
            # samples that already have accessions
            if action == 'ADD':
                samples = [k for k in ebi_submission.samples
                           if all_samples[k] is None]
            else:
                samples = [k for k in ebi_submission.samples
                           if all_samples[k] is not None]
            if samples:
                ebi_submission.write_xml_file(
                    ebi_submission.generate_sample_xml(samples, cols_to_drop),
                    ebi_submission.sample_xml_fp)

        # let's try with the default parameters
        _reduce_metadata()
        # now let's recalculate the size to make sure it's fine
        new_total_size = sum([stat(tr).st_size
                              for tr in to_review if tr is not None])
        LogEntry.create(
            'Runtime',
            'The submission: %d after default cleaning is %d and was %d' % (
                artifact_id, new_total_size, total_size))
        if new_total_size > max_size:
            LogEntry.create(
                'Runtime', 'Submission %d still too big, will try more '
                'stringent parameters' % (artifact_id))

            _reduce_metadata(0.05, 0.4)
            new_total_size = sum([stat(tr).st_size
                                  for tr in to_review if tr is not None])
            LogEntry.create(
                'Runtime',
                'The submission: %d after default cleaning is %d and was '
                '%d' % (artifact_id, new_total_size, total_size))
            if new_total_size > max_size:
                raise ComputeError(
                    'Even after cleaning the submission: %d is too large. '
                    'Before cleaning: %d, after: %d' % (
                        artifact_id, total_size, new_total_size))

    st_acc, sa_acc, bio_acc, ex_acc, run_acc = None, None, None, None, None
    if send:
        # getting aspera's password
        old_ascp_pass = environ.get('ASPERA_SCP_PASS', '')
        if old_ascp_pass == '':
            environ['ASPERA_SCP_PASS'] = qiita_config.ebi_seq_xfer_pass
        ascp_passwd = environ['ASPERA_SCP_PASS']
        LogEntry.create('Runtime',
                        ('Starting submission of sequences of '
                         'pre_processed_id: %d' % artifact_id))

        # step 4: sending sequences
        if action != 'MODIFY':
            LogEntry.create('Runtime',
                            ("Submitting sequences for pre_processed_id: "
                             "%d" % artifact_id))
            for cmd in ebi_submission.generate_send_sequences_cmd():
                stdout, stderr, rv = system_call(cmd)
                if rv != 0:
                    error_msg = ("ASCP Error:\nStd output:%s\nStd error:%s" % (
                        stdout, stderr))
                    environ['ASPERA_SCP_PASS'] = old_ascp_pass
                    raise ComputeError(error_msg)
                # context manager so the reply file is always closed
                with open(ebi_submission.ascp_reply, 'a') as ar:
                    ar.write(
                        'stdout:\n%s\n\nstderr: %s' % (stdout, stderr))
        environ['ASPERA_SCP_PASS'] = old_ascp_pass

        # step 5: sending xml
        xmls_cmds = ebi_submission.generate_curl_command(
            ebi_seq_xfer_pass=ascp_passwd)
        LogEntry.create('Runtime',
                        ("Submitting XMLs for pre_processed_id: "
                         "%d" % artifact_id))
        xml_content, stderr, rv = system_call(xmls_cmds)
        if rv != 0:
            error_msg = ("Error:\nStd output:%s\nStd error:%s" % (
                xml_content, stderr))
            raise ComputeError(error_msg)
        else:
            LogEntry.create('Runtime',
                            ('Submission of sequences of pre_processed_id: '
                             '%d completed successfully' % artifact_id))
        with open(ebi_submission.curl_reply, 'w') as cr:
            cr.write(
                'stdout:\n%s\n\nstderr: %s' % (xml_content, stderr))

        # parsing answer / only if adding
        if action == 'ADD' or test:
            try:
                st_acc, sa_acc, bio_acc, ex_acc, run_acc = \
                    ebi_submission.parse_EBI_reply(xml_content, test=test)
            except EBISubmissionError as e:
                error = str(e)
                le = LogEntry.create(
                    'Fatal', "Command: %s\nError: %s\n" % (xml_content, error),
                    info={'ebi_submission': artifact_id})
                raise ComputeError(
                    "EBI Submission failed! Log id: %d\n%s" % (le.id, error))

            if st_acc:
                ebi_submission.study.ebi_study_accession = st_acc
            if sa_acc:
                ebi_submission.sample_template.ebi_sample_accessions = sa_acc
            if bio_acc:
                ebi_submission.sample_template.biosample_accessions = bio_acc
            if ex_acc:
                ebi_submission.prep_template.ebi_experiment_accessions = ex_acc
            ebi_submission.artifact.ebi_run_accessions = run_acc

    return st_acc, sa_acc, bio_acc, ex_acc, run_acc
346
347
348
def submit_VAMPS(artifact_id):
    """Submit artifact to VAMPS

    Parameters
    ----------
    artifact_id : int
        The artifact id

    Returns
    -------
    bool
        True if VAMPS returned the expected reply (and the artifact was
        marked as submitted), False otherwise

    Raises
    ------
    ComputeError
        - If the artifact cannot be submitted to VAMPS
        - If the artifact is associated with more than one prep template
        - If the curl upload command fails
    ValueError
        If the artifact was already submitted to VAMPS
    """
    artifact = Artifact(artifact_id)
    if not artifact.can_be_submitted_to_vamps:
        raise ComputeError("Artifact %d cannot be submitted to VAMPS"
                           % artifact_id)
    study = artifact.study
    sample_template = study.sample_template
    prep_templates = artifact.prep_templates
    if len(prep_templates) > 1:
        raise ComputeError(
            "Multiple prep templates associated with the artifact: %s"
            % artifact_id)
    prep_template = prep_templates[0]

    # Also need to check that is not submitting (see item in #1523)
    if artifact.is_submitted_to_vamps:
        raise ValueError("Cannot resubmit artifact %s to VAMPS!" % artifact_id)

    # Generating a tgz
    # NOTE(review): targz_folder is never removed, so every submission leaks
    # a temp directory under the working dir — consider cleaning it up
    targz_folder = mkdtemp(prefix=qiita_config.working_dir)
    targz_fp = join(targz_folder, '%d_%d_%d.tgz' % (study.id,
                                                    prep_template.id,
                                                    artifact_id))
    # context manager guarantees the archive is closed (and flushed) even if
    # writing one of its members raises
    with taropen(targz_fp, mode='w:gz') as targz:
        # adding sample/prep
        samp_fp = join(targz_folder, 'sample_metadata.txt')
        sample_template.to_file(samp_fp)
        targz.add(samp_fp, arcname='sample_metadata.txt')
        prep_fp = join(targz_folder, 'prep_metadata.txt')
        prep_template.to_file(prep_fp)
        targz.add(prep_fp, arcname='prep_metadata.txt')

        # adding preprocessed data
        for x in artifact.filepaths:
            if x['fp_type'] == 'preprocessed_fasta':
                targz.add(x['fp'], arcname='preprocessed_fasta.fna')

    # submitting
    cmd = ("curl -F user=%s -F pass='%s' -F uploadFile=@%s -F "
           "press=UploadFile %s" % (qiita_config.vamps_user,
                                    qiita_config.vamps_pass,
                                    targz_fp,
                                    qiita_config.vamps_url))
    obs, stderr, rv = system_call(cmd)
    if rv != 0:
        error_msg = ("Error:\nStd output:%s\nStd error:%s" % (obs, stderr))
        raise ComputeError(error_msg)

    # VAMPS answers with this exact page on a successful upload
    exp = ("<html>\n<head>\n<title>Process Uploaded File</title>\n</head>\n"
           "<body>\n</body>\n</html>")

    if obs != exp:
        return False
    else:
        artifact.is_submitted_to_vamps = True
        return True