Diff of /qiita_db/artifact.py [000000] .. [879b32]

Switch to unified view

a b/qiita_db/artifact.py
1
# -----------------------------------------------------------------------------
2
# Copyright (c) 2014--, The Qiita Development Team.
3
#
4
# Distributed under the terms of the BSD 3-clause License.
5
#
6
# The full license is in the file LICENSE, distributed with this software.
7
# -----------------------------------------------------------------------------
8
from itertools import chain
9
from datetime import datetime
10
from os import remove
11
from os.path import isfile, relpath
12
from shutil import rmtree
13
from collections import namedtuple
14
from json import dumps
15
from qiita_db.util import create_nested_path
16
17
import networkx as nx
18
19
import qiita_db as qdb
20
21
from qiita_core.qiita_settings import qiita_config
22
23
24
TypeNode = namedtuple('TypeNode', ['id', 'job_id', 'name', 'type'])
25
26
27
class Artifact(qdb.base.QiitaObject):
28
    r"""Any kind of file (or group of files) stored in the system and its
29
    attributes
30
31
    Attributes
32
    ----------
33
    timestamp
34
    processing_parameters
35
    visibility
36
    artifact_type
37
    data_type
38
    can_be_submitted_to_ebi
39
    can_be_submitted_to_vamps
40
    is_submitted_to_vamps
41
    filepaths
42
    parents
43
    prep_template
44
    ebi_run_accession
45
    study
46
    has_human
47
48
    Methods
49
    -------
50
    create
51
    delete
52
    being_deleted_by
53
    archive
54
55
    See Also
56
    --------
57
    qiita_db.QiitaObject
58
    """
59
    _table = "artifact"
60
61
    @classmethod
62
    def iter(cls):
63
        """Iterate over all artifacts in the database
64
65
        Returns
66
        -------
67
        generator
68
            Yields a `Artifact` object for each artifact in the database,
69
            in order of ascending artifact_id
70
        """
71
        with qdb.sql_connection.TRN:
72
            sql = """SELECT artifact_id FROM qiita.{}
73
                     ORDER BY artifact_id""".format(cls._table)
74
            qdb.sql_connection.TRN.add(sql)
75
76
            ids = qdb.sql_connection.TRN.execute_fetchflatten()
77
78
        for id_ in ids:
79
            yield Artifact(id_)
80
81
    @classmethod
82
    def iter_by_visibility(cls, visibility):
83
        r"""Iterator over the artifacts with the given visibility
84
85
        Parameters
86
        ----------
87
        visibility : str
88
            The visibility level
89
90
        Returns
91
        -------
92
        generator of qiita_db.artifact.Artifact
93
            The artifacts available in the system with the given visibility
94
        """
95
        with qdb.sql_connection.TRN:
96
            sql = """SELECT artifact_id
97
                     FROM qiita.artifact
98
                        JOIN qiita.visibility USING (visibility_id)
99
                     WHERE visibility = %s
100
                     ORDER BY artifact_id"""
101
            qdb.sql_connection.TRN.add(sql, [visibility])
102
            for a_id in qdb.sql_connection.TRN.execute_fetchflatten():
103
                yield cls(a_id)
104
105
    @staticmethod
106
    def types():
107
        """Returns list of all artifact types available and their descriptions
108
109
        Returns
110
        -------
111
        list of list of str
112
            The artifact type and description of the artifact type, in the form
113
            [[artifact_type, description, can_be_submitted_to_ebi,
114
              can_be_submitted_to_vamps, is_user_uploadable], ...]
115
        """
116
        with qdb.sql_connection.TRN:
117
            sql = """SELECT artifact_type, description,
118
                            can_be_submitted_to_ebi,
119
                            can_be_submitted_to_vamps, is_user_uploadable
120
                     FROM qiita.artifact_type
121
                     ORDER BY artifact_type"""
122
            qdb.sql_connection.TRN.add(sql)
123
            return qdb.sql_connection.TRN.execute_fetchindex()
124
125
    @staticmethod
126
    def create_type(name, description, can_be_submitted_to_ebi,
127
                    can_be_submitted_to_vamps, is_user_uploadable,
128
                    filepath_types):
129
        """Creates a new artifact type in the system
130
131
        Parameters
132
        ----------
133
        name : str
134
            The artifact type name
135
        description : str
136
            The artifact type description
137
        can_be_submitted_to_ebi : bool
138
            Whether the artifact type can be submitted to EBI or not
139
        can_be_submitted_to_vamps : bool
140
            Whether the artifact type can be submitted to VAMPS or not
141
        is_user_uploadable : bool
142
            Whether the artifact type can be raw: upload directly to qiita
143
        filepath_types : list of (str, bool)
144
            The list filepath types that the new artifact type supports, and
145
            if they're required or not in an artifact instance of this type
146
147
        Raises
148
        ------
149
        qiita_db.exceptions.QiitaDBDuplicateError
150
            If an artifact type with the same name already exists
151
        """
152
        with qdb.sql_connection.TRN:
153
            sql = """SELECT EXISTS(
154
                        SELECT *
155
                        FROM qiita.artifact_type
156
                        WHERE artifact_type=%s)"""
157
            qdb.sql_connection.TRN.add(sql, [name])
158
            if qdb.sql_connection.TRN.execute_fetchlast():
159
                raise qdb.exceptions.QiitaDBDuplicateError(
160
                    'artifact type', 'name: %s' % name)
161
            sql = """INSERT INTO qiita.artifact_type
162
                        (artifact_type, description, can_be_submitted_to_ebi,
163
                         can_be_submitted_to_vamps, is_user_uploadable)
164
                     VALUES (%s, %s, %s, %s, %s)
165
                     RETURNING artifact_type_id"""
166
            qdb.sql_connection.TRN.add(
167
                sql, [name, description, can_be_submitted_to_ebi,
168
                      can_be_submitted_to_vamps, is_user_uploadable])
169
            at_id = qdb.sql_connection.TRN.execute_fetchlast()
170
            sql = """INSERT INTO qiita.artifact_type_filepath_type
171
                        (artifact_type_id, filepath_type_id, required)
172
                     VALUES (%s, %s, %s)"""
173
            sql_args = [
174
                [at_id, qdb.util.convert_to_id(fpt, 'filepath_type'), req]
175
                for fpt, req in filepath_types]
176
            qdb.sql_connection.TRN.add(sql, sql_args, many=True)
177
178
            # When creating a type is expected that a new mountpoint is created
179
            # for that type, note that we are going to check if there is an
180
            # extra path for the mountpoint, which is useful for the test
181
            # environment
182
            qc = qiita_config
183
            mp = relpath(qc.working_dir, qc.base_data_dir).replace(
184
                'working_dir', '')
185
            mp = mp + name if mp != '/' and mp != '' else name
186
            sql = """INSERT INTO qiita.data_directory
187
                        (data_type, mountpoint, subdirectory, active)
188
                        VALUES (%s, %s, %s, %s)"""
189
            qdb.sql_connection.TRN.add(sql, [name, mp, True, True])
190
191
            # We are intersted in the dirpath
192
            create_nested_path(qdb.util.get_mountpoint(name)[0][1])
193
194
            qdb.sql_connection.TRN.execute()
195
196
    @classmethod
197
    def copy(cls, artifact, prep_template):
198
        """Creates a copy of `artifact` and attaches it to `prep_template`
199
200
        Parameters
201
        ----------
202
        artifact : qiita_db.artifact.Artifact
203
            Artifact to copy from
204
        prep_template : qiita_db.metadata_template.prep_template.PrepTemplate
205
            The prep template to attach the new artifact to
206
207
        Returns
208
        -------
209
        qiita_db.artifact.Artifact
210
            A new instance of Artifact
211
        """
212
        with qdb.sql_connection.TRN:
213
            visibility_id = qdb.util.convert_to_id("sandbox", "visibility")
214
            atype = artifact.artifact_type
215
            atype_id = qdb.util.convert_to_id(atype, "artifact_type")
216
            dtype_id = qdb.util.convert_to_id(
217
                prep_template.data_type(), "data_type")
218
            sql = """INSERT INTO qiita.artifact (
219
                        generated_timestamp, visibility_id, artifact_type_id,
220
                        data_type_id, submitted_to_vamps)
221
                     VALUES (%s, %s, %s, %s, %s)
222
                     RETURNING artifact_id"""
223
            sql_args = [datetime.now(), visibility_id, atype_id, dtype_id,
224
                        False]
225
            qdb.sql_connection.TRN.add(sql, sql_args)
226
            a_id = qdb.sql_connection.TRN.execute_fetchlast()
227
228
            # Associate the artifact with the prep template
229
            instance = cls(a_id)
230
            prep_template.artifact = instance
231
232
            # Associate the artifact with the study
233
            sql = """INSERT INTO qiita.study_artifact (study_id, artifact_id)
234
                     VALUES (%s, %s)"""
235
            sql_args = [prep_template.study_id, a_id]
236
            qdb.sql_connection.TRN.add(sql, sql_args)
237
238
            # Associate the artifact with the preparation information
239
            sql = """INSERT INTO qiita.preparation_artifact (
240
                        prep_template_id, artifact_id)
241
                     VALUES (%s, %s)"""
242
            sql_args = [prep_template.id, a_id]
243
            qdb.sql_connection.TRN.add(sql, sql_args)
244
245
            # Associate the artifact with its filepaths
246
            filepaths = [(x['fp'], x['fp_type']) for x in artifact.filepaths]
247
            fp_ids = qdb.util.insert_filepaths(
248
                filepaths, a_id, atype, copy=True)
249
            sql = """INSERT INTO qiita.artifact_filepath
250
                        (artifact_id, filepath_id)
251
                     VALUES (%s, %s)"""
252
            sql_args = [[a_id, fp_id] for fp_id in fp_ids]
253
            qdb.sql_connection.TRN.add(sql, sql_args, many=True)
254
            qdb.sql_connection.TRN.execute()
255
256
        return instance
257
258
    @classmethod
259
    def create(cls, filepaths, artifact_type, name=None, prep_template=None,
260
               parents=None, processing_parameters=None, move_files=True,
261
               analysis=None, data_type=None):
262
        r"""Creates a new artifact in the system
263
264
        The parameters depend on how the artifact was generated:
265
            - If the artifact was uploaded by the user, the parameter
266
            `prep_template` should be provided and the parameters `parents`,
267
            `processing_parameters` and `analysis` should not be provided.
268
            - If the artifact was generated by processing one or more
269
            artifacts, the parameters `parents` and `processing_parameters`
270
            should be provided and the parameters `prep_template` and
271
            `analysis` should not be provided.
272
            - If the artifact is the initial artifact of the analysis, the
273
            parameters `analysis` and `data_type` should be provided and the
274
            parameters `prep_template`, `parents` and `processing_parameters`
275
            should not be provided.
276
277
        Parameters
278
        ----------
279
        filepaths : iterable of tuples (str, int)
280
            A list of 2-tuples in which the first element is the artifact
281
            file path and the second one is the file path type id
282
        artifact_type : str
283
            The type of the artifact
284
        name : str, optional
285
            The artifact's name
286
        prep_template : qiita_db.metadata_template.PrepTemplate, optional
287
            If the artifact is being uploaded by the user, the prep template
288
            to which the artifact should be linked to. If not provided,
289
            `parents` or `analysis` should be provided.
290
        parents : iterable of qiita_db.artifact.Artifact, optional
291
            The list of artifacts from which the new artifact has been
292
            generated. If not provided, `prep_template` or `analysis`
293
            should be provided.
294
        processing_parameters : qiita_db.software.Parameters, optional
295
            The processing parameters used to generate the new artifact
296
            from `parents`. It is required if `parents` is provided. It should
297
            not be provided if `processing_parameters` is not provided.
298
        move_files : bool, optional
299
            If False the files will not be moved but copied
300
        analysis : qiita_db.analysis.Analysis, optional
301
            If the artifact is the inital artifact of an analysis, the analysis
302
            to which the artifact belongs to. If not provided, `prep_template`
303
            or `parents` should be provided.
304
        data_type : str
305
            The data_type of the artifact in the `analysis`. It is required if
306
            `analysis` is provided. It should not be provided if `analysis` is
307
            not provided.
308
309
        Returns
310
        -------
311
        qiita_db.artifact.Artifact
312
            A new instance of Artifact
313
314
        Raises
315
        ------
316
        QiitaDBArtifactCreationError
317
            If `filepaths` is not provided
318
            If both `parents` and `prep_template` are provided
319
            If none of `parents` and `prep_template` are provided
320
            If `parents` is provided but `processing_parameters` is not
321
            If both `prep_template` and `processing_parameters` is provided
322
            If not all the artifacts in `parents` belong to the same study
323
324
        Notes
325
        -----
326
        The visibility of the artifact is set by default to `sandbox` if
327
        prep_template is passed but if parents is passed we will inherit the
328
        most closed visibility.
329
        The timestamp of the artifact is set by default to `datetime.now()`.
330
        The value of `submitted_to_vamps` is set by default to `False`.
331
        """
332
        # We need at least one file
333
        if not filepaths:
334
            raise qdb.exceptions.QiitaDBArtifactCreationError(
335
                "at least one filepath is required.")
336
337
        # Check that the combination of parameters is correct
338
        counts = (int(bool(parents or processing_parameters)) +
339
                  int(prep_template is not None) +
340
                  int(bool(analysis or data_type)))
341
        if counts != 1:
342
            # More than one parameter has been provided
343
            raise qdb.exceptions.QiitaDBArtifactCreationError(
344
                "One and only one of parents, prep template or analysis must "
345
                "be provided")
346
        elif bool(parents) != bool(processing_parameters):
347
            # When provided, parents and processing parameters both should be
348
            # provided (this is effectively doing an XOR)
349
            raise qdb.exceptions.QiitaDBArtifactCreationError(
350
                "When provided, both parents and processing parameters should "
351
                "be provided")
352
        elif bool(analysis) and not bool(data_type):
353
            # When provided, analysis and data_type both should be
354
            # provided (this is effectively doing an XOR)
355
            raise qdb.exceptions.QiitaDBArtifactCreationError(
356
                "When provided, both analysis and data_type should "
357
                "be provided")
358
359
        # There are three different ways of creating an Artifact, but all of
360
        # them execute a set of common operations. Declare functions to avoid
361
        # code duplication. These functions should not be used outside of the
362
        # CREATE OR REPLACE FUNCTION, hence declaring them here
363
        def _common_creation_steps(atype, cmd_id, data_type, cmd_parameters):
364
            gen_timestamp = datetime.now()
365
            visibility_id = qdb.util.convert_to_id("sandbox", "visibility")
366
            atype_id = qdb.util.convert_to_id(atype, "artifact_type")
367
            dtype_id = qdb.util.convert_to_id(data_type, "data_type")
368
            # Create the artifact row in the artifact table
369
            sql = """INSERT INTO qiita.artifact
370
                        (generated_timestamp, command_id, data_type_id,
371
                         command_parameters, visibility_id,
372
                         artifact_type_id, submitted_to_vamps)
373
                     VALUES (%s, %s, %s, %s, %s, %s, %s)
374
                     RETURNING artifact_id"""
375
            sql_args = [gen_timestamp, cmd_id, dtype_id,
376
                        cmd_parameters, visibility_id, atype_id, False]
377
            qdb.sql_connection.TRN.add(sql, sql_args)
378
            a_id = qdb.sql_connection.TRN.execute_fetchlast()
379
            qdb.sql_connection.TRN.execute()
380
381
            return cls(a_id)
382
383
        def _associate_with_study(instance, study_id, prep_template_id):
384
            # Associate the artifact with the study
385
            sql = """INSERT INTO qiita.study_artifact
386
                        (study_id, artifact_id)
387
                     VALUES (%s, %s)"""
388
            sql_args = [study_id, instance.id]
389
            qdb.sql_connection.TRN.add(sql, sql_args)
390
            sql = """INSERT INTO qiita.preparation_artifact
391
                        (prep_template_id, artifact_id)
392
                     VALUES (%s, %s)"""
393
            sql_args = [prep_template_id, instance.id]
394
            qdb.sql_connection.TRN.add(sql, sql_args)
395
            qdb.sql_connection.TRN.execute()
396
397
        def _associate_with_analysis(instance, analysis_id):
398
            # Associate the artifact with the analysis
399
            sql = """INSERT INTO qiita.analysis_artifact
400
                        (analysis_id, artifact_id)
401
                     VALUES (%s, %s)"""
402
            sql_args = [analysis_id, instance.id]
403
            qdb.sql_connection.perform_as_transaction(sql, sql_args)
404
405
        with qdb.sql_connection.TRN:
406
            if parents:
407
                dtypes = {p.data_type for p in parents}
408
                # If an artifact has parents, it can be either from the
409
                # processing pipeline or the analysis pipeline. Decide which
410
                # one here
411
                studies = {p.study for p in parents}
412
                analyses = {p.analysis for p in parents}
413
                studies.discard(None)
414
                analyses.discard(None)
415
                studies = {s.id for s in studies}
416
                analyses = {a.id for a in analyses}
417
418
                # The first 2 cases should never happen, but it doesn't hurt
419
                # to check them
420
                len_studies = len(studies)
421
                len_analyses = len(analyses)
422
                if len_studies > 0 and len_analyses > 0:
423
                    raise qdb.exceptions.QiitaDBArtifactCreationError(
424
                        "All the parents from an artifact should be either "
425
                        "from the analysis pipeline or all from the processing"
426
                        " pipeline")
427
                elif len_studies > 1 or len_studies > 1:
428
                    raise qdb.exceptions.QiitaDBArtifactCreationError(
429
                        "Parents from multiple studies/analyses provided. "
430
                        "Analyses: %s. Studies: %s."
431
                        % (', '.join(analyses), ', '.join(studies)))
432
                elif len_studies == 1:
433
                    # This artifact is part of the processing pipeline
434
                    study_id = studies.pop()
435
                    # In the processing pipeline, artifacts can have only
436
                    # one dtype
437
                    if len(dtypes) > 1:
438
                        raise qdb.exceptions.QiitaDBArtifactCreationError(
439
                            "parents have multiple data types: %s"
440
                            % ", ".join(dtypes))
441
442
                    instance = _common_creation_steps(
443
                        artifact_type, processing_parameters.command.id,
444
                        dtypes.pop(), processing_parameters.dump())
445
                    _associate_with_study(
446
                        instance, study_id, parents[0].prep_templates[0].id)
447
                else:
448
                    # This artifact is part of the analysis pipeline
449
                    analysis_id = analyses.pop()
450
                    # In the processing pipeline, artifact parents can have
451
                    # more than one data type
452
                    data_type = ("Multiomic"
453
                                 if len(dtypes) > 1 else dtypes.pop())
454
                    instance = _common_creation_steps(
455
                        artifact_type, processing_parameters.command.id,
456
                        data_type, processing_parameters.dump())
457
                    _associate_with_analysis(instance, analysis_id)
458
459
                # Associate the artifact with its parents
460
                sql = """INSERT INTO qiita.parent_artifact
461
                            (artifact_id, parent_id)
462
                         VALUES (%s, %s)"""
463
                sql_args = [(instance.id, p.id) for p in parents]
464
                qdb.sql_connection.TRN.add(sql, sql_args, many=True)
465
466
                # inheriting visibility
467
                visibilities = {a.visibility for a in instance.parents}
468
                # set based on the "lowest" visibility
469
                if 'sandbox' in visibilities:
470
                    instance.visibility = 'sandbox'
471
                elif 'private' in visibilities:
472
                    instance.visibility = 'private'
473
                else:
474
                    instance._set_visibility('public')
475
476
            elif prep_template:
477
                # This artifact is uploaded by the user in the
478
                # processing pipeline
479
                instance = _common_creation_steps(
480
                    artifact_type, None, prep_template.data_type(), None)
481
                # Associate the artifact with the prep template
482
                prep_template.artifact = instance
483
                # Associate the artifact with the study
484
                _associate_with_study(
485
                    instance, prep_template.study_id, prep_template.id)
486
            else:
487
                # This artifact is an initial artifact of an analysis
488
                instance = _common_creation_steps(
489
                    artifact_type, None, data_type, None)
490
                # Associate the artifact with the analysis
491
                if bool(analysis):
492
                    analysis.add_artifact(instance)
493
494
            # Associate the artifact with its filepaths
495
            fp_ids = qdb.util.insert_filepaths(
496
                filepaths, instance.id, artifact_type,
497
                move_files=move_files, copy=(not move_files))
498
            sql = """INSERT INTO qiita.artifact_filepath
499
                        (artifact_id, filepath_id)
500
                     VALUES (%s, %s)"""
501
            sql_args = [[instance.id, fp_id] for fp_id in fp_ids]
502
            qdb.sql_connection.TRN.add(sql, sql_args, many=True)
503
504
            if name:
505
                instance.name = name
506
507
        return instance
508
509
    @classmethod
510
    def delete(cls, artifact_id):
511
        r"""Deletes an artifact from the system with its children
512
513
        Parameters
514
        ----------
515
        artifact_id : int
516
            The parent artifact to be removed
517
518
        Raises
519
        ------
520
        QiitaDBArtifactDeletionError
521
            If the artifacts are public
522
            If the artifacts have been analyzed
523
            If the artifacts have been submitted to EBI
524
            If the artifacts have been submitted to VAMPS
525
        """
526
        with qdb.sql_connection.TRN:
527
            # This will fail if the artifact with id=artifact_id doesn't exist
528
            instance = cls(artifact_id)
529
530
            # Check if the artifact is public
531
            if instance.visibility == 'public':
532
                raise qdb.exceptions.QiitaDBArtifactDeletionError(
533
                    artifact_id, "it is public")
534
535
            all_artifacts = list(set(instance.descendants.nodes()))
536
            all_artifacts.reverse()
537
            all_ids = tuple([a.id for a in all_artifacts])
538
539
            # Check if this or any of the children have been analyzed
540
            sql = """SELECT email, analysis_id
541
                     FROM qiita.analysis
542
                     WHERE analysis_id IN (
543
                        SELECT DISTINCT analysis_id
544
                        FROM qiita.analysis_sample
545
                        WHERE artifact_id IN %s)"""
546
            qdb.sql_connection.TRN.add(sql, [all_ids])
547
            analyses = qdb.sql_connection.TRN.execute_fetchindex()
548
            if analyses:
549
                analyses = '\n'.join(
550
                    ['Analysis id: %s, Owner: %s' % (aid, email)
551
                     for email, aid in analyses])
552
                raise qdb.exceptions.QiitaDBArtifactDeletionError(
553
                    artifact_id, 'it or one of its children has been '
554
                    'analyzed by: \n %s' % analyses)
555
556
            # Check if the artifacts have been submitted to EBI
557
            for a in all_artifacts:
558
                if a.can_be_submitted_to_ebi and a.ebi_run_accessions:
559
                    raise qdb.exceptions.QiitaDBArtifactDeletionError(
560
                        artifact_id, "Artifact %d has been submitted to "
561
                        "EBI" % a.id)
562
563
            # Check if the artifacts have been submitted to VAMPS
564
            for a in all_artifacts:
565
                if a.can_be_submitted_to_vamps and a.is_submitted_to_vamps:
566
                    raise qdb.exceptions.QiitaDBArtifactDeletionError(
567
                        artifact_id, "Artifact %d has been submitted to "
568
                        "VAMPS" % a.id)
569
570
            # Check if there is a job queued, running, waiting or
571
            # in_construction that will use/is using the artifact
572
            sql = """SELECT processing_job_id
573
                     FROM qiita.artifact_processing_job
574
                         JOIN qiita.processing_job USING (processing_job_id)
575
                         JOIN qiita.processing_job_status
576
                             USING (processing_job_status_id)
577
                     WHERE artifact_id IN %s
578
                         AND processing_job_status IN (
579
                             'queued', 'running', 'waiting')"""
580
            qdb.sql_connection.TRN.add(sql, [all_ids])
581
            jobs = qdb.sql_connection.TRN.execute_fetchflatten()
582
            if jobs:
583
                # if the artifact has active jobs we need to raise an error
584
                # but we also need to check that if it's only 1 job, that the
585
                # job is not the delete_artifact actual job
586
                raise_error = True
587
                job_name = qdb.processing_job.ProcessingJob(
588
                    jobs[0]).command.name
589
                if len(jobs) == 1 and job_name == 'delete_artifact':
590
                    raise_error = False
591
                if raise_error:
592
                    raise qdb.exceptions.QiitaDBArtifactDeletionError(
593
                        artifact_id, "there is a queued/running job that "
594
                        "uses this artifact or one of it's children")
595
596
            # We can now remove the artifacts
597
            filepaths = [f for a in all_artifacts for f in a.filepaths]
598
            study = instance.study
599
600
            # Delete any failed/successful job that had the artifact as input
601
            sql = """SELECT processing_job_id
602
                     FROM qiita.artifact_processing_job
603
                     WHERE artifact_id IN %s"""
604
            qdb.sql_connection.TRN.add(sql, [all_ids])
605
            job_ids = tuple(qdb.sql_connection.TRN.execute_fetchflatten())
606
607
            if job_ids:
608
                sql = """DELETE FROM qiita.artifact_processing_job
609
                         WHERE artifact_id IN %s"""
610
                qdb.sql_connection.TRN.add(sql, [all_ids])
611
612
            # Delete the entry from the artifact_output_processing_job table
613
            sql = """DELETE FROM qiita.artifact_output_processing_job
614
                     WHERE artifact_id IN %s"""
615
            qdb.sql_connection.TRN.add(sql, [all_ids])
616
617
            # Detach the artifact from its filepaths
618
            sql = """DELETE FROM qiita.artifact_filepath
619
                     WHERE artifact_id IN %s"""
620
            qdb.sql_connection.TRN.add(sql, [all_ids])
621
622
            # If the first artifact to be deleted, instance, doesn't have
623
            # parents and study is not None (None means is an analysis), we
624
            # move the files to the uploads folder. We also need
625
            # to nullify the column in the prep template table
626
            if not instance.parents and study is not None:
627
                qdb.util.move_filepaths_to_upload_folder(study.id,
628
                                                         filepaths)
629
                # there are cases that an artifact would not be linked to a
630
                # study
631
                pt_ids = [tuple([pt.id]) for a in all_artifacts
632
                          for pt in a.prep_templates]
633
                if pt_ids:
634
                    sql = """UPDATE qiita.prep_template
635
                             SET artifact_id = NULL
636
                             WHERE prep_template_id IN %s"""
637
                    qdb.sql_connection.TRN.add(sql, pt_ids)
638
            else:
639
                sql = """DELETE FROM qiita.parent_artifact
640
                         WHERE artifact_id IN %s"""
641
                qdb.sql_connection.TRN.add(sql, [all_ids])
642
643
            # Detach the artifacts from the study_artifact table
644
            sql = "DELETE FROM qiita.study_artifact WHERE artifact_id IN %s"
645
            qdb.sql_connection.TRN.add(sql, [all_ids])
646
647
            # Detach the artifacts from the analysis_artifact table
648
            sql = "DELETE FROM qiita.analysis_artifact WHERE artifact_id IN %s"
649
            qdb.sql_connection.TRN.add(sql, [all_ids])
650
651
            # Detach artifact from preparation_artifact
652
            sql = """DELETE FROM qiita.preparation_artifact
653
                     WHERE artifact_id IN %s"""
654
            qdb.sql_connection.TRN.add(sql, [all_ids])
655
656
            # Delete the rows in the artifact table
657
            sql = "DELETE FROM qiita.artifact WHERE artifact_id IN %s"
658
            qdb.sql_connection.TRN.add(sql, [all_ids])
659
660
    @classmethod
661
    def archive(cls, artifact_id, clean_ancestors=True):
662
        """Archive artifact with artifact_id
663
664
        Parameters
665
        ----------
666
        artifact_id : int
667
            The artifact to be archived
668
        clean_ancestors : bool
669
            If other childless artifacts should be deleted
670
671
        Raises
672
        ------
673
        QiitaDBOperationNotPermittedError
674
            If the artifact is not public
675
            If the artifact_type is not BIOM
676
            If the artifact belongs to an analysis
677
            If the artifact has no parents (raw file)
678
        """
679
        artifact = cls(artifact_id)
680
681
        if artifact.visibility != 'public':
682
            raise qdb.exceptions.QiitaDBOperationNotPermittedError(
683
                'Only public artifacts can be archived')
684
        if artifact.artifact_type != 'BIOM':
685
            raise qdb.exceptions.QiitaDBOperationNotPermittedError(
686
                'Only BIOM artifacts can be archived')
687
        if artifact.analysis is not None:
688
            raise qdb.exceptions.QiitaDBOperationNotPermittedError(
689
                'Only non analysis artifacts can be archived')
690
        if not artifact.parents:
691
            raise qdb.exceptions.QiitaDBOperationNotPermittedError(
692
                'Only non raw artifacts can be archived')
693
694
        to_delete = []
695
        if clean_ancestors:
696
            # let's find all ancestors that can be deleted (it has parents and
697
            # no ancestors (that have no descendants), and delete them
698
            to_delete = [x for x in artifact.ancestors.nodes()
699
                         if x.id != artifact_id and x.parents and
700
                         not [y for y in x.descendants.nodes()
701
                         if y.id not in (artifact_id, x.id)]]
702
            # ignore artifacts that can and has been submitted to EBI
703
            to_delete = [x for x in to_delete if not x.can_be_submitted_to_ebi
704
                         and not x.is_submitted_to_ebi
705
                         and not x.is_submitted_to_vamps]
706
707
        # get the log file so we can delete
708
        fids = [x['fp_id'] for x in artifact.filepaths
709
                if x['fp_type'] == 'log']
710
711
        archive_data = dumps({"merging_scheme": artifact.merging_scheme})
712
        with qdb.sql_connection.TRN:
713
            artifact._set_visibility('archived', propagate=False)
714
            sql = 'DELETE FROM qiita.parent_artifact WHERE artifact_id = %s'
715
            qdb.sql_connection.TRN.add(sql, [artifact_id])
716
717
            sql = '''DELETE FROM qiita.artifact_output_processing_job
718
                     WHERE artifact_id = %s'''
719
            qdb.sql_connection.TRN.add(sql, [artifact_id])
720
721
            if fids:
722
                sql = '''DELETE FROM qiita.artifact_filepath
723
                         WHERE filepath_id IN %s'''
724
                qdb.sql_connection.TRN.add(sql, [tuple(fids)])
725
726
            sql = """UPDATE qiita.{0}
727
                     SET archive_data = %s
728
                     WHERE artifact_id = %s""".format(cls._table)
729
            qdb.sql_connection.TRN.add(sql, [archive_data, artifact_id])
730
731
            qdb.sql_connection.TRN.execute()
732
733
        # cleaning the extra artifacts
734
        for x in to_delete:
735
            x._set_visibility('sandbox', propagate=False)
736
            cls.delete(x.id)
737
738
    @property
739
    def name(self):
740
        """The name of the artifact
741
742
        Returns
743
        -------
744
        str
745
            The artifact name
746
        """
747
        with qdb.sql_connection.TRN:
748
            sql = """SELECT name
749
                     FROM qiita.artifact
750
                     WHERE artifact_id = %s"""
751
            qdb.sql_connection.TRN.add(sql, [self.id])
752
            return qdb.sql_connection.TRN.execute_fetchlast()
753
754
    @name.setter
755
    def name(self, value):
756
        """Set the name of the artifact
757
758
        Parameters
759
        ----------
760
        value : str
761
            The new artifact's name
762
763
        Raises
764
        ------
765
        ValueError
766
            If `value` contains more than 35 chars
767
        """
768
        sql = """UPDATE qiita.artifact
769
                 SET name = %s
770
                 WHERE artifact_id = %s"""
771
        qdb.sql_connection.perform_as_transaction(sql, [value, self.id])
772
773
    @property
774
    def timestamp(self):
775
        """The timestamp when the artifact was generated
776
777
        Returns
778
        -------
779
        datetime
780
            The timestamp when the artifact was generated
781
        """
782
        with qdb.sql_connection.TRN:
783
            sql = """SELECT generated_timestamp
784
                     FROM qiita.artifact
785
                     WHERE artifact_id = %s"""
786
            qdb.sql_connection.TRN.add(sql, [self.id])
787
            return qdb.sql_connection.TRN.execute_fetchlast()
788
789
    @property
790
    def processing_parameters(self):
791
        """The processing parameters used to generate the artifact
792
793
        Returns
794
        -------
795
        qiita_db.software.Parameters or None
796
            The parameters used to generate the artifact if it has parents.
797
            None otherwise.
798
        """
799
        with qdb.sql_connection.TRN:
800
            sql = """SELECT command_id, command_parameters
801
                     FROM qiita.artifact
802
                     WHERE artifact_id = %s"""
803
            qdb.sql_connection.TRN.add(sql, [self.id])
804
            # Only one row will be returned
805
            res = qdb.sql_connection.TRN.execute_fetchindex()[0]
806
            if res[0] is None:
807
                return None
808
            return qdb.software.Parameters.load(
809
                qdb.software.Command(res[0]), values_dict=res[1])
810
811
    @property
812
    def visibility(self):
813
        """The visibility of the artifact
814
815
        Returns
816
        -------
817
        str
818
            The visibility of the artifact
819
        """
820
        with qdb.sql_connection.TRN:
821
            sql = """SELECT visibility
822
                     FROM qiita.artifact
823
                        JOIN qiita.visibility USING (visibility_id)
824
                     WHERE artifact_id = %s"""
825
            qdb.sql_connection.TRN.add(sql, [self.id])
826
            return qdb.sql_connection.TRN.execute_fetchlast()
827
828
    def _set_visibility(self, value, propagate=True):
829
        "helper method to split validation and actual set of the visibility"
830
        # In order to correctly propagate the visibility we need to find
831
        # the root of this artifact and then propagate to all the artifacts
832
        vis_id = qdb.util.convert_to_id(value, "visibility")
833
834
        if propagate:
835
            sql = "SELECT * FROM qiita.find_artifact_roots(%s)"
836
            qdb.sql_connection.TRN.add(sql, [self.id])
837
            root_id = qdb.sql_connection.TRN.execute_fetchlast()
838
            root = qdb.artifact.Artifact(root_id)
839
            # these are the ids of all the children from the root
840
            ids = [a.id for a in root.descendants.nodes()]
841
        else:
842
            ids = [self.id]
843
844
        sql = """UPDATE qiita.artifact
845
                 SET visibility_id = %s
846
                 WHERE artifact_id IN %s"""
847
        qdb.sql_connection.perform_as_transaction(sql, [vis_id, tuple(ids)])
848
849
    @visibility.setter
850
    def visibility(self, value):
851
        """Sets the visibility of the artifact
852
853
        Parameters
854
        ----------
855
        value : str
856
            The new visibility of the artifact
857
858
        Notes
859
        -----
860
        The visibility of an artifact is propagated to its ancestors, but it
861
        only applies when the new visibility is more open than before.
862
        """
863
        with qdb.sql_connection.TRN:
864
            # first let's check that this is a valid visibility
865
            study = self.study
866
867
            # then let's check that the sample/prep info files have the correct
868
            # restrictions
869
            if value != 'sandbox' and study is not None:
870
                reply = study.sample_template.validate_restrictions()
871
                success = [not reply[0]]
872
                message = [reply[1]]
873
                for pt in self.prep_templates:
874
                    reply = pt.validate_restrictions()
875
                    success.append(not reply[0])
876
                    message.append(reply[1])
877
                if any(success):
878
                    raise ValueError(
879
                        "Errors in your info files:%s" % '\n'.join(message))
880
881
            self._set_visibility(value)
882
883
    @property
884
    def artifact_type(self):
885
        """The artifact type
886
887
        Returns
888
        -------
889
        str
890
            The artifact type
891
        """
892
        with qdb.sql_connection.TRN:
893
            sql = """SELECT artifact_type
894
                     FROM qiita.artifact
895
                        JOIN qiita.artifact_type USING (artifact_type_id)
896
                     WHERE artifact_id = %s"""
897
            qdb.sql_connection.TRN.add(sql, [self.id])
898
            return qdb.sql_connection.TRN.execute_fetchlast()
899
900
    @property
901
    def data_type(self):
902
        """The data type of the artifact
903
904
        Returns
905
        -------
906
        str
907
            The artifact data type
908
        """
909
        with qdb.sql_connection.TRN:
910
            sql = """SELECT data_type
911
                     FROM qiita.artifact
912
                        JOIN qiita.data_type USING (data_type_id)
913
                     WHERE artifact_id = %s"""
914
            qdb.sql_connection.TRN.add(sql, [self.id])
915
            return qdb.sql_connection.TRN.execute_fetchlast()
916
917
    @property
918
    def can_be_submitted_to_ebi(self):
919
        """Whether the artifact can be submitted to EBI or not
920
921
        Returns
922
        -------
923
        bool
924
            True if the artifact can be submitted to EBI. False otherwise.
925
        """
926
        with qdb.sql_connection.TRN:
927
            # we should always return False if this artifact is not directly
928
            # attached to the prep_template or is the second after. In other
929
            # words has more that one processing step behind it
930
            fine_to_send = []
931
            fine_to_send.extend([pt.artifact for pt in self.prep_templates])
932
            fine_to_send.extend([c for a in fine_to_send if a is not None
933
                                 for c in a.children])
934
            if self not in fine_to_send:
935
                return False
936
937
            sql = """SELECT can_be_submitted_to_ebi
938
                     FROM qiita.artifact_type
939
                        JOIN qiita.artifact USING (artifact_type_id)
940
                     WHERE artifact_id = %s"""
941
            qdb.sql_connection.TRN.add(sql, [self.id])
942
            return qdb.sql_connection.TRN.execute_fetchlast()
943
944
    @property
945
    def is_submitted_to_ebi(self):
946
        """Whether the artifact has been submitted to EBI or not
947
948
        Returns
949
        -------
950
        bool
951
            True if the artifact has been submitted to EBI. False otherwise
952
953
        Raises
954
        ------
955
        QiitaDBOperationNotPermittedError
956
            If the artifact cannot be submitted to EBI
957
        """
958
        with qdb.sql_connection.TRN:
959
            if not self.can_be_submitted_to_ebi:
960
                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
961
                    "Artifact %s cannot be submitted to EBI" % self.id)
962
            sql = """SELECT EXISTS(
963
                        SELECT *
964
                        FROM qiita.ebi_run_accession
965
                        WHERE artifact_id = %s)"""
966
            qdb.sql_connection.TRN.add(sql, [self.id])
967
            return qdb.sql_connection.TRN.execute_fetchlast()
968
969
    @property
970
    def ebi_run_accessions(self):
971
        """The EBI run accessions attached to this artifact
972
973
        Returns
974
        -------
975
        dict of {str: str}
976
            The EBI run accessions keyed by sample id
977
978
        Raises
979
        ------
980
        QiitaDBOperationNotPermittedError
981
            If the artifact cannot be submitted to EBI
982
        """
983
        with qdb.sql_connection.TRN:
984
            if not self.can_be_submitted_to_ebi:
985
                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
986
                    "Artifact %s cannot be submitted to EBI" % self.id)
987
            sql = """SELECT sample_id, ebi_run_accession
988
                     FROM qiita.ebi_run_accession
989
                     WHERE artifact_id = %s"""
990
            qdb.sql_connection.TRN.add(sql, [self.id])
991
            return {s_id: ebi_acc for s_id, ebi_acc in
992
                    qdb.sql_connection.TRN.execute_fetchindex()}
993
994
    @ebi_run_accessions.setter
995
    def ebi_run_accessions(self, values):
996
        """Set the EBI run accession attached to this artifact
997
998
        Parameters
999
        ----------
1000
        values : dict of {str: str}
1001
            The EBI accession number keyed by sample id
1002
1003
        Raises
1004
        ------
1005
        QiitaDBOperationNotPermittedError
1006
            If the artifact cannot be submitted to EBI
1007
            If the artifact has been already submitted to EBI
1008
        """
1009
        with qdb.sql_connection.TRN:
1010
            if not self.can_be_submitted_to_ebi:
1011
                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
1012
                    "Artifact %s cannot be submitted to EBI" % self.id)
1013
1014
            sql = """SELECT EXISTS(SELECT *
1015
                                   FROM qiita.ebi_run_accession
1016
                                   WHERE artifact_id = %s)"""
1017
            qdb.sql_connection.TRN.add(sql, [self.id])
1018
            if qdb.sql_connection.TRN.execute_fetchlast():
1019
                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
1020
                    "Artifact %s already submitted to EBI" % self.id)
1021
1022
            sql = """INSERT INTO qiita.ebi_run_accession
1023
                        (sample_id, artifact_id, ebi_run_accession)
1024
                     VALUES (%s, %s, %s)"""
1025
            sql_args = [[sample, self.id, accession]
1026
                        for sample, accession in values.items()]
1027
            qdb.sql_connection.TRN.add(sql, sql_args, many=True)
1028
            qdb.sql_connection.TRN.execute()
1029
1030
    @property
1031
    def can_be_submitted_to_vamps(self):
1032
        """Whether the artifact can be submitted to VAMPS or not
1033
1034
        Returns
1035
        -------
1036
        bool
1037
            True if the artifact can be submitted to VAMPS. False otherwise.
1038
        """
1039
        with qdb.sql_connection.TRN:
1040
            sql = """SELECT can_be_submitted_to_vamps
1041
                     FROM qiita.artifact_type
1042
                        JOIN qiita.artifact USING (artifact_type_id)
1043
                     WHERE artifact_id = %s"""
1044
            qdb.sql_connection.TRN.add(sql, [self.id])
1045
            return qdb.sql_connection.TRN.execute_fetchlast()
1046
1047
    @property
1048
    def is_submitted_to_vamps(self):
1049
        """Whether if the artifact has been submitted to VAMPS or not
1050
1051
        Returns
1052
        -------
1053
        bool
1054
            True if the artifact has been submitted to VAMPS. False otherwise
1055
1056
        Raises
1057
        ------
1058
        QiitaDBOperationNotPermittedError
1059
            If the artifact cannot be submitted to VAMPS
1060
        """
1061
        with qdb.sql_connection.TRN:
1062
            if not self.can_be_submitted_to_vamps:
1063
                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
1064
                    "Artifact %s cannot be submitted to VAMPS" % self.id)
1065
            sql = """SELECT submitted_to_vamps
1066
                     FROM qiita.artifact
1067
                     WHERE artifact_id = %s"""
1068
            qdb.sql_connection.TRN.add(sql, [self.id])
1069
            return qdb.sql_connection.TRN.execute_fetchlast()
1070
1071
    @is_submitted_to_vamps.setter
1072
    def is_submitted_to_vamps(self, value):
1073
        """Set if the artifact has been submitted to VAMPS
1074
1075
        Parameters
1076
        ----------
1077
        value : bool
1078
            Whether the artifact has been submitted to VAMPS or not
1079
1080
        Raises
1081
        ------
1082
        QiitaDBOperationNotPermittedError
1083
            If the artifact cannot be submitted to VAMPS
1084
        """
1085
        if not self.can_be_submitted_to_vamps:
1086
            raise qdb.exceptions.QiitaDBOperationNotPermittedError(
1087
                "Artifact %s cannot be submitted to VAMPS" % self.id)
1088
        sql = """UPDATE qiita.artifact
1089
                 SET submitted_to_vamps = %s
1090
                 WHERE artifact_id = %s"""
1091
        qdb.sql_connection.perform_as_transaction(sql, [value, self.id])
1092
1093
    @property
1094
    def filepaths(self):
1095
        """Returns the filepaths associated with the artifact
1096
1097
        Returns
1098
        -------
1099
        list of dict
1100
            A list of dict as defined by qiita_db.util.retrieve_filepaths
1101
        """
1102
        return qdb.util.retrieve_filepaths(
1103
            "artifact_filepath", "artifact_id", self.id, sort='ascending')
1104
1105
    @property
1106
    def html_summary_fp(self):
1107
        """Returns the HTML summary filepath
1108
1109
        Returns
1110
        -------
1111
        tuple of (int, str)
1112
            The filepath id and the path to the HTML summary
1113
        """
1114
        fps = qdb.util.retrieve_filepaths("artifact_filepath", "artifact_id",
1115
                                          self.id, fp_type='html_summary')
1116
        if fps:
1117
            # If fps is not the empty list, then we have exactly one file
1118
            # retrieve_filepaths returns a list of lists of 3 values: the
1119
            # filepath id, the filepath and the filepath type. We don't want
1120
            # to return the filepath type here, so just grabbing the first and
1121
            # second element of the list
1122
            res = (fps[0]['fp_id'], fps[0]['fp'])
1123
        else:
1124
            res = None
1125
1126
        return res
1127
1128
    def set_html_summary(self, html_fp, support_dir=None):
1129
        """Sets the HTML summary of the artifact
1130
1131
        Parameters
1132
        ----------
1133
        html_fp : str
1134
            Path to the new HTML summary
1135
        support_dir : str
1136
            Path to the directory containing any support files needed by
1137
            the HTML file
1138
        """
1139
        with qdb.sql_connection.TRN:
1140
            old_summs = self.html_summary_fp
1141
            to_delete_fps = []
1142
            if old_summs:
1143
                # Delete from the DB current HTML summary; below we will remove
1144
                # files, if necessary
1145
                to_delete_ids = []
1146
                for x in self.filepaths:
1147
                    if x['fp_type'] in ('html_summary', 'html_summary_dir'):
1148
                        to_delete_ids.append([x['fp_id']])
1149
                        to_delete_fps.append(x['fp'])
1150
                # From the artifact_filepath table
1151
                sql = """DELETE FROM qiita.artifact_filepath
1152
                         WHERE filepath_id = %s"""
1153
                qdb.sql_connection.TRN.add(sql, to_delete_ids, many=True)
1154
                # From the filepath table
1155
                sql = "DELETE FROM qiita.filepath WHERE filepath_id=%s"
1156
                qdb.sql_connection.TRN.add(sql, to_delete_ids, many=True)
1157
1158
            # Add the new HTML summary
1159
            filepaths = [(html_fp, 'html_summary')]
1160
            if support_dir is not None:
1161
                filepaths.append((support_dir, 'html_summary_dir'))
1162
            fp_ids = qdb.util.insert_filepaths(
1163
                filepaths, self.id, self.artifact_type)
1164
            sql = """INSERT INTO qiita.artifact_filepath
1165
                        (artifact_id, filepath_id)
1166
                     VALUES (%s, %s)"""
1167
            sql_args = [[self.id, id_] for id_ in fp_ids]
1168
            qdb.sql_connection.TRN.add(sql, sql_args, many=True)
1169
            qdb.sql_connection.TRN.execute()
1170
1171
        # to avoid deleting potentially necessary files, we are going to add
1172
        # that check after the previous transaction is commited
1173
        if to_delete_fps:
1174
            for x in self.filepaths:
1175
                if x['fp'] in to_delete_fps:
1176
                    to_delete_fps.remove(x['fp'])
1177
1178
            for fp in to_delete_fps:
1179
                if isfile(fp):
1180
                    remove(fp)
1181
                else:
1182
                    rmtree(fp)
1183
1184
    @property
1185
    def parents(self):
1186
        """Returns the parents of the artifact
1187
1188
        Returns
1189
        -------
1190
        list of qiita_db.artifact.Artifact
1191
            The parent artifacts
1192
        """
1193
        with qdb.sql_connection.TRN:
1194
            sql = """SELECT parent_id
1195
                     FROM qiita.parent_artifact
1196
                     WHERE artifact_id = %s"""
1197
            qdb.sql_connection.TRN.add(sql, [self.id])
1198
            return [Artifact(p_id)
1199
                    for p_id in qdb.sql_connection.TRN.execute_fetchflatten()]
1200
1201
    def _create_lineage_graph_from_edge_list(self, edge_list):
1202
        """Generates an artifact graph from the given `edge_list`
1203
1204
        Parameters
1205
        ----------
1206
        edge_list : list of (int, int)
1207
            List of (parent_artifact_id, artifact_id)
1208
1209
        Returns
1210
        -------
1211
        networkx.DiGraph
1212
            The graph representing the artifact lineage stored in `edge_list`
1213
        """
1214
        lineage = nx.DiGraph()
1215
        # In case the edge list is empty, only 'self' is present in the graph
1216
        if edge_list:
1217
            # By creating all the artifacts here we are saving DB calls
1218
            nodes = {a_id: Artifact(a_id)
1219
                     for a_id in set(chain.from_iterable(edge_list))}
1220
1221
            for parent, child in edge_list:
1222
                lineage.add_edge(nodes[parent], nodes[child])
1223
        else:
1224
            lineage.add_node(self)
1225
1226
        return lineage
1227
1228
    @property
1229
    def ancestors(self):
1230
        """Returns the ancestors of the artifact
1231
1232
        Returns
1233
        -------
1234
        networkx.DiGraph
1235
            The ancestors of the artifact
1236
        """
1237
        with qdb.sql_connection.TRN:
1238
            sql = """SELECT parent_id, artifact_id
1239
                     FROM qiita.artifact_ancestry(%s)"""
1240
            qdb.sql_connection.TRN.add(sql, [self.id])
1241
            edges = qdb.sql_connection.TRN.execute_fetchindex()
1242
        return self._create_lineage_graph_from_edge_list(edges)
1243
1244
    @property
1245
    def descendants(self):
1246
        """Returns the descendants of the artifact
1247
1248
        Returns
1249
        -------
1250
        networkx.DiGraph
1251
            The descendants of the artifact
1252
        """
1253
        with qdb.sql_connection.TRN:
1254
            sql = """SELECT parent_id, artifact_id
1255
                     FROM qiita.artifact_descendants(%s)"""
1256
            qdb.sql_connection.TRN.add(sql, [self.id])
1257
            edges = qdb.sql_connection.TRN.execute_fetchindex()
1258
        return self._create_lineage_graph_from_edge_list(edges)
1259
1260
    @property
1261
    def descendants_with_jobs(self):
1262
        """Returns the descendants of the artifact with their jobs
1263
1264
        Returns
1265
        -------
1266
        networkx.DiGraph
1267
            The descendants of the artifact
1268
        """
1269
        def _add_edge(edges, src, dest):
1270
            """Aux function to add the edge (src, dest) to edges"""
1271
            edge = (src, dest)
1272
            if edge not in edges:
1273
                edges.add(edge)
1274
1275
        with qdb.sql_connection.TRN:
1276
            sql = """SELECT processing_job_id, input_id, output_id
1277
                     FROM qiita.artifact_descendants_with_jobs(%s)"""
1278
            qdb.sql_connection.TRN.add(sql, [self.id])
1279
            sql_edges = qdb.sql_connection.TRN.execute_fetchindex()
1280
1281
            # helper function to reduce code duplication
1282
            def _helper(sql_edges, edges, nodes):
1283
                for jid, pid, cid in sql_edges:
1284
                    if jid not in nodes:
1285
                        nodes[jid] = ('job',
1286
                                      qdb.processing_job.ProcessingJob(jid))
1287
                    if pid not in nodes:
1288
                        nodes[pid] = ('artifact', qdb.artifact.Artifact(pid))
1289
                    if cid not in nodes:
1290
                        nodes[cid] = ('artifact', qdb.artifact.Artifact(cid))
1291
                    edges.add((nodes[pid], nodes[jid]))
1292
                    edges.add((nodes[jid], nodes[cid]))
1293
1294
            lineage = nx.DiGraph()
1295
            edges = set()
1296
            nodes = dict()
1297
            extra_edges = set()
1298
            extra_nodes = dict()
1299
            if sql_edges:
1300
                _helper(sql_edges, edges, nodes)
1301
            else:
1302
                nodes[self.id] = ('artifact', self)
1303
                lineage.add_node(nodes[self.id])
1304
            # if this is an Analysis we need to check if there are extra
1305
            # edges/nodes as there is a chance that there are connecions
1306
            # between them
1307
            if self.analysis is not None:
1308
                roots = [a for a in self.analysis.artifacts
1309
                         if not a.parents and a != self]
1310
                for r in roots:
1311
                    # add the root to the options then their children
1312
                    extra_nodes[r.id] = ('artifact', r)
1313
                    qdb.sql_connection.TRN.add(sql, [r.id])
1314
                    sql_edges = qdb.sql_connection.TRN.execute_fetchindex()
1315
                    _helper(sql_edges, extra_edges, extra_nodes)
1316
1317
            # The code above returns all the jobs that have been successfully
1318
            # executed. We need to add all the jobs that are in all the other
1319
            # status. Approach: Loop over all the artifacts and add all the
1320
            # jobs that have been attached to them.
1321
            visited = set()
1322
            queue = list(nodes.keys())
1323
            while queue:
1324
                current = queue.pop(0)
1325
                if current not in visited:
1326
                    visited.add(current)
1327
                    n_type, n_obj = nodes[current]
1328
                    if n_type == 'artifact':
1329
                        # Add all the jobs to the queue
1330
                        for job in n_obj.jobs():
1331
                            queue.append(job.id)
1332
                            if job.id not in nodes:
1333
                                nodes[job.id] = ('job', job)
1334
1335
                    elif n_type == 'job':
1336
                        # skip private and artifact definition jobs as they
1337
                        # don't create new artifacts and they would create
1338
                        # edges without artifacts + they can be safely ignored
1339
                        if n_obj.command.software.type in {
1340
                                'private', 'artifact definition'}:
1341
                            continue
1342
                        jstatus = n_obj.status
1343
                        # If the job is in success we don't need to do anything
1344
                        # else since it would've been added by the code above
1345
                        if jstatus != 'success':
1346
1347
                            if jstatus != 'error':
1348
                                # If the job is not errored, we can add the
1349
                                # future outputs and the children jobs to
1350
                                # the graph.
1351
1352
                                # Add all the job outputs as new nodes
1353
                                for o_name, o_type in n_obj.command.outputs:
1354
                                    node_id = '%s:%s' % (n_obj.id, o_name)
1355
                                    node = TypeNode(
1356
                                        id=node_id, job_id=n_obj.id,
1357
                                        name=o_name, type=o_type)
1358
                                    queue.append(node_id)
1359
                                    if node_id not in nodes:
1360
                                        nodes[node_id] = ('type', node)
1361
1362
                                # Add all his children jobs to the queue
1363
                                for cjob in n_obj.children:
1364
                                    queue.append(cjob.id)
1365
                                    if cjob.id not in nodes:
1366
                                        nodes[cjob.id] = ('job', cjob)
1367
1368
                                    # including the outputs
1369
                                    for o_name, o_type in cjob.command.outputs:
1370
                                        node_id = '%s:%s' % (cjob.id, o_name)
1371
                                        node = TypeNode(
1372
                                            id=node_id, job_id=cjob.id,
1373
                                            name=o_name, type=o_type)
1374
                                        if node_id not in nodes:
1375
                                            nodes[node_id] = ('type', node)
1376
1377
                            # Connect the job with his input artifacts, the
1378
                            # input artifacts may or may not exist yet, so we
1379
                            # need to check both the input_artifacts and the
1380
                            # pending properties
1381
                            for in_art in n_obj.input_artifacts:
1382
                                iid = in_art.id
1383
                                if iid not in nodes and iid in extra_nodes:
1384
                                    nodes[iid] = extra_nodes[iid]
1385
                                _add_edge(edges, nodes[iid], nodes[n_obj.id])
1386
1387
                            pending = n_obj.pending
1388
                            for pred_id in pending:
1389
                                for pname in pending[pred_id]:
1390
                                    in_node_id = '%s:%s' % (
1391
                                        pred_id, pending[pred_id][pname])
1392
                                    _add_edge(edges, nodes[in_node_id],
1393
                                              nodes[n_obj.id])
1394
1395
                    elif n_type == 'type':
1396
                        # Connect this 'future artifact' with the job that will
1397
                        # generate it
1398
                        _add_edge(edges, nodes[n_obj.job_id], nodes[current])
1399
                    else:
1400
                        raise ValueError('Unrecognized type: %s' % n_type)
1401
1402
        # Add all edges to the lineage graph - adding the edges creates the
1403
        # nodes in networkx
1404
        for source, dest in edges:
1405
            lineage.add_edge(source, dest)
1406
1407
        return lineage
1408
1409
    @property
1410
    def children(self):
1411
        """Returns the list of children of the artifact
1412
1413
        Returns
1414
        -------
1415
        list of qiita_db.artifact.Artifact
1416
            The children artifacts
1417
        """
1418
        with qdb.sql_connection.TRN:
1419
            sql = """SELECT artifact_id
1420
                     FROM qiita.parent_artifact
1421
                     WHERE parent_id = %s"""
1422
            qdb.sql_connection.TRN.add(sql, [self.id])
1423
            return [Artifact(c_id)
1424
                    for c_id in qdb.sql_connection.TRN.execute_fetchflatten()]
1425
1426
    @property
1427
    def youngest_artifact(self):
1428
        """Returns the youngest artifact of the artifact's lineage
1429
1430
        Returns
1431
        -------
1432
        qiita_db.artifact.Artifact
1433
            The youngest descendant of the artifact's lineage
1434
        """
1435
        with qdb.sql_connection.TRN:
1436
            sql = """SELECT artifact_id
1437
                     FROM qiita.artifact_descendants(%s)
1438
                        JOIN qiita.artifact USING (artifact_id)
1439
                     WHERE visibility_id NOT IN %s
1440
                     ORDER BY generated_timestamp DESC
1441
                     LIMIT 1"""
1442
            qdb.sql_connection.TRN.add(
1443
                sql, [self.id, qdb.util.artifact_visibilities_to_skip()])
1444
            a_id = qdb.sql_connection.TRN.execute_fetchindex()
1445
            # If the current artifact has no children, the previous call will
1446
            # return an empty list, so the youngest artifact in the lineage is
1447
            # the current artifact. On the other hand, if it has descendants,
1448
            # the id of the youngest artifact will be in a_id[0][0]
1449
            result = Artifact(a_id[0][0]) if a_id else self
1450
1451
        return result
1452
1453
    @property
1454
    def prep_templates(self):
1455
        """The prep templates attached to this artifact
1456
1457
        Returns
1458
        -------
1459
        list of qiita_db.metadata_template.PrepTemplate
1460
        """
1461
        with qdb.sql_connection.TRN:
1462
            sql = """SELECT prep_template_id
1463
                     FROM qiita.preparation_artifact
1464
                     WHERE artifact_id = %s"""
1465
            qdb.sql_connection.TRN.add(sql, [self.id])
1466
            return [qdb.metadata_template.prep_template.PrepTemplate(pt_id)
1467
                    for pt_id in qdb.sql_connection.TRN.execute_fetchflatten()]
1468
1469
    @property
1470
    def study(self):
1471
        """The study to which the artifact belongs to
1472
1473
        Returns
1474
        -------
1475
        qiita_db.study.Study or None
1476
            The study that owns the artifact, if any
1477
        """
1478
        with qdb.sql_connection.TRN:
1479
            sql = """SELECT study_id
1480
                     FROM qiita.study_artifact
1481
                     WHERE artifact_id = %s"""
1482
            qdb.sql_connection.TRN.add(sql, [self.id])
1483
            res = qdb.sql_connection.TRN.execute_fetchindex()
1484
            return qdb.study.Study(res[0][0]) if res else None
1485
1486
    @property
1487
    def analysis(self):
1488
        """The analysis to which the artifact belongs to
1489
1490
        Returns
1491
        -------
1492
        qiita_db.analysis.Analysis or None
1493
            The analysis that owns the artifact, if any
1494
        """
1495
        with qdb.sql_connection.TRN:
1496
            sql = """SELECT analysis_id
1497
                     FROM qiita.analysis_artifact
1498
                     WHERE artifact_id = %s"""
1499
            qdb.sql_connection.TRN.add(sql, [self.id])
1500
            res = qdb.sql_connection.TRN.execute_fetchindex()
1501
            return qdb.analysis.Analysis(res[0][0]) if res else None
1502
1503
    @property
1504
    def merging_scheme(self):
1505
        """The merging scheme of this artifact_type
1506
1507
        Returns
1508
        -------
1509
        str, str
1510
            The human readable merging scheme and the parent software
1511
            information for this artifact
1512
        """
1513
        vid = qdb.util.convert_to_id(self.visibility, "visibility")
1514
        if vid in qdb.util.artifact_visibilities_to_skip():
1515
            with qdb.sql_connection.TRN:
1516
                sql = f"""SELECT archive_data
1517
                          FROM qiita.{self._table}
1518
                          WHERE artifact_id = %s"""
1519
                qdb.sql_connection.TRN.add(sql, [self.id])
1520
                archive_data = qdb.sql_connection.TRN.execute_fetchlast()
1521
            merging_schemes = [archive_data['merging_scheme'][0]]
1522
            parent_softwares = [archive_data['merging_scheme'][1]]
1523
        else:
1524
            processing_params = self.processing_parameters
1525
            if processing_params is None:
1526
                return '', ''
1527
1528
            cmd_name = processing_params.command.name
1529
            ms = processing_params.command.merging_scheme
1530
            afps = [x['fp'] for x in self.filepaths
1531
                    if x['fp'].endswith('biom')]
1532
1533
            merging_schemes = []
1534
            parent_softwares = []
1535
            # this loop is necessary as in theory an artifact can be
1536
            # generated from multiple prep info files
1537
            for p in self.parents:
1538
                pparent = p.processing_parameters
1539
                # if parent is None, then is a direct upload; for example
1540
                # per_sample_FASTQ in shotgun data
1541
                if pparent is None:
1542
                    parent_cmd_name = None
1543
                    parent_merging_scheme = None
1544
                    parent_pp = None
1545
                    parent_software = 'N/A'
1546
                else:
1547
                    parent_cmd_name = pparent.command.name
1548
                    parent_merging_scheme = pparent.command.merging_scheme
1549
                    parent_pp = pparent.values
1550
                    psoftware = pparent.command.software
1551
                    parent_software = '%s v%s' % (
1552
                        psoftware.name, psoftware.version)
1553
1554
                merging_schemes.append(qdb.util.human_merging_scheme(
1555
                    cmd_name, ms, parent_cmd_name, parent_merging_scheme,
1556
                    processing_params.values, afps, parent_pp))
1557
                parent_softwares.append(parent_software)
1558
1559
        return ', '.join(merging_schemes), ', '.join(parent_softwares)
1560
1561
    @property
1562
    def being_deleted_by(self):
1563
        """The running job that is deleting this artifact
1564
1565
        Returns
1566
        -------
1567
        qiita_db.processing_job.ProcessingJob
1568
            The running job that is deleting this artifact, None if it
1569
            doesn't exist
1570
        """
1571
1572
        with qdb.sql_connection.TRN:
1573
            sql = """
1574
                SELECT processing_job_id FROM qiita.artifact_processing_job
1575
                  LEFT JOIN qiita.processing_job using (processing_job_id)
1576
                  LEFT JOIN qiita.processing_job_status using (
1577
                    processing_job_status_id)
1578
                  LEFT JOIN qiita.software_command using (command_id)
1579
                  WHERE artifact_id = %s AND name = 'delete_artifact' AND
1580
                    processing_job_status in (
1581
                        'running', 'queued', 'in_construction')"""
1582
            qdb.sql_connection.TRN.add(sql, [self.id])
1583
            res = qdb.sql_connection.TRN.execute_fetchindex()
1584
        return qdb.processing_job.ProcessingJob(res[0][0]) if res else None
1585
1586
    @property
1587
    def has_human(self):
1588
        has_human = False
1589
        # we are going to check the metadata if:
1590
        # - the prep data_type is _not_ target gene
1591
        # - the prep is not current_human_filtering
1592
        # - if the artifact_type is 'per_sample_FASTQ'
1593
        pts = self.prep_templates
1594
        tgs = qdb.metadata_template.constants.TARGET_GENE_DATA_TYPES
1595
        ntg = any([pt.data_type() not in tgs for pt in pts])
1596
        chf = any([not pt.current_human_filtering for pt in pts])
1597
        if ntg and chf and self.artifact_type == 'per_sample_FASTQ':
1598
            st = self.study.sample_template
1599
            if 'env_package' in st.categories:
1600
                sql = f"""SELECT DISTINCT sample_values->>'env_package'
1601
                          FROM qiita.sample_{st.id} WHERE sample_id in (
1602
                              SELECT sample_id from qiita.preparation_artifact
1603
                              LEFT JOIN qiita.prep_template_sample USING (
1604
                                  prep_template_id)
1605
                              WHERE artifact_id = {self.id})"""
1606
                with qdb.sql_connection.TRN:
1607
                    qdb.sql_connection.TRN.add(sql)
1608
                    for v in qdb.sql_connection.TRN.execute_fetchflatten():
1609
                        # str is needed as v could be None
1610
                        if str(v).startswith('human-'):
1611
                            has_human = True
1612
                            break
1613
1614
        return has_human
1615
1616
    def jobs(self, cmd=None, status=None, show_hidden=False):
1617
        """Jobs that used this artifact as input
1618
1619
        Parameters
1620
        ----------
1621
        cmd : qiita_db.software.Command, optional
1622
            If provided, only jobs that executed this command will be returned
1623
        status : str, optional
1624
            If provided, only jobs in this status will be returned
1625
        show_hidden : bool, optional
1626
            If true, return also the "hidden" jobs
1627
1628
        Returns
1629
        -------
1630
        list of qiita_db.processing_job.ProcessingJob
1631
            The list of jobs that used this artifact as input
1632
        """
1633
        with qdb.sql_connection.TRN:
1634
            sql = """SELECT processing_job_id
1635
                     FROM qiita.artifact_processing_job
1636
                        JOIN qiita.processing_job USING (processing_job_id)
1637
                        JOIN qiita.processing_job_status
1638
                            USING (processing_job_status_id)
1639
                     WHERE artifact_id = %s"""
1640
            sql_args = [self.id]
1641
1642
            if cmd:
1643
                sql = "{} AND command_id = %s".format(sql)
1644
                sql_args.append(cmd.id)
1645
1646
            if status:
1647
                sql = "{} AND processing_job_status = %s".format(sql)
1648
                sql_args.append(status)
1649
1650
            if not show_hidden:
1651
                sql = "{} AND hidden = %s".format(sql)
1652
                sql_args.append(False)
1653
1654
            qdb.sql_connection.TRN.add(sql, sql_args)
1655
            return [qdb.processing_job.ProcessingJob(jid)
1656
                    for jid in qdb.sql_connection.TRN.execute_fetchflatten()]
1657
1658
    @property
1659
    def get_commands(self):
1660
        """Returns the active commands that can process this kind of artifact
1661
1662
        Returns
1663
        -------
1664
        list of qiita_db.software.Command
1665
            The commands that can process the given artifact tyoes
1666
        """
1667
        dws = []
1668
        with qdb.sql_connection.TRN:
1669
            # get all the possible commands
1670
            sql = """SELECT DISTINCT qiita.command_parameter.command_id
1671
                     FROM qiita.artifact
1672
                     JOIN qiita.parameter_artifact_type
1673
                        USING (artifact_type_id)
1674
                     JOIN qiita.command_parameter USING (command_parameter_id)
1675
                     JOIN qiita.software_command ON (
1676
                        qiita.command_parameter.command_id =
1677
                        qiita.software_command.command_id)
1678
                     WHERE artifact_id = %s AND active = True"""
1679
            if self.analysis is None:
1680
                sql += " AND is_analysis = False"
1681
                # get the workflows that match this artifact so we can filter
1682
                # the available commands based on the commands in the worflows
1683
                # for that artifact - except is the artifact_type == 'BIOM'
1684
                if self.artifact_type != 'BIOM':
1685
                    dws = [w for w in qdb.software.DefaultWorkflow.iter()
1686
                           if self.data_type in w.data_type]
1687
            else:
1688
                sql += " AND is_analysis = True"
1689
1690
            qdb.sql_connection.TRN.add(sql, [self.id])
1691
            cids = set(qdb.sql_connection.TRN.execute_fetchflatten())
1692
1693
            if dws:
1694
                cmds = {n.default_parameter.command.id
1695
                        for w in dws for n in w.graph.nodes}
1696
                cids = cmds & cids
1697
1698
            return [qdb.software.Command(cid) for cid in cids]
1699
1700
    @property
1701
    def human_reads_filter_method(self):
1702
        """The human_reads_filter_method of the artifact
1703
1704
        Returns
1705
        -------
1706
        str
1707
            The human_reads_filter_method name
1708
        """
1709
        with qdb.sql_connection.TRN:
1710
            sql = """SELECT human_reads_filter_method
1711
                     FROM qiita.artifact
1712
                     LEFT JOIN qiita.human_reads_filter_method
1713
                        USING (human_reads_filter_method_id)
1714
                     WHERE artifact_id = %s"""
1715
            qdb.sql_connection.TRN.add(sql, [self.id])
1716
            return qdb.sql_connection.TRN.execute_fetchlast()
1717
1718
    @human_reads_filter_method.setter
1719
    def human_reads_filter_method(self, value):
1720
        """Set the human_reads_filter_method of the artifact
1721
1722
        Parameters
1723
        ----------
1724
        value : str
1725
            The new artifact's human_reads_filter_method
1726
1727
        Raises
1728
        ------
1729
        ValueError
1730
            If `value` doesn't exist in the database
1731
        """
1732
        with qdb.sql_connection.TRN:
1733
            sql = """SELECT human_reads_filter_method_id
1734
                     FROM qiita.human_reads_filter_method
1735
                     WHERE human_reads_filter_method = %s"""
1736
            qdb.sql_connection.TRN.add(sql, [value])
1737
            idx = qdb.sql_connection.TRN.execute_fetchflatten()
1738
1739
            if len(idx) == 0:
1740
                raise ValueError(
1741
                    f'"{value}" is not a valid human_reads_filter_method')
1742
1743
            sql = """UPDATE qiita.artifact
1744
                     SET human_reads_filter_method_id = %s
1745
                     WHERE artifact_id = %s"""
1746
            qdb.sql_connection.TRN.add(sql, [idx[0], self.id])