Diff of /qiita_db/analysis.py [000000] .. [879b32]


a b/qiita_db/analysis.py
"""
Objects for dealing with Qiita analyses

This module provides the implementation of the Analysis and Collection classes.

Classes
-------
- `Analysis` -- A Qiita Analysis class
- `Collection` -- A Qiita Collection class for grouping multiple analyses
"""

# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from itertools import product
from os.path import join, exists
from os import mkdir
from collections import defaultdict

from biom import load_table
from biom.util import biom_open
from biom.exception import DisjointIDError
from re import sub
import pandas as pd

from qiita_core.exceptions import IncompetentQiitaDeveloperError
from qiita_core.qiita_settings import qiita_config
import qiita_db as qdb
from json import loads, dump


class Analysis(qdb.base.QiitaObject):
    """
    Analysis object to access the Qiita Analysis information

    Attributes
    ----------
    owner
    name
    description
    samples
    data_types
    artifacts
    shared_with
    jobs
    pmid

    Methods
    -------
    has_access
    add_samples
    remove_samples
    share
    unshare
    build_files
    summary_data
    exists
    create
    delete
    add_artifact
    set_error
    """

    _table = "analysis"
    _portal_table = "analysis_portal"
    _analysis_id_column = 'analysis_id'

    @classmethod
    def iter(cls):
        """Iterate over the analyses"""
        with qdb.sql_connection.TRN:
            sql = """SELECT DISTINCT analysis_id
                     FROM qiita.analysis
                     JOIN qiita.analysis_portal USING (analysis_id)
                     JOIN qiita.portal_type USING (portal_type_id)
                     WHERE portal = %s
                     ORDER BY analysis_id"""
            qdb.sql_connection.TRN.add(sql, [qiita_config.portal])
            aids = qdb.sql_connection.TRN.execute_fetchflatten()

        for aid in aids:
            yield cls(aid)

    @classmethod
    def get_by_status(cls, status):
        """Returns all Analyses with the given status

        Parameters
        ----------
        status : str
            Status to search analyses for

        Returns
        -------
        set of Analysis
            All analyses in the database with the given status
        """
        with qdb.sql_connection.TRN:
            # Sandboxed analyses are the analyses that have not been started
            # and hence they don't have an artifact yet
            if status == 'sandbox':
                sql = """SELECT DISTINCT analysis_id
                         FROM qiita.analysis
                            JOIN qiita.analysis_portal USING (analysis_id)
                            JOIN qiita.portal_type USING (portal_type_id)
                         WHERE portal = %s AND analysis_id NOT IN (
                            SELECT analysis_id
                            FROM qiita.analysis_artifact)"""
                qdb.sql_connection.TRN.add(sql, [qiita_config.portal])
            else:
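                # any other status (e.g. 'private' or 'public') is resolved
                # through the visibility of the artifacts attached to the
                # analysis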
                sql = """SELECT DISTINCT analysis_id
                         FROM qiita.analysis_artifact
                            JOIN qiita.artifact USING (artifact_id)
                            JOIN qiita.visibility USING (visibility_id)
                            JOIN qiita.analysis_portal USING (analysis_id)
                            JOIN qiita.portal_type USING (portal_type_id)
                         WHERE visibility = %s AND portal = %s"""
                qdb.sql_connection.TRN.add(sql, [status, qiita_config.portal])

            return set(
                cls(aid)
                for aid in qdb.sql_connection.TRN.execute_fetchflatten())

    @classmethod
    def create(cls, owner, name, description, from_default=False,
               merge_duplicated_sample_ids=False, categories=None,
               reservation=None):
        """Creates a new analysis on the database

        Parameters
        ----------
        owner : User object
            The analysis' owner
        name : str
            Name of the analysis
        description : str
            Description of the analysis
        from_default : bool, optional
            If True, use the default analysis to populate selected samples.
            Default False.
        merge_duplicated_sample_ids : bool, optional
            If the duplicated sample ids in the selected studies should be
            merged or prepended with the artifact ids. False (default) prepends
            the artifact id
        categories : list of str, optional
            If not None, use _only_ these categories for the metaanalysis
        reservation : str, optional
            The slurm reservation to assign to the analysis

        Returns
        -------
        qdb.analysis.Analysis
            The newly created analysis
        """
        with qdb.sql_connection.TRN:
            portal_id = qdb.util.convert_to_id(
                qiita_config.portal, 'portal_type', 'portal')

            # Create the row in the analysis table
            sql = """INSERT INTO qiita.{0}
                        (email, name, description)
                    VALUES (%s, %s, %s)
                    RETURNING analysis_id""".format(cls._table)
            qdb.sql_connection.TRN.add(
                sql, [owner.id, name, description])
            a_id = qdb.sql_connection.TRN.execute_fetchlast()

            if from_default:
                # Move samples into that new analysis
                dflt_id = owner.default_analysis.id
                sql = """UPDATE qiita.analysis_sample
                         SET analysis_id = %s
                         WHERE analysis_id = %s"""
                qdb.sql_connection.TRN.add(sql, [a_id, dflt_id])

            # Add to both QIITA and given portal (if not QIITA)
            sql = """INSERT INTO qiita.analysis_portal
                        (analysis_id, portal_type_id)
                     VALUES (%s, %s)"""
            args = [[a_id, portal_id]]

            if qiita_config.portal != 'QIITA':
                qp_id = qdb.util.convert_to_id(
                    'QIITA', 'portal_type', 'portal')
                args.append([a_id, qp_id])
            qdb.sql_connection.TRN.add(sql, args, many=True)

            instance = cls(a_id)
            if reservation is not None:
                instance.slurm_reservation = reservation

            # Once the analysis is created, we can create the mapping file and
            # the initial set of artifacts
            plugin = qdb.software.Software.from_name_and_version(
                'Qiita', 'alpha')
            cmd = plugin.get_command('build_analysis_files')
            params = qdb.software.Parameters.load(
                cmd, values_dict={
                    'analysis': a_id,
                    'merge_dup_sample_ids': merge_duplicated_sample_ids,
                    'categories': categories})
            job = qdb.processing_job.ProcessingJob.create(
                owner, params, True)
            sql = """INSERT INTO qiita.analysis_processing_job
                        (analysis_id, processing_job_id)
                     VALUES (%s, %s)"""
            qdb.sql_connection.TRN.add(sql, [a_id, job.id])
            qdb.sql_connection.TRN.execute()

        # Doing the submission outside of the transaction
        job.submit()
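        # Illustrative usage (assumes a configured Qiita environment and an
        # existing User object):
        #   analysis = Analysis.create(
        #       user, 'my analysis', 'a description', from_default=True)
        # The instance is returned right away; the 'build_analysis_files'
        # job submitted above builds the mapping file and initial artifacts
        # asynchronously.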
        return instance

    @classmethod
    def delete_analysis_artifacts(cls, _id):
        """Deletes the artifacts linked to an analysis and then the analysis

        Parameters
        ----------
        _id : int
            The analysis id
        """
        analysis = cls(_id)
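        # delete the analysis' root artifacts (those without parents),
        # newest id first, then drop the analysis row itself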
        aids = [a.id for a in analysis.artifacts if not a.parents]
        aids.sort(reverse=True)
        for aid in aids:
            qdb.artifact.Artifact.delete(aid)
        cls.delete(analysis.id)

    @classmethod
    def delete(cls, _id):
        """Deletes an analysis

        Parameters
        ----------
        _id : int
            The analysis id

        Raises
        ------
        QiitaDBUnknownIDError
            If the analysis id doesn't exist
        """
        with qdb.sql_connection.TRN:
            # check if the analysis exists
            if not cls.exists(_id):
                raise qdb.exceptions.QiitaDBUnknownIDError(_id, "analysis")

            # Check if the analysis has any artifact
            sql = """SELECT EXISTS(SELECT *
                                   FROM qiita.analysis_artifact
                                   WHERE analysis_id = %s)"""
            qdb.sql_connection.TRN.add(sql, [_id])
            if qdb.sql_connection.TRN.execute_fetchlast():
                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
                    "Can't delete analysis %d, has artifacts attached"
                    % _id)

            sql = "DELETE FROM qiita.analysis_filepath WHERE {0} = %s".format(
                cls._analysis_id_column)
            args = [_id]
            qdb.sql_connection.TRN.add(sql, args)

            sql = "DELETE FROM qiita.analysis_portal WHERE {0} = %s".format(
                cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, args)

            sql = "DELETE FROM qiita.analysis_sample WHERE {0} = %s".format(
                cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, args)

            sql = """DELETE FROM qiita.analysis_processing_job
                     WHERE {0} = %s""".format(cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, args)

            # TODO: issue #1176

            sql = """DELETE FROM qiita.{0} WHERE {1} = %s""".format(
                cls._table, cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, args)

            qdb.sql_connection.TRN.execute()

    @classmethod
    def exists(cls, analysis_id):
        r"""Checks if the given analysis _id exists

        Parameters
        ----------
        analysis_id : int
            The id of the analysis we are searching for

        Returns
        -------
        bool
            True if exists, false otherwise.
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT EXISTS(
                        SELECT *
                        FROM qiita.{0}
                            JOIN qiita.analysis_portal USING (analysis_id)
                            JOIN qiita.portal_type USING (portal_type_id)
                        WHERE {1}=%s
                            AND portal=%s)""".format(cls._table,
                                                     cls._analysis_id_column)
            qdb.sql_connection.TRN.add(sql, [analysis_id, qiita_config.portal])
            return qdb.sql_connection.TRN.execute_fetchlast()

    @property
    def owner(self):
        """The owner of the analysis

        Returns
        -------
        qiita_db.user.User
            The owner of the Analysis
        """
        with qdb.sql_connection.TRN:
            sql = "SELECT email FROM qiita.{0} WHERE analysis_id = %s".format(
                self._table)
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.user.User(qdb.sql_connection.TRN.execute_fetchlast())

    @property
    def name(self):
        """The name of the analysis

        Returns
        -------
        str
            Name of the Analysis
        """
        with qdb.sql_connection.TRN:
            sql = "SELECT name FROM qiita.{0} WHERE analysis_id = %s".format(
                self._table)
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchlast()

    @property
    def _portals(self):
        """The portals used to create the analysis

        Returns
        -------
        list of str
            Names of the portals
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT portal
                     FROM qiita.analysis_portal
                        JOIN qiita.portal_type USING (portal_type_id)
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchflatten()

    @property
    def timestamp(self):
        """The timestamp of the analysis

        Returns
        -------
        datetime
            Timestamp of the Analysis
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT timestamp FROM qiita.{0}
                     WHERE analysis_id = %s""".format(self._table)
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchlast()

    @property
    def description(self):
        """Returns the description of the analysis"""
        with qdb.sql_connection.TRN:
            sql = """SELECT description FROM qiita.{0}
                     WHERE analysis_id = %s""".format(self._table)
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchlast()

    @description.setter
    def description(self, description):
        """Changes the description of the analysis

        Parameters
        ----------
        description : str
            New description for the analysis

        Raises
        ------
        QiitaDBStatusError
            Analysis is public
        """
        sql = """UPDATE qiita.{0} SET description = %s
                 WHERE analysis_id = %s""".format(self._table)
        qdb.sql_connection.perform_as_transaction(sql, [description, self._id])

    @property
    def samples(self):
        """The artifacts and samples attached to the analysis

        Returns
        -------
        dict
            Format is {artifact_id: [sample_id, sample_id, ...]}
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT artifact_id, array_agg(
                        sample_id ORDER BY sample_id)
                     FROM qiita.analysis_sample
                     WHERE analysis_id = %s
                     GROUP BY artifact_id"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return dict(qdb.sql_connection.TRN.execute_fetchindex())

    @property
    def data_types(self):
        """Returns all data types used in the analysis

        Returns
        -------
        list of str
            Data types in the analysis
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT DISTINCT data_type
                     FROM qiita.data_type
                        JOIN qiita.artifact USING (data_type_id)
                        JOIN qiita.analysis_sample USING (artifact_id)
                     WHERE analysis_id = %s
                     ORDER BY data_type"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchflatten()

    @property
    def shared_with(self):
        """The users the analysis is shared with

        Returns
        -------
        list of qiita_db.user.User
            Users the analysis is shared with
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT email FROM qiita.analysis_users
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return [qdb.user.User(uid)
                    for uid in qdb.sql_connection.TRN.execute_fetchflatten()]

    @property
    def artifacts(self):
        with qdb.sql_connection.TRN:
            sql = """SELECT artifact_id
                     FROM qiita.analysis_artifact
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self.id])
            return [qdb.artifact.Artifact(aid)
                    for aid in qdb.sql_connection.TRN.execute_fetchflatten()]

    @property
    def mapping_file(self):
        """Returns the mapping file for the analysis

        Returns
        -------
        int or None
            The filepath id of the analysis mapping file or None
            if not generated
        """
        fp = [x['fp_id'] for x in qdb.util.retrieve_filepaths(
                "analysis_filepath", "analysis_id", self._id)
              if x['fp_type'] == 'plain_text']

        if fp:
            # returning the actual filepath id vs. an array
            return fp[0]
        else:
            return None

    @property
    def metadata_categories(self):
        """Returns all metadata categories in the current analysis based
           on the available studies

        Returns
        -------
        dict of dict
            a dict with study_id as the key & the values are another dict with
            'sample' & 'prep' as keys and the metadata categories as values
        """
        ST = qdb.metadata_template.sample_template.SampleTemplate
        PT = qdb.metadata_template.prep_template.PrepTemplate
        with qdb.sql_connection.TRN:
            sql = """SELECT DISTINCT study_id, artifact_id
                     FROM qiita.analysis_sample
                     LEFT JOIN qiita.study_artifact USING (artifact_id)
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self._id])

            metadata = defaultdict(dict)
            for sid, aid in qdb.sql_connection.TRN.execute_fetchindex():
                if sid not in metadata:
                    metadata[sid]['sample'] = set(ST(sid).categories)
                    metadata[sid]['prep'] = set()
                for pt in qdb.artifact.Artifact(aid).prep_templates:
                    metadata[sid]['prep'] = metadata[sid]['prep'] | set(
                        PT(pt.id).categories)

        return metadata

    @property
    def tgz(self):
        """Returns the tgz file of the analysis

        Returns
        -------
        str or None
            full filepath to the tgz file or None if not generated
        """
        fp = [x['fp'] for x in qdb.util.retrieve_filepaths(
            "analysis_filepath", "analysis_id", self._id)
            if x['fp_type'] == 'tgz']

        if fp:
            # returning the actual path vs. an array
            return fp[0]
        else:
            return None

    @property
    def jobs(self):
        """The jobs generating the initial artifacts for the analysis

        Returns
        -------
        list of qiita_db.processing_job.ProcessingJob
            Jobs in the analysis. Empty list if no jobs attached.
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT processing_job_id
                     FROM qiita.analysis_processing_job
                     WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return [qdb.processing_job.ProcessingJob(jid)
                    for jid in qdb.sql_connection.TRN.execute_fetchflatten()]

    @property
    def pmid(self):
        """Returns pmid attached to the analysis

        Returns
        -------
        str or None
            returns the PMID or None if none is attached
        """
        with qdb.sql_connection.TRN:
            sql = "SELECT pmid FROM qiita.{0} WHERE analysis_id = %s".format(
                self._table)
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchlast()

    @pmid.setter
    def pmid(self, pmid):
        """Adds a pmid to the analysis

        Parameters
        ----------
        pmid: str
            pmid to set for the analysis

        Raises
        ------
        QiitaDBStatusError
            Analysis is public

        Notes
        -----
        An analysis should only ever have one PMID attached to it.
        """
        sql = """UPDATE qiita.{0} SET pmid = %s
                 WHERE analysis_id = %s""".format(self._table)
        qdb.sql_connection.perform_as_transaction(sql, [pmid, self._id])

    @property
    def can_be_publicized(self):
        """Returns whether the analysis can be made public

        Returns
        -------
        bool
            Whether the analysis can be publicized
        list
            A list of artifact ids that are not public
        """
        # The analysis can be made public if all the artifacts used
        # to get the samples from are public
        with qdb.sql_connection.TRN:
            non_public = []
            sql = """SELECT DISTINCT artifact_id
                     FROM qiita.analysis_sample
                     WHERE analysis_id = %s
                     ORDER BY artifact_id"""
            qdb.sql_connection.TRN.add(sql, [self.id])
            for aid in qdb.sql_connection.TRN.execute_fetchflatten():
                if qdb.artifact.Artifact(aid).visibility != 'public':
                    non_public.append(aid)

            return (non_public == [], non_public)

    @property
    def is_public(self):
        """Returns whether the analysis is public

        Returns
        -------
        bool
            If the analysis is public
        """
        with qdb.sql_connection.TRN:
            # getting all root artifacts / command_id IS NULL
            sql = """SELECT DISTINCT visibility
                     FROM qiita.analysis_artifact
                     LEFT JOIN qiita.artifact USING (artifact_id)
                     LEFT JOIN qiita.visibility USING (visibility_id)
                     WHERE analysis_id = %s AND command_id IS NULL"""
            qdb.sql_connection.TRN.add(sql, [self.id])
            visibilities = set(qdb.sql_connection.TRN.execute_fetchflatten())

            return visibilities == {'public'}

    def make_public(self):
        """Makes an analysis public

        Raises
        ------
        ValueError
            If can_be_publicized is not true
        """
        with qdb.sql_connection.TRN:
            can_be_publicized, non_public = self.can_be_publicized
            if not can_be_publicized:
                raise ValueError('Not all artifacts that generated this '
                                 'analysis are public: %s' % ', '.join(
                                     map(str, non_public)))

            # getting all root artifacts / command_id IS NULL
            sql = """SELECT artifact_id
                     FROM qiita.analysis_artifact
                     LEFT JOIN qiita.artifact USING (artifact_id)
                     WHERE analysis_id = %s AND command_id IS NULL"""
            qdb.sql_connection.TRN.add(sql, [self.id])
            aids = qdb.sql_connection.TRN.execute_fetchflatten()
            for aid in aids:
                qdb.artifact.Artifact(aid).visibility = 'public'

    def add_artifact(self, artifact):
        """Adds an artifact to the analysis

        Parameters
        ----------
        artifact : qiita_db.artifact.Artifact
            The artifact to be added
        """
        with qdb.sql_connection.TRN:
            sql = """INSERT INTO qiita.analysis_artifact
                        (analysis_id, artifact_id)
                     SELECT %s, %s
                     WHERE NOT EXISTS(SELECT *
                                      FROM qiita.analysis_artifact
                                      WHERE analysis_id = %s
                                        AND artifact_id = %s)"""
            qdb.sql_connection.TRN.add(sql, [self.id, artifact.id,
                                             self.id, artifact.id])

    def set_error(self, error_msg):
        """Sets the analysis error

        Parameters
        ----------
        error_msg : str
            The error message
        """
        le = qdb.logger.LogEntry.create('Runtime', error_msg)
        sql = """UPDATE qiita.analysis
                 SET logging_id = %s
                 WHERE analysis_id = %s"""
        qdb.sql_connection.perform_as_transaction(sql, [le.id, self.id])

    def has_access(self, user):
        """Returns whether the given user has access to the analysis

        Parameters
        ----------
        user : User object
            User we are checking access for

        Returns
        -------
        bool
            Whether user has access to analysis or not
        """
        with qdb.sql_connection.TRN:
            # if admin or superuser, just return true
            if user.level in {'superuser', 'admin'}:
                return True

            return self in Analysis.get_by_status('public') | \
                user.private_analyses | user.shared_analyses

    def can_edit(self, user):
        """Returns whether the given user can edit the analysis

        Parameters
        ----------
        user : User object
            User we are checking edit permissions for

        Returns
        -------
        bool
            Whether user can edit the analysis or not
        """
        # The analysis is editable only if the user is the owner, is in the
        # shared list or the user is an admin
        return (user.level in {'superuser', 'admin'} or self.owner == user or
                user in self.shared_with)

    def summary_data(self):
        """Return number of studies, artifacts, and samples selected

        Returns
        -------
        dict
            counts keyed to their relevant type
        """
        with qdb.sql_connection.TRN:
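            # returns something like
            # {'studies': 1, 'artifacts': 3, 'samples': 36} (illustrative)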
            sql = """SELECT
                        COUNT(DISTINCT study_id) as studies,
                        COUNT(DISTINCT artifact_id) as artifacts,
                        COUNT(DISTINCT sample_id) as samples
                    FROM qiita.study_artifact
                        JOIN qiita.analysis_sample USING (artifact_id)
                    WHERE analysis_id = %s"""
            qdb.sql_connection.TRN.add(sql, [self._id])
            return dict(qdb.sql_connection.TRN.execute_fetchindex()[0])

    def share(self, user):
        """Share the analysis with another user

        Parameters
        ----------
        user: User object
            The user to share the analysis with
        """
        # Make sure the analysis is not already shared with the given user
        if user == self.owner or user in self.shared_with:
            return

        sql = """INSERT INTO qiita.analysis_users (analysis_id, email)
                 VALUES (%s, %s)"""
        qdb.sql_connection.perform_as_transaction(sql, [self._id, user.id])

    def unshare(self, user):
        """Unshare the analysis with another user

        Parameters
        ----------
        user: User object
            The user to unshare the analysis with
        """
        sql = """DELETE FROM qiita.analysis_users
                 WHERE analysis_id = %s AND email = %s"""
        qdb.sql_connection.perform_as_transaction(sql, [self._id, user.id])

    def _lock_samples(self):
        """Only dflt analyses can have samples added/removed

        Raises
        ------
        qiita_db.exceptions.QiitaDBOperationNotPermittedError
            If the analysis is not a default analysis
        """
        with qdb.sql_connection.TRN:
            sql = "SELECT dflt FROM qiita.analysis WHERE analysis_id = %s"
            qdb.sql_connection.TRN.add(sql, [self.id])
            if not qdb.sql_connection.TRN.execute_fetchlast():
                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
                    "Can't add/remove samples from this analysis")

    def add_samples(self, samples):
        """Adds samples to the analysis

        Parameters
        ----------
        samples : dictionary of lists
            samples and the artifact id they come from in form
            {artifact_id: [sample1, sample2, ...], ...}
        """
        with qdb.sql_connection.TRN:
            self._lock_samples()

            for aid, samps in samples.items():
                # get previously selected samples for aid and filter them out
                sql = """SELECT sample_id
                         FROM qiita.analysis_sample
                         WHERE artifact_id = %s AND analysis_id = %s"""
                qdb.sql_connection.TRN.add(sql, [aid, self._id])
                prev_selected = qdb.sql_connection.TRN.execute_fetchflatten()

                select = set(samps).difference(prev_selected)
                sql = """INSERT INTO qiita.analysis_sample
                            (analysis_id, artifact_id, sample_id)
                         VALUES (%s, %s, %s)"""
                args = [[self._id, aid, s] for s in select]
                qdb.sql_connection.TRN.add(sql, args, many=True)
                qdb.sql_connection.TRN.execute()

    def remove_samples(self, artifacts=None, samples=None):
        """Removes samples from the analysis

        Parameters
        ----------
        artifacts : list, optional
            Artifacts to remove, default None
        samples : list, optional
            sample ids to remove, default None

        Notes
        -----
        - When only a list of samples is given, the samples will be removed
          from all artifacts they are associated with
        - When only a list of artifacts is given, all samples associated with
          those artifacts are removed
        - If both are passed, the given samples are removed from the given
          artifacts
        """
        with qdb.sql_connection.TRN:
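            # e.g. remove_samples(artifacts=[artifact],
            #                     samples=['1.SKB8.640193'])
            # removes only that sample from that artifact (ids illustrative)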
            self._lock_samples()
            if artifacts and samples:
                sql = """DELETE FROM qiita.analysis_sample
                         WHERE analysis_id = %s
                            AND artifact_id = %s
                            AND sample_id = %s"""
                # Build the SQL arguments to remove the samples of the
                # given artifacts.
                args = [[self._id, a.id, s]
                        for a, s in product(artifacts, samples)]
            elif artifacts:
                sql = """DELETE FROM qiita.analysis_sample
                         WHERE analysis_id = %s AND artifact_id = %s"""
                args = [[self._id, a.id] for a in artifacts]
            elif samples:
                sql = """DELETE FROM qiita.analysis_sample
                         WHERE analysis_id = %s AND sample_id = %s"""
                args = [[self._id, s] for s in samples]
            else:
                raise IncompetentQiitaDeveloperError(
                    "Must provide list of samples and/or proc_data for "
                    "removal")

            qdb.sql_connection.TRN.add(sql, args, many=True)
            qdb.sql_connection.TRN.execute()

    def build_files(self, merge_duplicated_sample_ids, categories=None):
        """Builds biom and mapping files needed for analysis

        Parameters
        ----------
        merge_duplicated_sample_ids : bool
            If the duplicated sample ids in the selected studies should be
            merged or prepended with the artifact ids. If false prepends
            the artifact id
        categories : set of str, optional
            If not None, use _only_ these categories for the metaanalysis

        Notes
        -----
        Creates biom tables for each requested data type
        Creates mapping file for requested samples
        """
        with qdb.sql_connection.TRN:
            # in practice we could retrieve samples in each of the following
            # calls but this will mean calling the DB multiple times and will
            # make testing much harder as we will need to have analyses at
            # different stages and possible errors.
            samples = self.samples
            # retrieving all info on artifacts to save SQL time
            bioms_info = qdb.util.get_artifacts_information(samples.keys())

            # figuring out if we are going to have duplicated samples, again
            # doing it here cause it's computationally cheaper
            # 1. merge samples per: data_type, reference used and
            # the command id
            # Note that grouped_samples is basically how many biom tables we
            # are going to create
            grouped_samples = {}

            # post_processing_cmds maps an artifact's merging scheme (the
            # 'algorithm' value) to a (merging_scheme, command) pair
            # describing an operation to be performed on the final merged
            # BIOM; it is passed to _build_biom_tables() below.
            post_processing_cmds = dict()
            for aid, asamples in samples.items():
                # find the artifact info, [0] there should be only one info
                ainfo = [bi for bi in bioms_info
                         if bi['artifact_id'] == aid][0]
                data_type = ainfo['data_type']

                # ainfo['algorithm'] is the original merging scheme
                label = "%s || %s" % (data_type, ainfo['algorithm'])
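                # a label might look like, e.g.,
                # "16S || Pick closed-reference OTUs | Split libraries FASTQ"
                # (values illustrative); every artifact that shares a label
                # is merged into a single BIOM table by _build_biom_tables()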
                if label not in grouped_samples:
                    aparams = qdb.artifact.Artifact(aid).processing_parameters
                    if aparams is not None:
                        cmd = aparams.command.post_processing_cmd
                        if cmd is not None:
                            # preserve label, in case it's needed.
                            merging_scheme = sub(
                                ', BIOM: [0-9a-zA-Z-.]+', '',
                                ainfo['algorithm'])
                            post_processing_cmds[ainfo['algorithm']] = (
                                merging_scheme, cmd)
                    grouped_samples[label] = []
                grouped_samples[label].append((aid, asamples))

            # We need to negate merge_duplicated_sample_ids because
            # _build_mapping_file actually renames: merge yes == rename no
            rename_dup_samples = not merge_duplicated_sample_ids
            self._build_mapping_file(
                samples, rename_dup_samples, categories=categories)

            if post_processing_cmds:
                biom_files = self._build_biom_tables(
                                    grouped_samples,
                                    rename_dup_samples,
                                    post_processing_cmds=post_processing_cmds)
            else:
                # preserve the legacy path
                biom_files = self._build_biom_tables(
                                                    grouped_samples,
                                                    rename_dup_samples)

            # if post_processing_cmds exists, biom_files will be a triplet,
            # instead of a pair; the final element in the tuple will be a
            # file path to the new phylogenetic tree.
            return biom_files

    def _build_biom_tables(self,
                           grouped_samples,
                           rename_dup_samples=False,
                           post_processing_cmds=None):
        """Build tables and add them to the analysis"""
        with qdb.sql_connection.TRN:
            # creating per analysis output folder
            _, base_fp = qdb.util.get_mountpoint(self._table)[0]
            base_fp = join(base_fp, 'analysis_%d' % self.id)
            if not exists(base_fp):
                mkdir(base_fp)

            biom_files = []
            for label, tables in grouped_samples.items():

                data_type, algorithm = [
                    line.strip() for line in label.split('||')]

                new_table = None
                artifact_ids = []
                for aid, samples in tables:
                    artifact = qdb.artifact.Artifact(aid)
                    artifact_ids.append(str(aid))

                    # the next loop is assuming that an artifact can have only
                    # one biom, which is a safe assumption until we generate
                    # artifacts from multiple bioms and even then we might
                    # only have one biom
                    biom_table_fp = None
                    for x in artifact.filepaths:
                        if x['fp_type'] == 'biom':
                            biom_table_fp = x['fp']
                            break
                    if not biom_table_fp:
                        raise RuntimeError(
                            "Artifact %s does not have a biom table associated"
                            % aid)

                    # loading the found biom table
                    biom_table = load_table(biom_table_fp)
                    # filtering samples to keep those selected by the user
                    biom_table_samples = set(biom_table.ids())
                    selected_samples = biom_table_samples.intersection(samples)
                    biom_table.filter(selected_samples, axis='sample',
                                      inplace=True)
                    if len(biom_table.ids()) == 0:
                        continue

                    if rename_dup_samples:
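                        # e.g. sample '1.SKB8.640193' from artifact 4 becomes
                        # '4.1.SKB8.640193' (ids illustrative), so duplicated
                        # sample ids from different artifacts do not collide
                        # when the tables are combined below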
                        ids_map = {_id: "%d.%s" % (aid, _id)
                                   for _id in biom_table.ids()}
                        biom_table.update_ids(ids_map, 'sample', True, True)

                    if new_table is None:
                        new_table = biom_table
                    else:
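                        # concat requires disjoint sample ids; when the
                        # tables share ids (i.e. duplicated samples are
                        # being merged) fall back to merge, which combines
                        # the overlapping samples instead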
                        try:
                            new_table = new_table.concat([biom_table])
                        except DisjointIDError:
                            new_table = new_table.merge(biom_table)

                if not new_table or len(new_table.ids()) == 0:
                    # if we get to this point the only reason for failure is
                    # rarefaction
                    raise RuntimeError("All samples filtered out from "
                                       "analysis due to rarefaction level")

                # write out the file
                # data_type and algorithm values become part of the file
                # name(s).
                info = "%s_%s" % (
                    sub('[^0-9a-zA-Z]+', '', data_type),
                    sub('[^0-9a-zA-Z]+', '', algorithm))
                fn = "%d_analysis_%s.biom" % (self._id, info)
                biom_fp = join(base_fp, fn)
                # save final biom here
                with biom_open(biom_fp, 'w') as f:
                    new_table.to_hdf5(
                        f, "Generated by Qiita, analysis id: %d, info: %s" % (
                            self._id, label))

                # let's add the regular biom without post processing
                biom_files.append((data_type, biom_fp, None))

                # post_processing_cmds can be None (the default) or a dict of
                # algorithm: (merging_scheme, command)
                if (post_processing_cmds is not None and
                        algorithm in post_processing_cmds):
                    merging_scheme, pp_cmd = post_processing_cmds[algorithm]
                    # assuming all commands require archives, obtain
                    # archives once, instead of for every cmd.
                    features = load_table(biom_fp).ids(axis='observation')
                    features = list(features)
                    archives = qdb.archive.Archive.retrieve_feature_values(
                        archive_merging_scheme=merging_scheme,
                        features=features)

                    # remove archives that SEPP could not match
                    archives = {f: loads(archives[f])
                                for f, plc
                                in archives.items()
                                if plc != ''}

                    # since biom_fp uses base_fp as its location, assume it's
                    # suitable for other files as well.
                    output_dir = join(base_fp, info)
                    if not exists(output_dir):
                        mkdir(output_dir)

                    fp_archive = join(output_dir,
                                      'archive_%d.json' % (self._id))

                    with open(fp_archive, 'w') as out_file:
                        dump(archives, out_file)

                    # assume archives file is passed as:
                    # --fp_archive=<path_to_archives_file>
                    # assume output dir is passed as:
                    # --output_dir=<path_to_output_dir>
                    # assume input biom file is passed as:
                    # --fp_biom=<path_to_biom_file>

                    # concatenate any other parameters into a string
                    params = ' '.join(["%s=%s" % (k, v) for k, v in
                                      pp_cmd['script_params'].items()])

                    # append archives file and output dir parameters
                    params = ("%s --fp_biom=%s --fp_archive=%s "
                              "--output_dir=%s" % (
                                  params, biom_fp, fp_archive, output_dir))

                    # if environment is successfully activated,
                    # run script with parameters
                    # script_env e.g.: 'deactivate; source activate qiita'
                    # script_path e.g.:
                    # python 'qiita_db/test/support_files/worker.py'
                    cmd = "%s %s %s" % (
                        pp_cmd['script_env'], pp_cmd['script_path'], params)
                    p_out, p_err, rv = qdb.processing_job._system_call(cmd)
                    p_out = p_out.rstrip()
                    # based on the set of commands run, we could get a
                    # rv !=0 but still have a successful return from the
                    # command, thus checking both rv and p_out. Note that
                    # p_out will return either an error message or
                    # the file path to the new tree, depending on p's
                    # return code.
                    if rv != 0:
                        raise ValueError('Error %d: %s' % (rv, p_err))
                    p_out = loads(p_out)
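                    # p_out is expected to be a JSON object with 'biom'
                    # and 'archive' keys; a result is only added below
                    # when 'archive' is not None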

                    if p_out['archive'] is not None:
                        biom_files.append(
                            (data_type, p_out['biom'], p_out['archive']))

        # return the biom files, either with or without needed tree, to
        # the user.
        return biom_files

    def _build_mapping_file(self, samples, rename_dup_samples=False,
                            categories=None):
        """Builds the combined mapping file for all samples
           Code modified slightly from qiime.util.MetadataMap.__add__"""
        with qdb.sql_connection.TRN:
            all_ids = set()
            to_concat = []
            sample_infos = dict()
            for aid, samps in samples.items():
                artifact = qdb.artifact.Artifact(aid)
                si = artifact.study.sample_template
                if si not in sample_infos:
                    si_df = si.to_dataframe()
                    if categories is not None:
                        si_df = si_df[list(set(categories) &
                                      set(si_df.columns))]
                    sample_infos[si] = si_df
                pt = artifact.prep_templates[0]
                pt_df = pt.to_dataframe()
                if categories is not None:
                    pt_df = pt_df[list(set(categories) &
                                       set(pt_df.columns))]

                qm = pt_df.join(sample_infos[si], lsuffix="_prep")
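                # columns present in both the prep and the sample
                # information dataframes keep the prep copy under a
                # "_prep" suffix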

                # if we are not going to merge the duplicated samples
                # append the aid to the sample name
                qm['qiita_artifact_id'] = aid
                qm['qiita_prep_deprecated'] = pt.deprecated
                if rename_dup_samples:
                    qm['original_SampleID'] = qm.index
                    qm['#SampleID'] = "%d." % aid + qm.index
                    samps = set(['%d.%s' % (aid, _id) for _id in samps])
                    qm.set_index('#SampleID', inplace=True, drop=True)
                else:
                    samps = set(samps) - all_ids
                    all_ids.update(samps)

                # appending study metadata to the analysis
                study = qdb.artifact.Artifact(aid).study
                study_owner = study.owner
                study_info = study.info
                pi = study_info['principal_investigator']
                qm['qiita_study_title'] = study.title
                qm['qiita_study_alias'] = study.info['study_alias']
                qm['qiita_owner'] = study_owner.info['name']
                qm['qiita_principal_investigator'] = pi.name

                qm = qm.loc[list(samps)]
                to_concat.append(qm)

            merged_map = pd.concat(to_concat)

            # Save the mapping file
            _, base_fp = qdb.util.get_mountpoint(self._table)[0]
            mapping_fp = join(base_fp, "%d_analysis_mapping.txt" % self._id)
            merged_map.to_csv(mapping_fp, index_label='#SampleID',
                              na_rep='unknown', sep='\t', encoding='utf-8')

            self._add_file("%d_analysis_mapping.txt" % self._id, "plain_text")

    def _add_file(self, filename, filetype, data_type=None):
        """Adds an analysis file to the database

        Parameters
        ----------
        filename : str
            filename to add to analysis
        filetype : {plain_text, biom}
        data_type : str, optional
        """
        with qdb.sql_connection.TRN:
            filetype_id = qdb.util.convert_to_id(filetype, 'filepath_type')
            _, mp = qdb.util.get_mountpoint('analysis')[0]
            fpid = qdb.util.insert_filepaths([
                (join(mp, filename), filetype_id)], -1, 'analysis',
                move_files=False)[0]

            col = ""
            dtid = ""
            if data_type:
                col = ", data_type_id"
                dtid = ", %d" % qdb.util.convert_to_id(data_type, "data_type")

            sql = """INSERT INTO qiita.analysis_filepath
                        (analysis_id, filepath_id{0})
                     VALUES (%s, %s{1})""".format(col, dtid)
            qdb.sql_connection.TRN.add(sql, [self._id, fpid])
            qdb.sql_connection.TRN.execute()

    def _slurm_reservation(self):
        """Helper method for the slurm_reservation property"""
        with qdb.sql_connection.TRN:
            sql = """SELECT slurm_reservation
                     FROM qiita.{0}
                     WHERE analysis_id = %s""".format(self._table)
            qdb.sql_connection.TRN.add(sql, [self._id])
            return qdb.sql_connection.TRN.execute_fetchflatten()

    @property
    def slurm_reservation(self):
        """Returns a valid reservation if it exists

        Returns
        -------
        str or None
            returns the slurm reservation or None
        """
        slurm_reservation = self._slurm_reservation()

        if slurm_reservation and slurm_reservation[0] != '':
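            # only report the reservation if scontrol exits cleanly and
            # actually lists it; an empty scheduler answers
            # 'No reservations in the system'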
1223
            cmd = f"scontrol show reservations {slurm_reservation[0]}"
1224
            p_out, p_err, rv = qdb.processing_job._system_call(cmd)
1225
            if rv == 0 and p_out != 'No reservations in the system\n':
1226
                return slurm_reservation[0]
1227
1228
        return None
1229
1230
    @slurm_reservation.setter
1231
    def slurm_reservation(self, slurm_reservation):
1232
        """Changes the slurm reservation of the analysis
1233
1234
        Parameters
1235
        ----------
1236
        slurm_reservation : str
1237
            New slurm_reservation for the analysis
1238
        """
1239
        sql = """UPDATE qiita.{0}
1240
                 SET slurm_reservation = %s
1241
                 WHERE analysis_id = %s""".format(self._table)
1242
        qdb.sql_connection.perform_as_transaction(
1243
            sql, [slurm_reservation, self._id])