# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from itertools import chain
from datetime import datetime
from os import remove
from os.path import isfile, relpath
from shutil import rmtree
from collections import namedtuple
from json import dumps
from qiita_db.util import create_nested_path
import networkx as nx
import qiita_db as qdb
from qiita_core.qiita_settings import qiita_config
TypeNode = namedtuple('TypeNode', ['id', 'job_id', 'name', 'type'])
class Artifact(qdb.base.QiitaObject):
r"""Any kind of file (or group of files) stored in the system and its
attributes
Attributes
----------
timestamp
processing_parameters
visibility
artifact_type
data_type
can_be_submitted_to_ebi
can_be_submitted_to_vamps
is_submitted_to_vamps
filepaths
parents
prep_template
ebi_run_accession
study
has_human
Methods
-------
create
delete
being_deleted_by
archive
See Also
--------
qiita_db.QiitaObject
"""
_table = "artifact"
@classmethod
def iter(cls):
"""Iterate over all artifacts in the database
Returns
-------
generator
Yields an `Artifact` object for each artifact in the database,
in order of ascending artifact_id
"""
with qdb.sql_connection.TRN:
sql = """SELECT artifact_id FROM qiita.{}
ORDER BY artifact_id""".format(cls._table)
qdb.sql_connection.TRN.add(sql)
ids = qdb.sql_connection.TRN.execute_fetchflatten()
for id_ in ids:
yield Artifact(id_)
@classmethod
def iter_by_visibility(cls, visibility):
r"""Iterator over the artifacts with the given visibility
Parameters
----------
visibility : str
The visibility level
Returns
-------
generator of qiita_db.artifact.Artifact
The artifacts available in the system with the given visibility
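Examples
--------
A minimal, illustrative sketch; 'sandbox' is just one of the valid
visibility levels::
    for artifact in Artifact.iter_by_visibility('sandbox'):
        print(artifact.id)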
"""
with qdb.sql_connection.TRN:
sql = """SELECT artifact_id
FROM qiita.artifact
JOIN qiita.visibility USING (visibility_id)
WHERE visibility = %s
ORDER BY artifact_id"""
qdb.sql_connection.TRN.add(sql, [visibility])
for a_id in qdb.sql_connection.TRN.execute_fetchflatten():
yield cls(a_id)
@staticmethod
def types():
"""Returns list of all artifact types available and their descriptions
Returns
-------
list of list of str
The artifact type and description of the artifact type, in the form
[[artifact_type, description, can_be_submitted_to_ebi,
can_be_submitted_to_vamps, is_user_uploadable], ...]
"""
with qdb.sql_connection.TRN:
sql = """SELECT artifact_type, description,
can_be_submitted_to_ebi,
can_be_submitted_to_vamps, is_user_uploadable
FROM qiita.artifact_type
ORDER BY artifact_type"""
qdb.sql_connection.TRN.add(sql)
return qdb.sql_connection.TRN.execute_fetchindex()
@staticmethod
def create_type(name, description, can_be_submitted_to_ebi,
can_be_submitted_to_vamps, is_user_uploadable,
filepath_types):
"""Creates a new artifact type in the system
Parameters
----------
name : str
The artifact type name
description : str
The artifact type description
can_be_submitted_to_ebi : bool
Whether the artifact type can be submitted to EBI or not
can_be_submitted_to_vamps : bool
Whether the artifact type can be submitted to VAMPS or not
is_user_uploadable : bool
Whether the artifact type can be raw, i.e. uploaded directly to Qiita
filepath_types : list of (str, bool)
The list of filepath types that the new artifact type supports, and
if they're required or not in an artifact instance of this type
Raises
------
qiita_db.exceptions.QiitaDBDuplicateError
If an artifact type with the same name already exists
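Examples
--------
A minimal, illustrative sketch; the type name and filepath types
below are hypothetical and must match filepath types known to the
system::
    Artifact.create_type(
        'my_type', 'An example artifact type', False, False, True,
        [('raw_forward_seqs', True), ('log', False)])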
"""
with qdb.sql_connection.TRN:
sql = """SELECT EXISTS(
SELECT *
FROM qiita.artifact_type
WHERE artifact_type=%s)"""
qdb.sql_connection.TRN.add(sql, [name])
if qdb.sql_connection.TRN.execute_fetchlast():
raise qdb.exceptions.QiitaDBDuplicateError(
'artifact type', 'name: %s' % name)
sql = """INSERT INTO qiita.artifact_type
(artifact_type, description, can_be_submitted_to_ebi,
can_be_submitted_to_vamps, is_user_uploadable)
VALUES (%s, %s, %s, %s, %s)
RETURNING artifact_type_id"""
qdb.sql_connection.TRN.add(
sql, [name, description, can_be_submitted_to_ebi,
can_be_submitted_to_vamps, is_user_uploadable])
at_id = qdb.sql_connection.TRN.execute_fetchlast()
sql = """INSERT INTO qiita.artifact_type_filepath_type
(artifact_type_id, filepath_type_id, required)
VALUES (%s, %s, %s)"""
sql_args = [
[at_id, qdb.util.convert_to_id(fpt, 'filepath_type'), req]
for fpt, req in filepath_types]
qdb.sql_connection.TRN.add(sql, sql_args, many=True)
# When creating a type, it is expected that a new mountpoint is
# created for that type. Note that we check whether there is an
# extra path prefix for the mountpoint, which is useful for the
# test environment
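# For instance (illustrative paths only): with
# base_data_dir='/qiita/data' and working_dir='/qiita/data/working_dir'
# the relative path reduces to '' and mp is just the type name, while
# with working_dir='/qiita/data/test/working_dir' mp becomes
# 'test/' + name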
qc = qiita_config
mp = relpath(qc.working_dir, qc.base_data_dir).replace(
'working_dir', '')
mp = mp + name if mp != '/' and mp != '' else name
sql = """INSERT INTO qiita.data_directory
(data_type, mountpoint, subdirectory, active)
VALUES (%s, %s, %s, %s)"""
qdb.sql_connection.TRN.add(sql, [name, mp, True, True])
# We are interested in the dirpath
create_nested_path(qdb.util.get_mountpoint(name)[0][1])
qdb.sql_connection.TRN.execute()
@classmethod
def copy(cls, artifact, prep_template):
"""Creates a copy of `artifact` and attaches it to `prep_template`
Parameters
----------
artifact : qiita_db.artifact.Artifact
Artifact to copy from
prep_template : qiita_db.metadata_template.prep_template.PrepTemplate
The prep template to attach the new artifact to
Returns
-------
qiita_db.artifact.Artifact
A new instance of Artifact
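Examples
--------
A minimal, illustrative sketch; the object ids are hypothetical::
    src = Artifact(1)
    pt = qdb.metadata_template.prep_template.PrepTemplate(2)
    new_artifact = Artifact.copy(src, pt)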
"""
with qdb.sql_connection.TRN:
visibility_id = qdb.util.convert_to_id("sandbox", "visibility")
atype = artifact.artifact_type
atype_id = qdb.util.convert_to_id(atype, "artifact_type")
dtype_id = qdb.util.convert_to_id(
prep_template.data_type(), "data_type")
sql = """INSERT INTO qiita.artifact (
generated_timestamp, visibility_id, artifact_type_id,
data_type_id, submitted_to_vamps)
VALUES (%s, %s, %s, %s, %s)
RETURNING artifact_id"""
sql_args = [datetime.now(), visibility_id, atype_id, dtype_id,
False]
qdb.sql_connection.TRN.add(sql, sql_args)
a_id = qdb.sql_connection.TRN.execute_fetchlast()
# Associate the artifact with the prep template
instance = cls(a_id)
prep_template.artifact = instance
# Associate the artifact with the study
sql = """INSERT INTO qiita.study_artifact (study_id, artifact_id)
VALUES (%s, %s)"""
sql_args = [prep_template.study_id, a_id]
qdb.sql_connection.TRN.add(sql, sql_args)
# Associate the artifact with the preparation information
sql = """INSERT INTO qiita.preparation_artifact (
prep_template_id, artifact_id)
VALUES (%s, %s)"""
sql_args = [prep_template.id, a_id]
qdb.sql_connection.TRN.add(sql, sql_args)
# Associate the artifact with its filepaths
filepaths = [(x['fp'], x['fp_type']) for x in artifact.filepaths]
fp_ids = qdb.util.insert_filepaths(
filepaths, a_id, atype, copy=True)
sql = """INSERT INTO qiita.artifact_filepath
(artifact_id, filepath_id)
VALUES (%s, %s)"""
sql_args = [[a_id, fp_id] for fp_id in fp_ids]
qdb.sql_connection.TRN.add(sql, sql_args, many=True)
qdb.sql_connection.TRN.execute()
return instance
@classmethod
def create(cls, filepaths, artifact_type, name=None, prep_template=None,
parents=None, processing_parameters=None, move_files=True,
analysis=None, data_type=None):
r"""Creates a new artifact in the system
The parameters depend on how the artifact was generated:
- If the artifact was uploaded by the user, the parameter
`prep_template` should be provided and the parameters `parents`,
`processing_parameters` and `analysis` should not be provided.
- If the artifact was generated by processing one or more
artifacts, the parameters `parents` and `processing_parameters`
should be provided and the parameters `prep_template` and
`analysis` should not be provided.
- If the artifact is the initial artifact of the analysis, the
parameters `analysis` and `data_type` should be provided and the
parameters `prep_template`, `parents` and `processing_parameters`
should not be provided.
Parameters
----------
filepaths : iterable of tuples (str, int)
A list of 2-tuples in which the first element is the artifact
file path and the second one is the file path type id
artifact_type : str
The type of the artifact
name : str, optional
The artifact's name
prep_template : qiita_db.metadata_template.PrepTemplate, optional
If the artifact is being uploaded by the user, the prep template
to which the artifact should be linked to. If not provided,
`parents` or `analysis` should be provided.
parents : iterable of qiita_db.artifact.Artifact, optional
The list of artifacts from which the new artifact has been
generated. If not provided, `prep_template` or `analysis`
should be provided.
processing_parameters : qiita_db.software.Parameters, optional
The processing parameters used to generate the new artifact
from `parents`. It is required if `parents` is provided, and
should not be provided otherwise.
move_files : bool, optional
If False the files will not be moved but copied
analysis : qiita_db.analysis.Analysis, optional
If the artifact is the initial artifact of an analysis, the
analysis to which the artifact belongs. If not provided, `prep_template`
or `parents` should be provided.
data_type : str, optional
The data_type of the artifact in the `analysis`. It is required if
`analysis` is provided. It should not be provided if `analysis` is
not provided.
Returns
-------
qiita_db.artifact.Artifact
A new instance of Artifact
Raises
------
QiitaDBArtifactCreationError
If `filepaths` is not provided
If both `parents` and `prep_template` are provided
If none of `parents` and `prep_template` are provided
If `parents` is provided but `processing_parameters` is not
If both `prep_template` and `processing_parameters` are provided
If not all the artifacts in `parents` belong to the same study
Notes
-----
The visibility of the artifact is set by default to `sandbox` if
`prep_template` is passed; if `parents` is passed, the most
restrictive visibility among the parents is inherited.
The timestamp of the artifact is set by default to `datetime.now()`.
The value of `submitted_to_vamps` is set by default to `False`.
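Examples
--------
Illustrative sketches of the three creation modes; all ids,
filepaths, filepath type ids and parameter values are hypothetical.
Uploaded by the user and attached to a prep template::
    pt = qdb.metadata_template.prep_template.PrepTemplate(1)
    art = Artifact.create(
        [('/path/to/seqs.fastq.gz', 1)], 'FASTQ', prep_template=pt)
Generated by processing a parent artifact::
    params = qdb.software.Parameters.load(
        qdb.software.Command(3), values_dict={'input_data': art.id})
    child = Artifact.create(
        [('/path/to/table.biom', 7)], 'BIOM', parents=[art],
        processing_parameters=params)
Initial artifact of an analysis::
    analysis = qdb.analysis.Analysis(1)
    a_art = Artifact.create(
        [('/path/to/table.biom', 7)], 'BIOM', analysis=analysis,
        data_type='16S')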
"""
# We need at least one file
if not filepaths:
raise qdb.exceptions.QiitaDBArtifactCreationError(
"at least one filepath is required.")
# Check that the combination of parameters is correct
counts = (int(bool(parents or processing_parameters)) +
int(prep_template is not None) +
int(bool(analysis or data_type)))
if counts != 1:
# Either none or more than one of the options has been provided
raise qdb.exceptions.QiitaDBArtifactCreationError(
"One and only one of parents, prep template or analysis must "
"be provided")
elif bool(parents) != bool(processing_parameters):
# When provided, parents and processing parameters both should be
# provided (this is effectively doing an XOR)
raise qdb.exceptions.QiitaDBArtifactCreationError(
"When provided, both parents and processing parameters should "
"be provided")
elif bool(analysis) and not bool(data_type):
# If analysis is provided, data_type should also be provided
raise qdb.exceptions.QiitaDBArtifactCreationError(
"When provided, both analysis and data_type should "
"be provided")
# There are three different ways of creating an Artifact, but all of
# them execute a set of common operations. Declare functions to avoid
# code duplication. These functions should not be used outside of
# this `create` method, hence declaring them here
def _common_creation_steps(atype, cmd_id, data_type, cmd_parameters):
gen_timestamp = datetime.now()
visibility_id = qdb.util.convert_to_id("sandbox", "visibility")
atype_id = qdb.util.convert_to_id(atype, "artifact_type")
dtype_id = qdb.util.convert_to_id(data_type, "data_type")
# Create the artifact row in the artifact table
sql = """INSERT INTO qiita.artifact
(generated_timestamp, command_id, data_type_id,
command_parameters, visibility_id,
artifact_type_id, submitted_to_vamps)
VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING artifact_id"""
sql_args = [gen_timestamp, cmd_id, dtype_id,
cmd_parameters, visibility_id, atype_id, False]
qdb.sql_connection.TRN.add(sql, sql_args)
a_id = qdb.sql_connection.TRN.execute_fetchlast()
qdb.sql_connection.TRN.execute()
return cls(a_id)
def _associate_with_study(instance, study_id, prep_template_id):
# Associate the artifact with the study
sql = """INSERT INTO qiita.study_artifact
(study_id, artifact_id)
VALUES (%s, %s)"""
sql_args = [study_id, instance.id]
qdb.sql_connection.TRN.add(sql, sql_args)
sql = """INSERT INTO qiita.preparation_artifact
(prep_template_id, artifact_id)
VALUES (%s, %s)"""
sql_args = [prep_template_id, instance.id]
qdb.sql_connection.TRN.add(sql, sql_args)
qdb.sql_connection.TRN.execute()
def _associate_with_analysis(instance, analysis_id):
# Associate the artifact with the analysis
sql = """INSERT INTO qiita.analysis_artifact
(analysis_id, artifact_id)
VALUES (%s, %s)"""
sql_args = [analysis_id, instance.id]
qdb.sql_connection.perform_as_transaction(sql, sql_args)
with qdb.sql_connection.TRN:
if parents:
dtypes = {p.data_type for p in parents}
# If an artifact has parents, it can be either from the
# processing pipeline or the analysis pipeline. Decide which
# one here
studies = {p.study for p in parents}
analyses = {p.analysis for p in parents}
studies.discard(None)
analyses.discard(None)
studies = {s.id for s in studies}
analyses = {a.id for a in analyses}
# The first 2 cases should never happen, but it doesn't hurt
# to check them
len_studies = len(studies)
len_analyses = len(analyses)
if len_studies > 0 and len_analyses > 0:
raise qdb.exceptions.QiitaDBArtifactCreationError(
"All the parents from an artifact should be either "
"from the analysis pipeline or all from the processing"
" pipeline")
elif len_studies > 1 or len_analyses > 1:
raise qdb.exceptions.QiitaDBArtifactCreationError(
"Parents from multiple studies/analyses provided. "
"Analyses: %s. Studies: %s."
% (', '.join(map(str, analyses)), ', '.join(map(str, studies))))
elif len_studies == 1:
# This artifact is part of the processing pipeline
study_id = studies.pop()
# In the processing pipeline, artifacts can have only
# one dtype
if len(dtypes) > 1:
raise qdb.exceptions.QiitaDBArtifactCreationError(
"parents have multiple data types: %s"
% ", ".join(dtypes))
instance = _common_creation_steps(
artifact_type, processing_parameters.command.id,
dtypes.pop(), processing_parameters.dump())
_associate_with_study(
instance, study_id, parents[0].prep_templates[0].id)
else:
# This artifact is part of the analysis pipeline
analysis_id = analyses.pop()
# In the analysis pipeline, artifact parents can have
# more than one data type
data_type = ("Multiomic"
if len(dtypes) > 1 else dtypes.pop())
instance = _common_creation_steps(
artifact_type, processing_parameters.command.id,
data_type, processing_parameters.dump())
_associate_with_analysis(instance, analysis_id)
# Associate the artifact with its parents
sql = """INSERT INTO qiita.parent_artifact
(artifact_id, parent_id)
VALUES (%s, %s)"""
sql_args = [(instance.id, p.id) for p in parents]
qdb.sql_connection.TRN.add(sql, sql_args, many=True)
# inheriting visibility
visibilities = {a.visibility for a in instance.parents}
# set based on the "lowest" visibility
if 'sandbox' in visibilities:
instance.visibility = 'sandbox'
elif 'private' in visibilities:
instance.visibility = 'private'
else:
instance._set_visibility('public')
elif prep_template:
# This artifact is uploaded by the user in the
# processing pipeline
instance = _common_creation_steps(
artifact_type, None, prep_template.data_type(), None)
# Associate the artifact with the prep template
prep_template.artifact = instance
# Associate the artifact with the study
_associate_with_study(
instance, prep_template.study_id, prep_template.id)
else:
# This artifact is an initial artifact of an analysis
instance = _common_creation_steps(
artifact_type, None, data_type, None)
# Associate the artifact with the analysis
if bool(analysis):
analysis.add_artifact(instance)
# Associate the artifact with its filepaths
fp_ids = qdb.util.insert_filepaths(
filepaths, instance.id, artifact_type,
move_files=move_files, copy=(not move_files))
sql = """INSERT INTO qiita.artifact_filepath
(artifact_id, filepath_id)
VALUES (%s, %s)"""
sql_args = [[instance.id, fp_id] for fp_id in fp_ids]
qdb.sql_connection.TRN.add(sql, sql_args, many=True)
if name:
instance.name = name
return instance
@classmethod
def delete(cls, artifact_id):
r"""Deletes an artifact from the system with its children
Parameters
----------
artifact_id : int
The parent artifact to be removed
Raises
------
QiitaDBArtifactDeletionError
If the artifacts are public
If the artifacts have been analyzed
If the artifacts have been submitted to EBI
If the artifacts have been submitted to VAMPS
"""
with qdb.sql_connection.TRN:
# This will fail if the artifact with id=artifact_id doesn't exist
instance = cls(artifact_id)
# Check if the artifact is public
if instance.visibility == 'public':
raise qdb.exceptions.QiitaDBArtifactDeletionError(
artifact_id, "it is public")
all_artifacts = list(set(instance.descendants.nodes()))
all_artifacts.reverse()
all_ids = tuple([a.id for a in all_artifacts])
# Check if this or any of the children have been analyzed
sql = """SELECT email, analysis_id
FROM qiita.analysis
WHERE analysis_id IN (
SELECT DISTINCT analysis_id
FROM qiita.analysis_sample
WHERE artifact_id IN %s)"""
qdb.sql_connection.TRN.add(sql, [all_ids])
analyses = qdb.sql_connection.TRN.execute_fetchindex()
if analyses:
analyses = '\n'.join(
['Analysis id: %s, Owner: %s' % (aid, email)
for email, aid in analyses])
raise qdb.exceptions.QiitaDBArtifactDeletionError(
artifact_id, 'it or one of its children has been '
'analyzed by: \n %s' % analyses)
# Check if the artifacts have been submitted to EBI
for a in all_artifacts:
if a.can_be_submitted_to_ebi and a.ebi_run_accessions:
raise qdb.exceptions.QiitaDBArtifactDeletionError(
artifact_id, "Artifact %d has been submitted to "
"EBI" % a.id)
# Check if the artifacts have been submitted to VAMPS
for a in all_artifacts:
if a.can_be_submitted_to_vamps and a.is_submitted_to_vamps:
raise qdb.exceptions.QiitaDBArtifactDeletionError(
artifact_id, "Artifact %d has been submitted to "
"VAMPS" % a.id)
# Check if there is a job queued, running, waiting or
# in_construction that will use/is using the artifact
sql = """SELECT processing_job_id
FROM qiita.artifact_processing_job
JOIN qiita.processing_job USING (processing_job_id)
JOIN qiita.processing_job_status
USING (processing_job_status_id)
WHERE artifact_id IN %s
AND processing_job_status IN (
'queued', 'running', 'waiting')"""
qdb.sql_connection.TRN.add(sql, [all_ids])
jobs = qdb.sql_connection.TRN.execute_fetchflatten()
if jobs:
# if the artifact has active jobs we need to raise an error
# but we also need to check that if it's only 1 job, that the
# job is not the delete_artifact actual job
raise_error = True
job_name = qdb.processing_job.ProcessingJob(
jobs[0]).command.name
if len(jobs) == 1 and job_name == 'delete_artifact':
raise_error = False
if raise_error:
raise qdb.exceptions.QiitaDBArtifactDeletionError(
artifact_id, "there is a queued/running job that "
"uses this artifact or one of it's children")
# We can now remove the artifacts
filepaths = [f for a in all_artifacts for f in a.filepaths]
study = instance.study
# Delete any failed/successful job that had the artifact as input
sql = """SELECT processing_job_id
FROM qiita.artifact_processing_job
WHERE artifact_id IN %s"""
qdb.sql_connection.TRN.add(sql, [all_ids])
job_ids = tuple(qdb.sql_connection.TRN.execute_fetchflatten())
if job_ids:
sql = """DELETE FROM qiita.artifact_processing_job
WHERE artifact_id IN %s"""
qdb.sql_connection.TRN.add(sql, [all_ids])
# Delete the entry from the artifact_output_processing_job table
sql = """DELETE FROM qiita.artifact_output_processing_job
WHERE artifact_id IN %s"""
qdb.sql_connection.TRN.add(sql, [all_ids])
# Detach the artifact from its filepaths
sql = """DELETE FROM qiita.artifact_filepath
WHERE artifact_id IN %s"""
qdb.sql_connection.TRN.add(sql, [all_ids])
# If the first artifact to be deleted, instance, doesn't have
# parents and study is not None (None means it is an analysis), we
# move the files to the uploads folder. We also need to nullify
# the artifact column in the prep template table
if not instance.parents and study is not None:
qdb.util.move_filepaths_to_upload_folder(study.id,
filepaths)
# there are cases in which an artifact is not linked to a
# study
pt_ids = tuple(pt.id for a in all_artifacts
for pt in a.prep_templates)
if pt_ids:
sql = """UPDATE qiita.prep_template
SET artifact_id = NULL
WHERE prep_template_id IN %s"""
qdb.sql_connection.TRN.add(sql, [pt_ids])
else:
sql = """DELETE FROM qiita.parent_artifact
WHERE artifact_id IN %s"""
qdb.sql_connection.TRN.add(sql, [all_ids])
# Detach the artifacts from the study_artifact table
sql = "DELETE FROM qiita.study_artifact WHERE artifact_id IN %s"
qdb.sql_connection.TRN.add(sql, [all_ids])
# Detach the artifacts from the analysis_artifact table
sql = "DELETE FROM qiita.analysis_artifact WHERE artifact_id IN %s"
qdb.sql_connection.TRN.add(sql, [all_ids])
# Detach artifact from preparation_artifact
sql = """DELETE FROM qiita.preparation_artifact
WHERE artifact_id IN %s"""
qdb.sql_connection.TRN.add(sql, [all_ids])
# Delete the rows in the artifact table
sql = "DELETE FROM qiita.artifact WHERE artifact_id IN %s"
qdb.sql_connection.TRN.add(sql, [all_ids])
@classmethod
def archive(cls, artifact_id, clean_ancestors=True):
"""Archive artifact with artifact_id
Parameters
----------
artifact_id : int
The artifact to be archived
clean_ancestors : bool
If other childless artifacts should be deleted
Raises
------
QiitaDBOperationNotPermittedError
If the artifact is not public
If the artifact_type is not BIOM
If the artifact belongs to an analysis
If the artifact has no parents (raw file)
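Examples
--------
A minimal, illustrative sketch; the artifact id is hypothetical::
    Artifact.archive(42, clean_ancestors=False)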
"""
artifact = cls(artifact_id)
if artifact.visibility != 'public':
raise qdb.exceptions.QiitaDBOperationNotPermittedError(
'Only public artifacts can be archived')
if artifact.artifact_type != 'BIOM':
raise qdb.exceptions.QiitaDBOperationNotPermittedError(
'Only BIOM artifacts can be archived')
if artifact.analysis is not None:
raise qdb.exceptions.QiitaDBOperationNotPermittedError(
'Only non analysis artifacts can be archived')
if not artifact.parents:
raise qdb.exceptions.QiitaDBOperationNotPermittedError(
'Only non raw artifacts can be archived')
to_delete = []
if clean_ancestors:
# let's find all ancestors that can be deleted: those that have
# parents and whose only descendants are themselves and this
# artifact; then delete them
to_delete = [x for x in artifact.ancestors.nodes()
if x.id != artifact_id and x.parents and
not [y for y in x.descendants.nodes()
if y.id not in (artifact_id, x.id)]]
# ignore artifacts that can be or have been submitted to EBI/VAMPS
to_delete = [x for x in to_delete if not x.can_be_submitted_to_ebi
and not x.is_submitted_to_ebi
and not x.is_submitted_to_vamps]
# get the log files so we can delete them
fids = [x['fp_id'] for x in artifact.filepaths
if x['fp_type'] == 'log']
archive_data = dumps({"merging_scheme": artifact.merging_scheme})
with qdb.sql_connection.TRN:
artifact._set_visibility('archived', propagate=False)
sql = 'DELETE FROM qiita.parent_artifact WHERE artifact_id = %s'
qdb.sql_connection.TRN.add(sql, [artifact_id])
sql = '''DELETE FROM qiita.artifact_output_processing_job
WHERE artifact_id = %s'''
qdb.sql_connection.TRN.add(sql, [artifact_id])
if fids:
sql = '''DELETE FROM qiita.artifact_filepath
WHERE filepath_id IN %s'''
qdb.sql_connection.TRN.add(sql, [tuple(fids)])
sql = """UPDATE qiita.{0}
SET archive_data = %s
WHERE artifact_id = %s""".format(cls._table)
qdb.sql_connection.TRN.add(sql, [archive_data, artifact_id])
qdb.sql_connection.TRN.execute()
# cleaning the extra artifacts
for x in to_delete:
x._set_visibility('sandbox', propagate=False)
cls.delete(x.id)
@property
def name(self):
"""The name of the artifact
Returns
-------
str
The artifact name
"""
with qdb.sql_connection.TRN:
sql = """SELECT name
FROM qiita.artifact
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()
@name.setter
def name(self, value):
"""Set the name of the artifact
Parameters
----------
value : str
The new artifact's name
Raises
------
ValueError
If `value` contains more than 35 chars
"""
sql = """UPDATE qiita.artifact
SET name = %s
WHERE artifact_id = %s"""
qdb.sql_connection.perform_as_transaction(sql, [value, self.id])
@property
def timestamp(self):
"""The timestamp when the artifact was generated
Returns
-------
datetime
The timestamp when the artifact was generated
"""
with qdb.sql_connection.TRN:
sql = """SELECT generated_timestamp
FROM qiita.artifact
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()
@property
def processing_parameters(self):
"""The processing parameters used to generate the artifact
Returns
-------
qiita_db.software.Parameters or None
The parameters used to generate the artifact if it has parents.
None otherwise.
"""
with qdb.sql_connection.TRN:
sql = """SELECT command_id, command_parameters
FROM qiita.artifact
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
# Only one row will be returned
res = qdb.sql_connection.TRN.execute_fetchindex()[0]
if res[0] is None:
return None
return qdb.software.Parameters.load(
qdb.software.Command(res[0]), values_dict=res[1])
@property
def visibility(self):
"""The visibility of the artifact
Returns
-------
str
The visibility of the artifact
"""
with qdb.sql_connection.TRN:
sql = """SELECT visibility
FROM qiita.artifact
JOIN qiita.visibility USING (visibility_id)
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()
def _set_visibility(self, value, propagate=True):
"helper method to split validation and actual set of the visibility"
# In order to correctly propagate the visibility we need to find
# the root of this artifact and then propagate to all the artifacts
vis_id = qdb.util.convert_to_id(value, "visibility")
if propagate:
sql = "SELECT * FROM qiita.find_artifact_roots(%s)"
qdb.sql_connection.TRN.add(sql, [self.id])
root_id = qdb.sql_connection.TRN.execute_fetchlast()
root = qdb.artifact.Artifact(root_id)
# these are the ids of all the children from the root
ids = [a.id for a in root.descendants.nodes()]
else:
ids = [self.id]
sql = """UPDATE qiita.artifact
SET visibility_id = %s
WHERE artifact_id IN %s"""
qdb.sql_connection.perform_as_transaction(sql, [vis_id, tuple(ids)])
@visibility.setter
def visibility(self, value):
"""Sets the visibility of the artifact
Parameters
----------
value : str
The new visibility of the artifact
Notes
-----
The visibility of an artifact is propagated to its ancestors, but it
only applies when the new visibility is more open than before.
"""
with qdb.sql_connection.TRN:
# first let's check that this is a valid visibility
study = self.study
# then let's check that the sample/prep info files have the correct
# restrictions
if value != 'sandbox' and study is not None:
reply = study.sample_template.validate_restrictions()
success = [not reply[0]]
message = [reply[1]]
for pt in self.prep_templates:
reply = pt.validate_restrictions()
success.append(not reply[0])
message.append(reply[1])
if any(success):
raise ValueError(
"Errors in your info files:%s" % '\n'.join(message))
self._set_visibility(value)
@property
def artifact_type(self):
"""The artifact type
Returns
-------
str
The artifact type
"""
with qdb.sql_connection.TRN:
sql = """SELECT artifact_type
FROM qiita.artifact
JOIN qiita.artifact_type USING (artifact_type_id)
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()
@property
def data_type(self):
"""The data type of the artifact
Returns
-------
str
The artifact data type
"""
with qdb.sql_connection.TRN:
sql = """SELECT data_type
FROM qiita.artifact
JOIN qiita.data_type USING (data_type_id)
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()
@property
def can_be_submitted_to_ebi(self):
"""Whether the artifact can be submitted to EBI or not
Returns
-------
bool
True if the artifact can be submitted to EBI. False otherwise.
"""
with qdb.sql_connection.TRN:
# we should always return False if this artifact is not directly
# attached to the prep_template or the one right after it; in other
# words, if it has more than one processing step behind it
fine_to_send = []
fine_to_send.extend([pt.artifact for pt in self.prep_templates])
fine_to_send.extend([c for a in fine_to_send if a is not None
for c in a.children])
if self not in fine_to_send:
return False
sql = """SELECT can_be_submitted_to_ebi
FROM qiita.artifact_type
JOIN qiita.artifact USING (artifact_type_id)
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()
@property
def is_submitted_to_ebi(self):
"""Whether the artifact has been submitted to EBI or not
Returns
-------
bool
True if the artifact has been submitted to EBI. False otherwise
Raises
------
QiitaDBOperationNotPermittedError
If the artifact cannot be submitted to EBI
"""
with qdb.sql_connection.TRN:
if not self.can_be_submitted_to_ebi:
raise qdb.exceptions.QiitaDBOperationNotPermittedError(
"Artifact %s cannot be submitted to EBI" % self.id)
sql = """SELECT EXISTS(
SELECT *
FROM qiita.ebi_run_accession
WHERE artifact_id = %s)"""
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()
@property
def ebi_run_accessions(self):
"""The EBI run accessions attached to this artifact
Returns
-------
dict of {str: str}
The EBI run accessions keyed by sample id
Raises
------
QiitaDBOperationNotPermittedError
If the artifact cannot be submitted to EBI
"""
with qdb.sql_connection.TRN:
if not self.can_be_submitted_to_ebi:
raise qdb.exceptions.QiitaDBOperationNotPermittedError(
"Artifact %s cannot be submitted to EBI" % self.id)
sql = """SELECT sample_id, ebi_run_accession
FROM qiita.ebi_run_accession
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return {s_id: ebi_acc for s_id, ebi_acc in
qdb.sql_connection.TRN.execute_fetchindex()}
@ebi_run_accessions.setter
def ebi_run_accessions(self, values):
"""Set the EBI run accession attached to this artifact
Parameters
----------
values : dict of {str: str}
The EBI accession number keyed by sample id
Raises
------
QiitaDBOperationNotPermittedError
If the artifact cannot be submitted to EBI
If the artifact has been already submitted to EBI
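Examples
--------
A minimal, illustrative sketch; the sample ids and run accessions
are hypothetical::
    artifact.ebi_run_accessions = {
        '1.SKB8.640193': 'ERR0000001',
        '1.SKD8.640184': 'ERR0000002'}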
"""
with qdb.sql_connection.TRN:
if not self.can_be_submitted_to_ebi:
raise qdb.exceptions.QiitaDBOperationNotPermittedError(
"Artifact %s cannot be submitted to EBI" % self.id)
sql = """SELECT EXISTS(SELECT *
FROM qiita.ebi_run_accession
WHERE artifact_id = %s)"""
qdb.sql_connection.TRN.add(sql, [self.id])
if qdb.sql_connection.TRN.execute_fetchlast():
raise qdb.exceptions.QiitaDBOperationNotPermittedError(
"Artifact %s already submitted to EBI" % self.id)
sql = """INSERT INTO qiita.ebi_run_accession
(sample_id, artifact_id, ebi_run_accession)
VALUES (%s, %s, %s)"""
sql_args = [[sample, self.id, accession]
for sample, accession in values.items()]
qdb.sql_connection.TRN.add(sql, sql_args, many=True)
qdb.sql_connection.TRN.execute()
@property
def can_be_submitted_to_vamps(self):
"""Whether the artifact can be submitted to VAMPS or not
Returns
-------
bool
True if the artifact can be submitted to VAMPS. False otherwise.
"""
with qdb.sql_connection.TRN:
sql = """SELECT can_be_submitted_to_vamps
FROM qiita.artifact_type
JOIN qiita.artifact USING (artifact_type_id)
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()
@property
def is_submitted_to_vamps(self):
"""Whether if the artifact has been submitted to VAMPS or not
Returns
-------
bool
True if the artifact has been submitted to VAMPS. False otherwise
Raises
------
QiitaDBOperationNotPermittedError
If the artifact cannot be submitted to VAMPS
"""
with qdb.sql_connection.TRN:
if not self.can_be_submitted_to_vamps:
raise qdb.exceptions.QiitaDBOperationNotPermittedError(
"Artifact %s cannot be submitted to VAMPS" % self.id)
sql = """SELECT submitted_to_vamps
FROM qiita.artifact
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()
@is_submitted_to_vamps.setter
def is_submitted_to_vamps(self, value):
"""Set if the artifact has been submitted to VAMPS
Parameters
----------
value : bool
Whether the artifact has been submitted to VAMPS or not
Raises
------
QiitaDBOperationNotPermittedError
If the artifact cannot be submitted to VAMPS
"""
if not self.can_be_submitted_to_vamps:
raise qdb.exceptions.QiitaDBOperationNotPermittedError(
"Artifact %s cannot be submitted to VAMPS" % self.id)
sql = """UPDATE qiita.artifact
SET submitted_to_vamps = %s
WHERE artifact_id = %s"""
qdb.sql_connection.perform_as_transaction(sql, [value, self.id])
@property
def filepaths(self):
"""Returns the filepaths associated with the artifact
Returns
-------
list of dict
A list of dict as defined by qiita_db.util.retrieve_filepaths
"""
return qdb.util.retrieve_filepaths(
"artifact_filepath", "artifact_id", self.id, sort='ascending')
@property
def html_summary_fp(self):
"""Returns the HTML summary filepath
Returns
-------
tuple of (int, str)
The filepath id and the path to the HTML summary
"""
fps = qdb.util.retrieve_filepaths("artifact_filepath", "artifact_id",
self.id, fp_type='html_summary')
if fps:
# If fps is not the empty list, then we have exactly one file.
# retrieve_filepaths returns a list of dicts; we only want the
# filepath id and the filepath here, not the filepath type
res = (fps[0]['fp_id'], fps[0]['fp'])
else:
res = None
return res
def set_html_summary(self, html_fp, support_dir=None):
"""Sets the HTML summary of the artifact
Parameters
----------
html_fp : str
Path to the new HTML summary
support_dir : str
Path to the directory containing any support files needed by
the HTML file
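Examples
--------
A minimal, illustrative sketch; the paths are hypothetical::
    artifact.set_html_summary('/path/to/summary.html',
                              support_dir='/path/to/summary_files')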
"""
with qdb.sql_connection.TRN:
old_summs = self.html_summary_fp
to_delete_fps = []
if old_summs:
# Delete the current HTML summary from the DB; below we will
# remove the files, if necessary
to_delete_ids = []
for x in self.filepaths:
if x['fp_type'] in ('html_summary', 'html_summary_dir'):
to_delete_ids.append([x['fp_id']])
to_delete_fps.append(x['fp'])
# From the artifact_filepath table
sql = """DELETE FROM qiita.artifact_filepath
WHERE filepath_id = %s"""
qdb.sql_connection.TRN.add(sql, to_delete_ids, many=True)
# From the filepath table
sql = "DELETE FROM qiita.filepath WHERE filepath_id=%s"
qdb.sql_connection.TRN.add(sql, to_delete_ids, many=True)
# Add the new HTML summary
filepaths = [(html_fp, 'html_summary')]
if support_dir is not None:
filepaths.append((support_dir, 'html_summary_dir'))
fp_ids = qdb.util.insert_filepaths(
filepaths, self.id, self.artifact_type)
sql = """INSERT INTO qiita.artifact_filepath
(artifact_id, filepath_id)
VALUES (%s, %s)"""
sql_args = [[self.id, id_] for id_ in fp_ids]
qdb.sql_connection.TRN.add(sql, sql_args, many=True)
qdb.sql_connection.TRN.execute()
# to avoid deleting potentially necessary files, we are going to add
# that check after the previous transaction is committed
if to_delete_fps:
for x in self.filepaths:
if x['fp'] in to_delete_fps:
to_delete_fps.remove(x['fp'])
for fp in to_delete_fps:
if isfile(fp):
remove(fp)
else:
rmtree(fp)
@property
def parents(self):
"""Returns the parents of the artifact
Returns
-------
list of qiita_db.artifact.Artifact
The parent artifacts
"""
with qdb.sql_connection.TRN:
sql = """SELECT parent_id
FROM qiita.parent_artifact
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return [Artifact(p_id)
for p_id in qdb.sql_connection.TRN.execute_fetchflatten()]
def _create_lineage_graph_from_edge_list(self, edge_list):
"""Generates an artifact graph from the given `edge_list`
Parameters
----------
edge_list : list of (int, int)
List of (parent_artifact_id, artifact_id)
Returns
-------
networkx.DiGraph
The graph representing the artifact lineage stored in `edge_list`
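Examples
--------
A minimal, illustrative sketch; the artifact ids are hypothetical
and must exist in the database::
    # the edges (1, 2) and (2, 3) produce the lineage 1 -> 2 -> 3
    graph = artifact._create_lineage_graph_from_edge_list(
        [(1, 2), (2, 3)])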
"""
lineage = nx.DiGraph()
# In case the edge list is empty, only 'self' is present in the graph
if edge_list:
# By creating all the artifacts here we are saving DB calls
nodes = {a_id: Artifact(a_id)
for a_id in set(chain.from_iterable(edge_list))}
for parent, child in edge_list:
lineage.add_edge(nodes[parent], nodes[child])
else:
lineage.add_node(self)
return lineage
@property
def ancestors(self):
"""Returns the ancestors of the artifact
Returns
-------
networkx.DiGraph
The ancestors of the artifact
"""
with qdb.sql_connection.TRN:
sql = """SELECT parent_id, artifact_id
FROM qiita.artifact_ancestry(%s)"""
qdb.sql_connection.TRN.add(sql, [self.id])
edges = qdb.sql_connection.TRN.execute_fetchindex()
return self._create_lineage_graph_from_edge_list(edges)
@property
def descendants(self):
"""Returns the descendants of the artifact
Returns
-------
networkx.DiGraph
The descendants of the artifact
"""
with qdb.sql_connection.TRN:
sql = """SELECT parent_id, artifact_id
FROM qiita.artifact_descendants(%s)"""
qdb.sql_connection.TRN.add(sql, [self.id])
edges = qdb.sql_connection.TRN.execute_fetchindex()
return self._create_lineage_graph_from_edge_list(edges)
@property
def descendants_with_jobs(self):
"""Returns the descendants of the artifact with their jobs
Returns
-------
networkx.DiGraph
The descendants of the artifact
"""
def _add_edge(edges, src, dest):
"""Aux function to add the edge (src, dest) to edges"""
edge = (src, dest)
if edge not in edges:
edges.add(edge)
with qdb.sql_connection.TRN:
sql = """SELECT processing_job_id, input_id, output_id
FROM qiita.artifact_descendants_with_jobs(%s)"""
qdb.sql_connection.TRN.add(sql, [self.id])
sql_edges = qdb.sql_connection.TRN.execute_fetchindex()
# helper function to reduce code duplication
def _helper(sql_edges, edges, nodes):
for jid, pid, cid in sql_edges:
if jid not in nodes:
nodes[jid] = ('job',
qdb.processing_job.ProcessingJob(jid))
if pid not in nodes:
nodes[pid] = ('artifact', qdb.artifact.Artifact(pid))
if cid not in nodes:
nodes[cid] = ('artifact', qdb.artifact.Artifact(cid))
edges.add((nodes[pid], nodes[jid]))
edges.add((nodes[jid], nodes[cid]))
lineage = nx.DiGraph()
edges = set()
nodes = dict()
extra_edges = set()
extra_nodes = dict()
if sql_edges:
_helper(sql_edges, edges, nodes)
else:
nodes[self.id] = ('artifact', self)
lineage.add_node(nodes[self.id])
# if this is an Analysis artifact we need to check whether there
# are extra edges/nodes, as there is a chance that there are
# connections between them
if self.analysis is not None:
roots = [a for a in self.analysis.artifacts
if not a.parents and a != self]
for r in roots:
# add the root to the extra nodes, then its children
extra_nodes[r.id] = ('artifact', r)
qdb.sql_connection.TRN.add(sql, [r.id])
sql_edges = qdb.sql_connection.TRN.execute_fetchindex()
_helper(sql_edges, extra_edges, extra_nodes)
# The code above returns all the jobs that have been successfully
# executed. We need to add all the jobs that are in all the other
# status. Approach: Loop over all the artifacts and add all the
# jobs that have been attached to them.
visited = set()
queue = list(nodes.keys())
while queue:
current = queue.pop(0)
if current not in visited:
visited.add(current)
n_type, n_obj = nodes[current]
if n_type == 'artifact':
# Add all the jobs to the queue
for job in n_obj.jobs():
queue.append(job.id)
if job.id not in nodes:
nodes[job.id] = ('job', job)
elif n_type == 'job':
# skip private and artifact definition jobs as they
# don't create new artifacts and they would create
# edges without artifacts + they can be safely ignored
if n_obj.command.software.type in {
'private', 'artifact definition'}:
continue
jstatus = n_obj.status
# If the job is in success we don't need to do anything
# else since it would've been added by the code above
if jstatus != 'success':
if jstatus != 'error':
# If the job is not errored, we can add the
# future outputs and the children jobs to
# the graph.
# Add all the job outputs as new nodes
for o_name, o_type in n_obj.command.outputs:
node_id = '%s:%s' % (n_obj.id, o_name)
node = TypeNode(
id=node_id, job_id=n_obj.id,
name=o_name, type=o_type)
queue.append(node_id)
if node_id not in nodes:
nodes[node_id] = ('type', node)
# Add all its children jobs to the queue
for cjob in n_obj.children:
queue.append(cjob.id)
if cjob.id not in nodes:
nodes[cjob.id] = ('job', cjob)
# including the outputs
for o_name, o_type in cjob.command.outputs:
node_id = '%s:%s' % (cjob.id, o_name)
node = TypeNode(
id=node_id, job_id=cjob.id,
name=o_name, type=o_type)
if node_id not in nodes:
nodes[node_id] = ('type', node)
# Connect the job with its input artifacts; the
# input artifacts may or may not exist yet, so we
# need to check both the input_artifacts and the
# pending properties
for in_art in n_obj.input_artifacts:
iid = in_art.id
if iid not in nodes and iid in extra_nodes:
nodes[iid] = extra_nodes[iid]
_add_edge(edges, nodes[iid], nodes[n_obj.id])
pending = n_obj.pending
for pred_id in pending:
for pname in pending[pred_id]:
in_node_id = '%s:%s' % (
pred_id, pending[pred_id][pname])
_add_edge(edges, nodes[in_node_id],
nodes[n_obj.id])
elif n_type == 'type':
# Connect this 'future artifact' with the job that will
# generate it
_add_edge(edges, nodes[n_obj.job_id], nodes[current])
else:
raise ValueError('Unrecognized type: %s' % n_type)
# Add all edges to the lineage graph - adding the edges creates the
# nodes in networkx
for source, dest in edges:
lineage.add_edge(source, dest)
return lineage
@property
def children(self):
"""Returns the list of children of the artifact
Returns
-------
list of qiita_db.artifact.Artifact
The children artifacts
"""
with qdb.sql_connection.TRN:
sql = """SELECT artifact_id
FROM qiita.parent_artifact
WHERE parent_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return [Artifact(c_id)
for c_id in qdb.sql_connection.TRN.execute_fetchflatten()]
@property
def youngest_artifact(self):
"""Returns the youngest artifact of the artifact's lineage
Returns
-------
qiita_db.artifact.Artifact
The youngest descendant of the artifact's lineage
"""
with qdb.sql_connection.TRN:
sql = """SELECT artifact_id
FROM qiita.artifact_descendants(%s)
JOIN qiita.artifact USING (artifact_id)
WHERE visibility_id NOT IN %s
ORDER BY generated_timestamp DESC
LIMIT 1"""
qdb.sql_connection.TRN.add(
sql, [self.id, qdb.util.artifact_visibilities_to_skip()])
a_id = qdb.sql_connection.TRN.execute_fetchindex()
# If the current artifact has no children, the previous call will
# return an empty list, so the youngest artifact in the lineage is
# the current artifact. On the other hand, if it has descendants,
# the id of the youngest artifact will be in a_id[0][0]
result = Artifact(a_id[0][0]) if a_id else self
return result
@property
def prep_templates(self):
"""The prep templates attached to this artifact
Returns
-------
list of qiita_db.metadata_template.PrepTemplate
"""
with qdb.sql_connection.TRN:
sql = """SELECT prep_template_id
FROM qiita.preparation_artifact
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return [qdb.metadata_template.prep_template.PrepTemplate(pt_id)
for pt_id in qdb.sql_connection.TRN.execute_fetchflatten()]
@property
def study(self):
"""The study to which the artifact belongs to
Returns
-------
qiita_db.study.Study or None
The study that owns the artifact, if any
"""
with qdb.sql_connection.TRN:
sql = """SELECT study_id
FROM qiita.study_artifact
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
res = qdb.sql_connection.TRN.execute_fetchindex()
return qdb.study.Study(res[0][0]) if res else None
@property
def analysis(self):
"""The analysis to which the artifact belongs to
Returns
-------
qiita_db.analysis.Analysis or None
The analysis that owns the artifact, if any
"""
with qdb.sql_connection.TRN:
sql = """SELECT analysis_id
FROM qiita.analysis_artifact
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
res = qdb.sql_connection.TRN.execute_fetchindex()
return qdb.analysis.Analysis(res[0][0]) if res else None
@property
def merging_scheme(self):
"""The merging scheme of this artifact_type
Returns
-------
str, str
The human readable merging scheme and the parent software
information for this artifact
"""
vid = qdb.util.convert_to_id(self.visibility, "visibility")
if vid in qdb.util.artifact_visibilities_to_skip():
with qdb.sql_connection.TRN:
sql = f"""SELECT archive_data
FROM qiita.{self._table}
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
archive_data = qdb.sql_connection.TRN.execute_fetchlast()
merging_schemes = [archive_data['merging_scheme'][0]]
parent_softwares = [archive_data['merging_scheme'][1]]
else:
processing_params = self.processing_parameters
if processing_params is None:
return '', ''
cmd_name = processing_params.command.name
ms = processing_params.command.merging_scheme
afps = [x['fp'] for x in self.filepaths
if x['fp'].endswith('biom')]
merging_schemes = []
parent_softwares = []
# this loop is necessary as in theory an artifact can be
# generated from multiple prep info files
for p in self.parents:
pparent = p.processing_parameters
# if parent is None, then it is a direct upload; for example
# per_sample_FASTQ in shotgun data
if pparent is None:
parent_cmd_name = None
parent_merging_scheme = None
parent_pp = None
parent_software = 'N/A'
else:
parent_cmd_name = pparent.command.name
parent_merging_scheme = pparent.command.merging_scheme
parent_pp = pparent.values
psoftware = pparent.command.software
parent_software = '%s v%s' % (
psoftware.name, psoftware.version)
merging_schemes.append(qdb.util.human_merging_scheme(
cmd_name, ms, parent_cmd_name, parent_merging_scheme,
processing_params.values, afps, parent_pp))
parent_softwares.append(parent_software)
return ', '.join(merging_schemes), ', '.join(parent_softwares)
@property
def being_deleted_by(self):
"""The running job that is deleting this artifact
Returns
-------
qiita_db.processing_job.ProcessingJob
The running job that is deleting this artifact, None if it
doesn't exist
"""
with qdb.sql_connection.TRN:
sql = """
SELECT processing_job_id FROM qiita.artifact_processing_job
LEFT JOIN qiita.processing_job using (processing_job_id)
LEFT JOIN qiita.processing_job_status using (
processing_job_status_id)
LEFT JOIN qiita.software_command using (command_id)
WHERE artifact_id = %s AND name = 'delete_artifact' AND
processing_job_status in (
'running', 'queued', 'in_construction')"""
qdb.sql_connection.TRN.add(sql, [self.id])
res = qdb.sql_connection.TRN.execute_fetchindex()
return qdb.processing_job.ProcessingJob(res[0][0]) if res else None
@property
def has_human(self):
"""Whether the artifact may contain human host sequences
Returns
-------
bool
True if this is a 'per_sample_FASTQ' artifact attached to a non
target gene preparation that has not been human filtered, and at
least one of its samples has an env_package starting with
'human-'. False otherwise.
"""
has_human = False
# we are going to check the metadata if:
# - the prep data_type is _not_ target gene
# - the prep is not current_human_filtering
# - the artifact_type is 'per_sample_FASTQ'
pts = self.prep_templates
tgs = qdb.metadata_template.constants.TARGET_GENE_DATA_TYPES
ntg = any([pt.data_type() not in tgs for pt in pts])
chf = any([not pt.current_human_filtering for pt in pts])
if ntg and chf and self.artifact_type == 'per_sample_FASTQ':
st = self.study.sample_template
if 'env_package' in st.categories:
sql = f"""SELECT DISTINCT sample_values->>'env_package'
FROM qiita.sample_{st.id} WHERE sample_id in (
SELECT sample_id from qiita.preparation_artifact
LEFT JOIN qiita.prep_template_sample USING (
prep_template_id)
WHERE artifact_id = {self.id})"""
with qdb.sql_connection.TRN:
qdb.sql_connection.TRN.add(sql)
for v in qdb.sql_connection.TRN.execute_fetchflatten():
# str is needed as v could be None
if str(v).startswith('human-'):
has_human = True
break
return has_human
def jobs(self, cmd=None, status=None, show_hidden=False):
"""Jobs that used this artifact as input
Parameters
----------
cmd : qiita_db.software.Command, optional
If provided, only jobs that executed this command will be returned
status : str, optional
If provided, only jobs in this status will be returned
show_hidden : bool, optional
If true, return also the "hidden" jobs
Returns
-------
list of qiita_db.processing_job.ProcessingJob
The list of jobs that used this artifact as input
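Examples
--------
A minimal, illustrative sketch; the command id is hypothetical::
    cmd = qdb.software.Command(3)
    running_jobs = artifact.jobs(cmd=cmd, status='running')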
"""
with qdb.sql_connection.TRN:
sql = """SELECT processing_job_id
FROM qiita.artifact_processing_job
JOIN qiita.processing_job USING (processing_job_id)
JOIN qiita.processing_job_status
USING (processing_job_status_id)
WHERE artifact_id = %s"""
sql_args = [self.id]
if cmd:
sql = "{} AND command_id = %s".format(sql)
sql_args.append(cmd.id)
if status:
sql = "{} AND processing_job_status = %s".format(sql)
sql_args.append(status)
if not show_hidden:
sql = "{} AND hidden = %s".format(sql)
sql_args.append(False)
qdb.sql_connection.TRN.add(sql, sql_args)
return [qdb.processing_job.ProcessingJob(jid)
for jid in qdb.sql_connection.TRN.execute_fetchflatten()]
@property
def get_commands(self):
"""Returns the active commands that can process this kind of artifact
Returns
-------
list of qiita_db.software.Command
The commands that can process the given artifact types
"""
dws = []
with qdb.sql_connection.TRN:
# get all the possible commands
sql = """SELECT DISTINCT qiita.command_parameter.command_id
FROM qiita.artifact
JOIN qiita.parameter_artifact_type
USING (artifact_type_id)
JOIN qiita.command_parameter USING (command_parameter_id)
JOIN qiita.software_command ON (
qiita.command_parameter.command_id =
qiita.software_command.command_id)
WHERE artifact_id = %s AND active = True"""
if self.analysis is None:
sql += " AND is_analysis = False"
# get the workflows that match this artifact so we can filter
# the available commands based on the commands in the workflows
# for that artifact - except if the artifact_type == 'BIOM'
if self.artifact_type != 'BIOM':
dws = [w for w in qdb.software.DefaultWorkflow.iter()
if self.data_type in w.data_type]
else:
sql += " AND is_analysis = True"
qdb.sql_connection.TRN.add(sql, [self.id])
cids = set(qdb.sql_connection.TRN.execute_fetchflatten())
if dws:
cmds = {n.default_parameter.command.id
for w in dws for n in w.graph.nodes}
cids = cmds & cids
return [qdb.software.Command(cid) for cid in cids]
@property
def human_reads_filter_method(self):
"""The human_reads_filter_method of the artifact
Returns
-------
str
The human_reads_filter_method name
"""
with qdb.sql_connection.TRN:
sql = """SELECT human_reads_filter_method
FROM qiita.artifact
LEFT JOIN qiita.human_reads_filter_method
USING (human_reads_filter_method_id)
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()
@human_reads_filter_method.setter
def human_reads_filter_method(self, value):
"""Set the human_reads_filter_method of the artifact
Parameters
----------
value : str
The new artifact's human_reads_filter_method
Raises
------
ValueError
If `value` doesn't exist in the database
"""
with qdb.sql_connection.TRN:
sql = """SELECT human_reads_filter_method_id
FROM qiita.human_reads_filter_method
WHERE human_reads_filter_method = %s"""
qdb.sql_connection.TRN.add(sql, [value])
idx = qdb.sql_connection.TRN.execute_fetchflatten()
if len(idx) == 0:
raise ValueError(
f'"{value}" is not a valid human_reads_filter_method')
sql = """UPDATE qiita.artifact
SET human_reads_filter_method_id = %s
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [idx[0], self.id])