[973924]: / qiita_db / archive.py

Download this file

217 lines (184 with data), 7.9 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
import qiita_db as qdb
class Archive(qdb.base.QiitaObject):
    r"""Extra information for any features stored in a BIOM Artifact

    Methods
    -------
    merging_schemes
    insert_from_artifact
    get_merging_scheme_from_job
    retrieve_feature_values
    insert_features

    See Also
    --------
    qiita_db.QiitaObject
    """

    @classmethod
    def merging_schemes(cls):
        r"""Returns the available merging schemes

        Returns
        -------
        dict
            The rows of qiita.archive_merging_scheme, keyed by
            archive_merging_scheme_id with archive_merging_scheme as value
        """
        with qdb.sql_connection.TRN:
            sql = """SELECT archive_merging_scheme_id, archive_merging_scheme
                     FROM qiita.archive_merging_scheme"""
            qdb.sql_connection.TRN.add(sql)
            return dict(qdb.sql_connection.TRN.execute_fetchindex())

    @classmethod
    def _inserting_main_steps(cls, ms, features):
        r"""Stores the merging scheme (if new) and upserts its features

        Parameters
        ----------
        ms : str
            The merging scheme under which the features are stored
        features : dict {str: str}
            A dictionary of the features and the values to be stored
        """
        with qdb.sql_connection.TRN:
            # Only create the merging scheme row when it does not exist yet
            sql = """INSERT INTO qiita.archive_merging_scheme
                        (archive_merging_scheme)
                     SELECT %s WHERE NOT EXISTS (
                        SELECT 1 FROM qiita.archive_merging_scheme
                        WHERE archive_merging_scheme = %s)"""
            qdb.sql_connection.TRN.add(sql, [ms, ms])

            # Retrieve the id of the (new or pre-existing) merging scheme
            sql = """SELECT archive_merging_scheme_id
                     FROM qiita.archive_merging_scheme
                     WHERE archive_merging_scheme = %s"""
            qdb.sql_connection.TRN.add(sql, [ms])
            amsi = qdb.sql_connection.TRN.execute_fetchlast()

            # One archive_upsert call per (scheme id, feature, value) triplet
            vals = [[amsi, _id, val] for _id, val in features.items()]
            qdb.sql_connection.TRN.add(
                "SELECT archive_upsert(%s, %s, %s)", vals, many=True)
            qdb.sql_connection.TRN.execute()

    @classmethod
    def insert_from_artifact(cls, artifact, features):
        r"""Inserts new features to the database based on a given artifact

        Parameters
        ----------
        artifact : qiita_db.artifact.Artifact
            The artifact from which the features were generated
        features : dict {str: str}
            A dictionary of the features and the values to be stored

        Raises
        ------
        ValueError
            If the Artifact type is not BIOM
            If the artifact doesn't have a biom filepath
        """
        with qdb.sql_connection.TRN:
            atype = artifact.artifact_type
            if atype != 'BIOM':
                raise ValueError(
                    "To archive artifact must be BIOM but %s" % atype)

            bfps = [x['fp'] for x in artifact.filepaths
                    if x['fp_type'] == 'biom']
            if not bfps:
                raise ValueError("The artifact has no biom files")

            # [0] as it returns a list
            ms = qdb.util.get_artifacts_information(
                [artifact.id])[0]['algorithm']

            cls._inserting_main_steps(ms, features)

    @classmethod
    def get_merging_scheme_from_job(cls, job):
        r"""Builds the human readable merging scheme of a given job

        Parameters
        ----------
        job : qiita_db.processing_job.ProcessingJob
            The Qiita process job_id generating the artifact holding the
            features to be retrieved or stored.

        Returns
        -------
        str
            The merging scheme built from the job command and the command
            that generated its first input artifact. If that parent command
            does not ignore its own parent command, the scheme of the parent
            (relative to the grandparent) is merged into the result.

        Notes
        -----
        Only the first input artifact of the job, and the first parent of
        that artifact, are taken into account.
        """
        with qdb.sql_connection.TRN:
            acmd = job.command
            parent = job.input_artifacts[0]
            parent_pparameters = parent.processing_parameters
            phms = None
            if parent_pparameters is None:
                # The parent was not generated by a command
                parent_cmd_name = None
                parent_parameters = None
                parent_merging_scheme = None
            else:
                pcmd = parent_pparameters.command
                parent_cmd_name = pcmd.name
                parent_parameters = parent_pparameters.values
                parent_merging_scheme = pcmd.merging_scheme
                if not parent_merging_scheme['ignore_parent_command']:
                    # The parent command depends on the command that
                    # generated its own input, so describe that step too
                    gp = parent.parents[0]
                    gp_params = gp.processing_parameters
                    if gp_params is not None:
                        gp_cmd = gp_params.command
                        phms = qdb.util.human_merging_scheme(
                            parent_cmd_name, parent_merging_scheme,
                            gp_cmd.name, gp_cmd.merging_scheme,
                            parent_parameters, [], gp_params.values)

            hms = qdb.util.human_merging_scheme(
                acmd.name, acmd.merging_scheme,
                parent_cmd_name, parent_merging_scheme,
                job.parameters.values, [], parent_parameters)

            if phms is not None:
                hms = qdb.util.merge_overlapping_strings(hms, phms)

            return hms

    @classmethod
    def retrieve_feature_values(cls, archive_merging_scheme=None,
                                features=None):
        r"""Retrieves all features/values from the archive

        Parameters
        ----------
        archive_merging_scheme : optional, str
            The name of the archive_merging_scheme to retrieve
        features : list of str, optional
            List of features to retrieve information from the archive

        Returns
        -------
        dict {str: str}
            The archived values keyed by feature. An empty dict is returned
            when `features` is given but empty.

        Notes
        -----
        If archive_merging_scheme is None it will return all
        feature values
        """
        if features is not None:
            # depending on the method calling test retrieve_feature_values
            # the features elements can be string or bytes; making sure
            # everything is string for SQL
            features = tuple(
                f.decode('ascii') if isinstance(f, bytes) else f
                for f in features)
            if not features:
                # An empty tuple would be rendered as `IN ()`, which is not
                # valid SQL; nothing was requested so nothing can match
                return {}

        with qdb.sql_connection.TRN:
            extras = []
            vals = []
            if archive_merging_scheme is not None:
                extras.append("""archive_merging_scheme = %s""")
                vals.append(archive_merging_scheme)
            if features is not None:
                extras.append("""archive_feature IN %s""")
                vals.append(features)

            sql = """SELECT archive_feature, archive_feature_value
                     FROM qiita.archive_feature_value
                     LEFT JOIN qiita.archive_merging_scheme
                        USING (archive_merging_scheme_id) {0}
                     ORDER BY archive_merging_scheme, archive_feature"""

            if extras:
                sql = sql.format('WHERE ' + ' AND '.join(extras))
                qdb.sql_connection.TRN.add(sql, vals)
            else:
                qdb.sql_connection.TRN.add(sql.format(''))

            return dict(qdb.sql_connection.TRN.execute_fetchindex())

    @classmethod
    def insert_features(cls, merging_scheme, features):
        r"""Inserts new features to the database for a given merging scheme

        Parameters
        ----------
        merging_scheme : str
            The merging scheme to store these features
        features : dict {str: str}
            A dictionary of the features and the values to be stored

        Returns
        -------
        dict, feature: value
            The inserted new values
        """
        cls._inserting_main_steps(merging_scheme, features)

        # Read the values back from the archive so the caller gets what is
        # actually stored for these features under this merging scheme
        inserted = cls.retrieve_feature_values(
            archive_merging_scheme=merging_scheme, features=features.keys())

        return inserted